Class: OFlow::Actors::Merger
- Inherits: OFlow::Actor
  - Object
  - OFlow::Actor
  - OFlow::Actors::Merger
- Defined in: lib/oflow/actors/merger.rb
Instance Attribute Summary
Attributes inherited from OFlow::Actor
Instance Method Summary
- #box_key(box) ⇒ Object
- #initialize(task, options) ⇒ Merger (constructor)
  A new instance of Merger.
- #match(target) ⇒ Object
  Should look at all the waiting boxes and determine which of those, if any, are a match for the target.
- #merge(boxes) ⇒ Object
  Should merge the boxes and return a single box.
- #perform(op, box) ⇒ Object
- #waiting_add(box) ⇒ Object
- #waiting_remove(box) ⇒ Object
Methods inherited from OFlow::Actor
#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ Merger
Returns a new instance of Merger.
    # File 'lib/oflow/actors/merger.rb', line 7

    def initialize(task, options)
      super
      @match_key = options.fetch(:match, :tracker)
      @cnt = options.fetch(:count, 2)
      # Hash of Arrays
      @waiting = {}
    end
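The constructor reads two options, each with a default: `:match` (defaults to `:tracker`) and `:count` (defaults to 2). The sketch below shows how those defaults resolve; `merger_settings` is a hypothetical helper written for illustration and is not part of OFlow.

```ruby
# Hypothetical helper mirroring how the constructor reads its options.
def merger_settings(options)
  {
    match_key: options.fetch(:match, :tracker), # value used to group boxes
    count: options.fetch(:count, 2)             # boxes needed before a merge
  }
end

p merger_settings({})
p merger_settings({ match: :order_id, count: 3 })
```

`Hash#fetch` with a second argument returns that argument only when the key is absent, so an explicit `match: nil` is kept as nil rather than replaced by `:tracker`.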
Instance Method Details
#box_key(box) ⇒ Object
    # File 'lib/oflow/actors/merger.rb', line 27

    def box_key(box)
      key = nil
      if :tracker == @match_key
        key = box.tracker.id unless box.tracker.nil?
      elsif !@match_key.nil?
        key = box.get(@match_key)
      end
      key
    end
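The key selection above has three outcomes: the tracker id, a value looked up in the box, or nil. A minimal sketch of that branching follows. `KeyTracker`, `KeyBox`, and `demo_box_key` are stand-ins invented for this example, and the `get` shown here is a simplified Hash lookup that may not match the real `OFlow::Box#get`.

```ruby
# Stand-ins for illustration only; the real OFlow classes differ.
KeyTracker = Struct.new(:id)
KeyBox = Struct.new(:contents, :tracker) do
  # Simplified lookup: assumes contents is a Hash.
  def get(path)
    contents[path]
  end
end

# Same branching as Merger#box_key, with the match key passed in explicitly.
def demo_box_key(box, match_key)
  if match_key == :tracker
    box.tracker&.id          # nil when the box has no tracker
  elsif !match_key.nil?
    box.get(match_key)       # group by a value inside the box
  end                        # a nil match key yields a nil key
end

tracked = KeyBox.new({ order: 7 }, KeyTracker.new('t-1'))
p demo_box_key(tracked, :tracker)
p demo_box_key(tracked, :order)
```

Boxes that produce a nil key all share the same waiting list, since nil is a valid Hash key.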
#match(target) ⇒ Object
Should look at all the waiting boxes and determine which of those, if any, are a match for the target. If the waiting boxes with the same key plus the target reach the required count, an array of those waiting boxes is returned; otherwise nil is returned.
    # File 'lib/oflow/actors/merger.rb', line 57

    def match(target)
      key = box_key(target)
      boxes = @waiting[key]
      return nil if boxes.nil? || (boxes.size + 1) < @cnt
      Array.new(boxes)
    end
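The count check can be exercised on its own. The sketch below is a free-standing version of the same test; `demo_match` is a hypothetical function, and symbols stand in for boxes.

```ruby
# Hypothetical free-standing version of the check in Merger#match.
# waiting: Hash of key => Array of boxes already held
# count:   total boxes required for a merge (the :count option)
def demo_match(waiting, key, count)
  boxes = waiting[key]
  # The incoming box counts as one, hence the + 1.
  return nil if boxes.nil? || (boxes.size + 1) < count
  Array.new(boxes) # a copy, so later removals from waiting leave it intact
end

waiting = { 'k1' => [:first] }
p demo_match(waiting, 'k1', 2)
p demo_match(waiting, 'k1', 3)
p demo_match(waiting, 'k2', 2)
```

Returning a copy matters because `perform` removes each matched box from the waiting list while iterating over the result.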
#merge(boxes) ⇒ Object
Should merge the boxes and return a single box. The default places all the box contents in an Array and merges the Trackers, if there are any.
    # File 'lib/oflow/actors/merger.rb', line 67

    def merge(boxes)
      content = []
      tracker = nil
      boxes.each do |b|
        content << b.contents
        unless b.tracker.nil?
          if tracker.nil?
            tracker = b.tracker
          else
            tracker = tracker.merge(b.tracker)
          end
        end
      end
      Box.new(content, tracker)
    end
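The default merge can be illustrated with stand-in classes. `MergeTracker` and `MergeBox` below are invented for this sketch; in particular, the way `MergeTracker#merge` combines ids is an assumption and may not reflect how real OFlow trackers merge.

```ruby
# Stand-ins for illustration; real trackers carry more state.
MergeTracker = Struct.new(:ids) do
  # Assumed behavior: combine the ids of both trackers.
  def merge(other)
    MergeTracker.new((ids + other.ids).uniq)
  end
end
MergeBox = Struct.new(:contents, :tracker)

# Collect every box's contents into one Array and fold the non-nil
# trackers together, as the default Merger#merge does.
def demo_merge(boxes)
  contents = boxes.map(&:contents)
  tracker = boxes.map(&:tracker).compact.reduce { |acc, t| acc.merge(t) }
  MergeBox.new(contents, tracker)
end

merged = demo_merge([
  MergeBox.new(1, MergeTracker.new(['a'])),
  MergeBox.new(2, nil),
  MergeBox.new(3, MergeTracker.new(['b']))
])
p merged.contents
p merged.tracker.ids
```

A subclass that needs a different result shape, such as a Hash keyed by source, would override `merge` rather than `perform`.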
#perform(op, box) ⇒ Object
    # File 'lib/oflow/actors/merger.rb', line 15

    def perform(op, box)
      matches = match(box)
      if matches.nil?
        waiting_add(box)
      else
        matches.each { |b| waiting_remove(b) }
        matches << box
        box = merge(matches)
        task.ship(nil, box)
      end
    end
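The hold-or-ship flow of `perform` can be modeled in a few lines. `DemoMerger` below is a toy written for this page, not the OFlow class: boxes are plain Hashes, "shipping" appends to an Array instead of calling `task.ship`, and, unlike the source above, the toy deletes a key from the waiting Hash once its boxes are merged.

```ruby
# Toy model of the perform flow; none of this is OFlow API.
class DemoMerger
  attr_reader :shipped

  def initialize(count = 2)
    @count = count
    @waiting = Hash.new { |hash, key| hash[key] = [] }
    @shipped = []
  end

  def perform(box)
    key = box[:key]
    held = @waiting[key]
    if held.size + 1 < @count
      held << box                          # not enough yet: keep waiting
    else
      group = @waiting.delete(key) << box  # waiting boxes plus the new one
      @shipped << group.map { |b| b[:value] }
    end
  end
end

merger = DemoMerger.new(2)
merger.perform({ key: 1, value: 'left' })
merger.perform({ key: 2, value: 'other' })
merger.perform({ key: 1, value: 'right' })
p merger.shipped
```

Boxes with key 1 are merged and shipped as soon as the second one arrives, while the box with key 2 stays in the waiting list.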
#waiting_add(box) ⇒ Object
    # File 'lib/oflow/actors/merger.rb', line 37

    def waiting_add(box)
      key = box_key(box)
      boxes = @waiting[key]
      if boxes.nil?
        @waiting[key] = [box]
      else
        boxes << box
      end
    end
#waiting_remove(box) ⇒ Object
    # File 'lib/oflow/actors/merger.rb', line 47

    def waiting_remove(box)
      key = box_key(box)
      boxes = @waiting[key]
      # only remove the box, not a similar one or one that is ==
      boxes.delete_if { |b| box.equal?(b) }
    end
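The comment in `waiting_remove` depends on the difference between identity (`equal?`) and value equality (`==`). A short sketch with plain Strings in place of boxes; `remove_exact` is a hypothetical helper.

```ruby
# Remove only the very same object, as waiting_remove does.
def remove_exact(list, target)
  list.delete_if { |item| item.equal?(target) }
end

a = 'payload'.dup
b = 'payload'.dup # == a, but a different object

by_identity = remove_exact([a, b], a)
by_value = [a, b]
by_value.delete(a) # Array#delete compares with ==

p by_identity.size
p by_value.size
```

With `equal?`, one element remains after removal. `Array#delete` removes both, because the two strings compare equal by value.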