Module: Tupelo::Client::Api
- Included in:
- Tupelo::Client
- Defined in:
- lib/tupelo/client/reader.rb,
lib/tupelo/client/subspace.rb,
lib/tupelo/client/transaction.rb
Overview
Include into class that defines #worker and #log.
Instance Method Summary collapse
- #abort ⇒ Object
- #define_subspace(tag, template, addr: nil) ⇒ Object
- #notifier ⇒ Object
- #pulse_nowait(*tuples) ⇒ Object (also: #pulse)
- #pulse_wait(*tuples) ⇒ Object
-
#read_all(template = Object) ⇒ Object
Returns all matching tuples currently in the space.
-
#read_nowait(template = Object) ⇒ Object
The template defaults to Object, which matches any tuple.
-
#read_wait(template = Object) ⇒ Object
(also: #read)
If no block given, return one matching tuple, blocking if necessary.
- #subspace(tag) ⇒ Object
- #take(template, timeout: nil) ⇒ Object
- #take_nowait(template) ⇒ Object
- #trans_class ⇒ Object
-
#transaction(timeout: nil, &block) ⇒ Object
Transactions are atomic and isolated.
-
#use_subspaces! ⇒ Object
call this just once at start of first client (it’s optional to preserve behavior of non-subspace-aware code).
-
#write_nowait(*tuples) ⇒ Object
(also: #write)
returns an object whose #wait method waits for write to be ack-ed.
-
#write_wait(*tuples) ⇒ Object
waits for write to be ack-ed.
Instance Method Details
#abort ⇒ Object
43 44 45 |
# File 'lib/tupelo/client/transaction.rb', line 43 def abort raise TransactionAbort end |
#define_subspace(tag, template, addr: nil) ⇒ Object
3 4 5 6 7 8 9 10 11 |
# File 'lib/tupelo/client/subspace.rb', line 3 def define_subspace tag, template, addr: nil = { TUPELO_META_KEY => "subspace", tag: tag, template: PortableObjectTemplate.spec_from(template), addr: addr } write_wait end |
#notifier ⇒ Object
51 52 53 |
# File 'lib/tupelo/client/reader.rb', line 51 def notifier NotifyWaiter.new(self).tap {|n| n.toggle} end |
#pulse_nowait(*tuples) ⇒ Object Also known as: pulse
60 61 62 63 64 |
# File 'lib/tupelo/client/transaction.rb', line 60 def pulse_nowait *tuples t = transaction t.pulse *tuples t.commit end |
#pulse_wait(*tuples) ⇒ Object
67 68 69 |
# File 'lib/tupelo/client/transaction.rb', line 67 def pulse_wait *tuples pulse_nowait(*tuples).wait end |
#read_all(template = Object) ⇒ Object
Returns all matching tuples currently in the space. The template defaults to Object, which matches any tuple. Does not wait for more tuples to arrive.
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/tupelo/client/reader.rb', line 40 def read_all template = Object matcher = Matcher.new(worker.make_template(template), self, :all => true) worker << matcher a = [] while tuple = matcher.wait ## inefficient? yield tuple if block_given? a << tuple end a end |
#read_nowait(template = Object) ⇒ Object
The template defaults to Object, which matches any tuple.
31 32 33 34 35 |
# File 'lib/tupelo/client/reader.rb', line 31 def read_nowait template = Object matcher = Matcher.new(worker.make_template(template), self) worker << matcher matcher.wait end |
#read_wait(template = Object) ⇒ Object Also known as: read
If no block given, return one matching tuple, blocking if necessary. If block given, yield each matching tuple that is found locally and then yield each new match as it is written to the space. Guaranteed not to miss tuples, even if they arrive and are immediately taken. (Note that simply doing read(template) in a loop would not have this guarantee.) The template defaults to Object, which matches any tuple.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/tupelo/client/reader.rb', line 13 def read_wait template = Object waiter = Waiter.new(worker.make_template(template), self, !block_given?) worker << waiter if block_given? loop do yield waiter.wait end else result = waiter.wait waiter = nil result end ensure worker << Unwaiter.new(waiter) if waiter end |
#subspace(tag) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/tupelo/client/subspace.rb', line 25 def subspace tag tag = tag.to_s worker.subspaces.find {|sp| sp.tag == tag} or begin if .include? tag read TUPELO_META_KEY => "subspace", tag: tag, template: nil, addr: nil worker.subspaces.find {|sp| sp.tag == tag} end end ## this impl will not be safe with dynamic subspaces end |
#take(template, timeout: nil) ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/tupelo/client/transaction.rb', line 71 def take template, timeout: nil transaction timeout: timeout do |t| tuple = t.take template if block_given? yield tuple else tuple end end end |
#take_nowait(template) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/tupelo/client/transaction.rb', line 82 def take_nowait template transaction do |t| tuple = t.take_nowait template return nil if tuple.nil? yield tuple if block_given? tuple end end |
#trans_class ⇒ Object
12 13 14 |
# File 'lib/tupelo/client/transaction.rb', line 12 def trans_class Transaction end |
#transaction(timeout: nil, &block) ⇒ Object
Transactions are atomic and isolated. Without a block, returns the Transaction. In the block form, transaction automatically waits for successful completion and returns the value of the block.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/tupelo/client/transaction.rb', line 19 def transaction timeout: nil, &block deadline = timeout && Time.now + timeout t = trans_class.new self, deadline: deadline return t unless block_given? val = if block.arity == 0 t.instance_eval &block else yield t end t.commit.wait return val rescue TransactionFailure => ex log.info {"retrying #{t.inspect}: #{ex}"} retry rescue TransactionAbort log.info {"aborting #{t.inspect}"} :abort ensure t.cancel if t and t.open? and block_given? end |
#use_subspaces! ⇒ Object
call this just once at start of first client (it’s optional to preserve behavior of non-subspace-aware code)
15 16 17 18 19 20 21 22 23 |
# File 'lib/tupelo/client/subspace.rb', line 15 def use_subspaces! return if subspace(TUPELO_SUBSPACE_TAG) define_subspace(TUPELO_SUBSPACE_TAG, { TUPELO_META_KEY => "subspace", tag: nil, template: nil, addr: nil }) end |
#write_nowait(*tuples) ⇒ Object Also known as: write
returns an object whose #wait method waits for write to be ack-ed
48 49 50 51 52 |
# File 'lib/tupelo/client/transaction.rb', line 48 def write_nowait *tuples t = transaction t.write *tuples t.commit end |
#write_wait(*tuples) ⇒ Object
waits for write to be ack-ed
56 57 58 |
# File 'lib/tupelo/client/transaction.rb', line 56 def write_wait *tuples write_nowait(*tuples).wait end |