Module: Tupelo::Client::Api

Included in:
Tupelo::Client
Defined in:
lib/tupelo/client/reader.rb,
lib/tupelo/client/subspace.rb,
lib/tupelo/client/transaction.rb

Overview

Include into class that defines #worker and #log.

Instance Method Summary collapse

Instance Method Details

#abortObject

Raises:



43
44
45
# File 'lib/tupelo/client/transaction.rb', line 43

def abort
  raise TransactionAbort
end

#define_subspace(tag, template, addr: nil) ⇒ Object



3
4
5
6
7
8
9
10
11
# File 'lib/tupelo/client/subspace.rb', line 3

def define_subspace tag, template, addr: nil
  metatuple = {
    TUPELO_META_KEY => "subspace",
    tag:        tag,
    template:   PortableObjectTemplate.spec_from(template),
    addr:       addr
  }
  write_wait metatuple
end

#notifierObject



51
52
53
# File 'lib/tupelo/client/reader.rb', line 51

def notifier
  NotifyWaiter.new(self).tap {|n| n.toggle}
end

#pulse_nowait(*tuples) ⇒ Object Also known as: pulse



60
61
62
63
64
# File 'lib/tupelo/client/transaction.rb', line 60

def pulse_nowait *tuples
  t = transaction
  t.pulse *tuples
  t.commit
end

#pulse_wait(*tuples) ⇒ Object



67
68
69
# File 'lib/tupelo/client/transaction.rb', line 67

def pulse_wait *tuples
  pulse_nowait(*tuples).wait
end

#read_all(template = Object) ⇒ Object

Returns all matching tuples currently in the space. The template defaults to Object, which matches any tuple. Does not wait for more tuples to arrive.



40
41
42
43
44
45
46
47
48
49
# File 'lib/tupelo/client/reader.rb', line 40

def read_all template = Object
  matcher = Matcher.new(worker.make_template(template), self, :all => true)
  worker << matcher
  a = []
  while tuple = matcher.wait ## inefficient?
    yield tuple if block_given?
    a << tuple
  end
  a
end

#read_nowait(template = Object) ⇒ Object

The template defaults to Object, which matches any tuple.



31
32
33
34
35
# File 'lib/tupelo/client/reader.rb', line 31

def read_nowait template = Object
  matcher = Matcher.new(worker.make_template(template), self)
  worker << matcher
  matcher.wait
end

#read_wait(template = Object) ⇒ Object Also known as: read

If no block given, return one matching tuple, blocking if necessary. If block given, yield each matching tuple that is found locally and then yield each new match as it is written to the space. Guaranteed not to miss tuples, even if they arrive and are immediately taken. (Note that simply doing read(template) in a loop would not have this guarantee.) The template defaults to Object, which matches any tuple.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/tupelo/client/reader.rb', line 13

def read_wait template = Object
  waiter = Waiter.new(worker.make_template(template), self, !block_given?)
  worker << waiter
  if block_given?
    loop do
      yield waiter.wait
    end
  else
    result = waiter.wait
    waiter = nil
    result
  end
ensure
  worker << Unwaiter.new(waiter) if waiter
end

#subspace(tag) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/tupelo/client/subspace.rb', line 25

def subspace tag
  tag = tag.to_s
  worker.subspaces.find {|sp| sp.tag == tag} or begin
    if subscribed_tags.include? tag
      read TUPELO_META_KEY => "subspace",
        tag:      tag,
        template: nil,
        addr:     nil
      worker.subspaces.find {|sp| sp.tag == tag}
    end
  end
  ## this impl will not be safe with dynamic subspaces
end

#take(template, timeout: nil) ⇒ Object



71
72
73
74
75
76
77
78
79
80
# File 'lib/tupelo/client/transaction.rb', line 71

def take template, timeout: nil
  transaction timeout: timeout do |t|
    tuple = t.take template
    if block_given?
      yield tuple
    else
      tuple
    end
  end
end

#take_nowait(template) ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/tupelo/client/transaction.rb', line 82

def take_nowait template
  transaction do |t|
    tuple = t.take_nowait template
    return nil if tuple.nil?
    yield tuple if block_given?
    tuple
  end
end

#trans_classObject



12
13
14
# File 'lib/tupelo/client/transaction.rb', line 12

def trans_class
  Transaction
end

#transaction(timeout: nil, &block) ⇒ Object

Transactions are atomic and isolated. Without a block, returns the Transaction. In the block form, transaction automatically waits for successful completion and returns the value of the block.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/tupelo/client/transaction.rb', line 19

def transaction timeout: nil, &block
  deadline = timeout && Time.now + timeout
  t = trans_class.new self, deadline: deadline
  return t unless block_given?

  val =
    if block.arity == 0
      t.instance_eval &block
    else
      yield t
    end

  t.commit.wait
  return val
rescue TransactionFailure => ex
  log.info {"retrying #{t.inspect}: #{ex}"}
  retry
rescue TransactionAbort
  log.info {"aborting #{t.inspect}"}
  :abort
ensure
  t.cancel if t and t.open? and block_given?
end

#use_subspaces!Object

call this just once at start of first client (it’s optional to preserve behavior of non-subspace-aware code)



15
16
17
18
19
20
21
22
23
# File 'lib/tupelo/client/subspace.rb', line 15

def use_subspaces!
  return if subspace(TUPELO_SUBSPACE_TAG)
  define_subspace(TUPELO_SUBSPACE_TAG, {
    TUPELO_META_KEY => "subspace",
    tag:        nil,
    template:   nil,
    addr:       nil
  })
end

#write_nowait(*tuples) ⇒ Object Also known as: write

returns an object whose #wait method waits for write to be ack-ed



48
49
50
51
52
# File 'lib/tupelo/client/transaction.rb', line 48

def write_nowait *tuples
  t = transaction
  t.write *tuples
  t.commit
end

#write_wait(*tuples) ⇒ Object

waits for write to be ack-ed



56
57
58
# File 'lib/tupelo/client/transaction.rb', line 56

def write_wait *tuples
  write_nowait(*tuples).wait
end