Class: Tap::Support::Joins::SyncMerge

Inherits:
ReverseJoin show all
Defined in:
lib/tap/support/joins/sync_merge.rb

Overview

SyncMerge passes the collected results of the sources to the target. The results are not passed until results from all of the sources are available, at which point they are passed together as one group. A collision error is raised if a single source completes a second time before its group is complete.

Constant Summary

Constants inherited from Tap::Support::Join

Tap::Support::Join::FLAGS, Tap::Support::Join::SHORT_FLAGS

Instance Method Summary collapse

Methods inherited from ReverseJoin

#inspect, join

Methods inherited from Tap::Support::Join

#initialize, #inspect, join, #name, #options

Constructor Details

This class inherits a constructor from Tap::Support::Join

Instance Method Details

#join(target, sources) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/tap/support/joins/sync_merge.rb', line 12

# Wires a completion callback onto every source so that results are
# gathered per combination and handed to the target only once every
# slot of that combination holds a result.
#
# target::  passed through, unchanged, as the first argument to enq
# sources:: the collection of sources to synchronize; each source must
#           respond to batch unless unbatched is set
#
# If a block is given it is yielded each completed group of results
# just before that group is passed to enq. Raises a RuntimeError when
# a source delivers a second result before its group has completed.
def join(target, sources)

  # source => position of that source within a combination
  # (the first position recorded for a source is the one kept)
  positions = {}

  # source => list of the slot arrays (one per combination) that the
  # source fills. in unbatched mode a source might not appear in any
  # combination at all.
  slots_by_source = {}

  sets = sources.map { |source| unbatched ? [source] : source.batch }
  Support::Combinator.new(*sets).each do |combo|
    # one empty slot per member of the combination
    slots = Array.new(combo.length)

    combo.each do |member|
      positions[member] ||= combo.index(member)
      slots_by_source[member] ||= []
      slots_by_source[member] << slots
    end
  end

  sources.each do |source|
    complete(source) do |result|
      origin = result.key
      position = positions[origin]
      pending = (slots_by_source[origin] ||= [])

      pending.each do |slots|
        # a filled slot means this source already reported for the group
        raise "sync_merge collision... already got a result for #{origin}" unless slots[position].nil?

        slots[position] = result

        # keep waiting while any other member of the group is outstanding
        next if slots.include?(nil)

        yield(*slots) if block_given?
        enq(target, *slots)

        # clear the slots in place so the same array collects the next group
        slots.fill(nil)
      end
    end
  end
end