Class: Tap::Joins::Sync
Overview
:startdoc::join a synchronized multi-way join
Sync works the same as Join, but passes the collected results of the inputs (ie an array) to the outputs. The results will not be passed until all of inputs have returned. A collision results if a single input completes twice before the group completes as a whole.
Defined Under Namespace
Classes: Callback, SynchronizeError
Constant Summary collapse
- NIL_VALUE =
NIL_VALUE is used to mark empty slots (nil itself cannot be used because it is a valid result value).
Object.new
Instance Attribute Summary collapse
-
#results ⇒ Object
readonly
An array holding results until the batch is ready to execute.
Attributes inherited from Tap::Join
Instance Method Summary collapse
-
#call(result, index) ⇒ Object
Call is called by a Callback and stores the result at the specified index in results.
-
#initialize(config = {}, app = Tap::App.instance) ⇒ Sync
constructor
A new instance of Sync.
-
#join(inputs, outputs) ⇒ Object
A synchronized join sets a Callback as the join of each input.
-
#reset ⇒ Object
Resets results.
Methods inherited from Tap::Join
inherited, instantiate, intern, parse, parse!
Constructor Details
#initialize(config = {}, app = Tap::App.instance) ⇒ Sync
Returns a new instance of Sync.
20 21 22 23 |
# File 'lib/tap/joins/sync.rb', line 20 def initialize(config={}, app=Tap::App.instance) super @results = nil end |
Instance Attribute Details
#results ⇒ Object (readonly)
An array holding results until the batch is ready to execute.
18 19 20 |
# File 'lib/tap/joins/sync.rb', line 18 def results @results end |
Instance Method Details
#call(result, index) ⇒ Object
Call is called by a Callback and stores the result at the specified index in results. If the results have all been set, then they are sent to each output.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/tap/joins/sync.rb', line 55 def call(result, index) if result == NIL_VALUE raise "NIL_VALUE cannot be passed as a result" end unless results[index] == NIL_VALUE raise SynchronizeError, "already got a result for: #{inputs[index]}" end results[index] = result unless results.include?(NIL_VALUE) outputs.each {|output| dispatch(output, results) } reset end end |
#join(inputs, outputs) ⇒ Object
A synchronized join sets a Callback as the join of each input. The callback is responsible for setting the result of each input into the correct ‘results’ slot.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/tap/joins/sync.rb', line 34 def join(inputs, outputs) @inputs.each do |input| input.joins.delete(self) end if @inputs @inputs = inputs index = 0 inputs.each do |input| input.joins << Callback.new(self, index) index += 1 end if inputs reset @outputs = outputs self end |
#reset ⇒ Object
Resets results. Normally there is no reason to call this method as it will shuffle the arguments being passed through self.
27 28 29 |
# File 'lib/tap/joins/sync.rb', line 27 def reset @results = inputs ? Array.new(inputs.length, NIL_VALUE) : nil end |