Class: OFlow::Actors::Merger

Inherits:
OFlow::Actor show all
Defined in:
lib/oflow/actors/merger.rb

Instance Attribute Summary

Attributes inherited from OFlow::Actor

#task

Instance Method Summary collapse

Methods inherited from OFlow::Actor

#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread

Constructor Details

#initialize(task, options) ⇒ Merger

Returns a new instance of Merger.



7
8
9
10
11
12
13
# File 'lib/oflow/actors/merger.rb', line 7

def initialize(task, options)
  super
  @match_key = options.fetch(:match, :tracker)
  @cnt = options.fetch(:count, 2)
  # Hash of Arrays
  @waiting = {}
end

Instance Method Details

#box_key(box) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/oflow/actors/merger.rb', line 27

def box_key(box)
  key = nil
  if :tracker == @match_key
    key = box.tracker.id unless box.tracker.nil?
  elsif !@match_key.nil?
    key = box.get(@match_key)
  end
  key
end

#match(target) ⇒ Object

Should look at all the waiting boxes and determine which of those if any are a match for the target. If all necessary matches are found then an array of the boxes are returned, otherwise nil is returned.



57
58
59
60
61
62
# File 'lib/oflow/actors/merger.rb', line 57

def match(target)
  key = box_key(target)
  boxes = @waiting[key]
  return nil if boxes.nil? || (boxes.size + 1) < @cnt  
  Array.new(boxes)
end

#merge(boxes) ⇒ Object

Should merge the boxes and return a single box. The default is to take all the box contents and place them in an Array and then merge the Trackers if there are any.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/oflow/actors/merger.rb', line 67

def merge(boxes)
  content = []
  tracker = nil
  boxes.each do |b|
    content << b.contents
    unless b.tracker.nil?
      if tracker.nil?
        tracker = b.tracker
      else
        tracker = tracker.merge(b.tracker)
      end
    end
  end
  Box.new(content, tracker)
end

#perform(op, box) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/oflow/actors/merger.rb', line 15

def perform(op, box)
  matches = match(box)
  if matches.nil?
    waiting_add(box)
  else
    matches.each { |b| waiting_remove(b) }
    matches << box
    box = merge(matches)
    task.ship(nil, box)
  end
end

#waiting_add(box) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/oflow/actors/merger.rb', line 37

def waiting_add(box)
  key = box_key(box)
  boxes = @waiting[key]
  if boxes.nil?
    @waiting[key] = [box]
  else
    boxes << box
  end
end

#waiting_remove(box) ⇒ Object



47
48
49
50
51
52
# File 'lib/oflow/actors/merger.rb', line 47

def waiting_remove(box)
  key = box_key(box)
  boxes = @waiting[key]
  # only remove the box, not a similar one or one that is ==
  boxes.delete_if { |b| box.equal?(b) }
end