Class: Tap::Joins::Sync

Inherits:
Tap::Join
Defined in:
lib/tap/joins/sync.rb

Overview

:startdoc::join a synchronized multi-way join

Sync works the same as Join, but passes the collected results of the inputs (i.e. an array) to the outputs. The results are not passed until all of the inputs have returned. A collision, raised as a SynchronizeError, results if a single input completes twice before the group completes as a whole.

Defined Under Namespace

Classes: Callback, SynchronizeError

Constant Summary

NIL_VALUE =

NIL_VALUE is used to mark empty slots (nil itself cannot be used because it is a valid result value).

Object.new
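
To illustrate why a dedicated marker is needed, here is a minimal standalone sketch in plain Ruby (no Tap required). The names `empty_slot` and `slots` are hypothetical and stand in for NIL_VALUE and the results array; the sketch only assumes that the marker is a unique object, as `Object.new` above suggests.

```ruby
# Hypothetical stand-in for NIL_VALUE: a unique object no input returns by accident.
empty_slot = Object.new

# Two slots, both initially empty.
slots = Array.new(2, empty_slot)

# An input legitimately returns nil; that slot now counts as filled.
slots[0] = nil

first_filled  = slots[0] != empty_slot      # nil is a real result
second_empty  = slots[1] == empty_slot      # still waiting on the second input
batch_pending = slots.include?(empty_slot)  # the batch is not complete yet
```

Had nil been used as the marker, the first slot would be indistinguishable from one that was never set.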

Instance Attribute Summary

Attributes inherited from Tap::Join

#app, #inputs, #outputs

Instance Method Summary

Methods inherited from Tap::Join

inherited, instantiate, intern, parse, parse!

Constructor Details

#initialize(config = {}, app = Tap::App.instance) ⇒ Sync

Returns a new instance of Sync.



# File 'lib/tap/joins/sync.rb', line 20

def initialize(config={}, app=Tap::App.instance)
  super
  @results = nil
end

Instance Attribute Details

#results ⇒ Object (readonly)

An array holding results until the batch is ready to execute.



# File 'lib/tap/joins/sync.rb', line 18

def results
  @results
end

Instance Method Details

#call(result, index) ⇒ Object

Call is called by a Callback and stores the result at the specified index in results. If the results have all been set, then they are sent to each output.



# File 'lib/tap/joins/sync.rb', line 55

def call(result, index)
  if result == NIL_VALUE
    raise "NIL_VALUE cannot be passed as a result"
  end
  
  unless results[index] == NIL_VALUE
    raise SynchronizeError, "already got a result for: #{inputs[index]}"
  end
  results[index] = result
  
  unless results.include?(NIL_VALUE)
    outputs.each {|output| dispatch(output, results) }
    reset
  end
end
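
The slot-filling behavior described above can be tried without Tap using the following sketch. `MiniSync`, `EMPTY`, and `CollisionError` are hypothetical stand-ins (for Sync, NIL_VALUE, and SynchronizeError respectively), and outputs are modeled as plain lambdas rather than dispatched through an app, so this illustrates the logic only and is not the library's implementation.

```ruby
# Hypothetical stand-in for the synchronized join; not the Tap class.
class MiniSync
  EMPTY = Object.new                         # plays the role of NIL_VALUE
  class CollisionError < StandardError; end  # plays the role of SynchronizeError

  def initialize(input_count, outputs)
    @outputs = outputs
    @slots = Array.new(input_count, EMPTY)
  end

  def call(result, index)
    # An input that reports twice before the batch completes is a collision.
    unless @slots[index].equal?(EMPTY)
      raise CollisionError, "already got a result for input #{index}"
    end
    @slots[index] = result

    # Once every slot is filled, hand the whole array to each output.
    unless @slots.include?(EMPTY)
      batch = @slots
      @slots = Array.new(batch.length, EMPTY)
      @outputs.each { |output| output.call(batch) }
    end
  end
end

received = []
sync = MiniSync.new(2, [->(batch) { received << batch }])

sync.call(:b, 1)   # results may arrive in any order
sync.call(:a, 0)   # last slot filled, so the batch is passed on

collided = false
begin
  sync.call(:x, 0)
  sync.call(:y, 0) # input 0 completes twice before input 1 reports
rescue MiniSync::CollisionError
  collided = true
end
```

After the first two calls, `received` holds a single batch ordered by input index rather than by arrival order.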

#join(inputs, outputs) ⇒ Object

A synchronized join sets a Callback as the join of each input. The callback is responsible for setting the result of each input into the correct ‘results’ slot.



# File 'lib/tap/joins/sync.rb', line 34

def join(inputs, outputs)
  @inputs.each do |input|
    input.joins.delete(self)
  end if @inputs

  @inputs = inputs

  index = 0
  inputs.each do |input|
    input.joins << Callback.new(self, index)
    index += 1
  end if inputs
  reset
  
  @outputs = outputs
  self
end
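
The Callback class is not shown on this page, so the sketch below is an assumption about its shape based only on the description above: an object that remembers its join and slot index and forwards each result to `call(result, index)`. `SlotCallback` and `RecordingJoin` are hypothetical names used so the example runs without Tap; the real Tap::Joins::Sync::Callback may differ.

```ruby
# Hypothetical sketch of a per-input callback; mirrors the role described for Callback.
class SlotCallback
  def initialize(join, index)
    @join = join
    @index = index
  end

  # Forward one input's result together with the slot it belongs to.
  def call(result)
    @join.call(result, @index)
  end
end

# A recording stand-in for the join, used only to observe what the callbacks send.
class RecordingJoin
  attr_reader :calls

  def initialize
    @calls = []
  end

  def call(result, index)
    @calls << [index, result]
  end
end

join = RecordingJoin.new

# One callback per input, each bound to its own index (as the loop in #join does).
callbacks = Array.new(3) { |index| SlotCallback.new(join, index) }

callbacks[2].call(:late)
callbacks[0].call(:early)
```

Because each callback carries its own index, a result reaches the right slot regardless of the order in which the inputs finish.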

#reset ⇒ Object

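Resets results to an array of empty slots, one per input. Normally there is no reason to call this method: resetting while a batch is only partly collected discards the results gathered so far, which shuffles the arguments being passed through self.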



# File 'lib/tap/joins/sync.rb', line 27

def reset
  @results = inputs ? Array.new(inputs.length, NIL_VALUE) : nil
end
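
As a rough illustration of the shuffling effect, the standalone sketch below rebuilds the slot array by hand in the middle of a batch. The names `sentinel` and `inputs` and the string values are hypothetical; only the `Array.new(length, marker)` pattern is taken from the method above.

```ruby
sentinel = Object.new   # hypothetical stand-in for NIL_VALUE
inputs = [:a, :b, :c]   # hypothetical inputs; only the length matters here

results = Array.new(inputs.length, sentinel)
results[0] = "round 1, input a"

# A manual reset in the middle of the batch throws away the partial results.
results = Array.new(inputs.length, sentinel)

# Later results land in the fresh array and pair up with whatever arrives next.
results[1] = "round 1, input b"
results[0] = "round 2, input a"

pending = results.count { |slot| slot.equal?(sentinel) }
```

The array now mixes results from two different rounds, which is the kind of mismatch the warning refers to.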