Class: Rx::ActivePlan
- Inherits:
-
Object
- Object
- Rx::ActivePlan
- Defined in:
- lib/rx/joins/active_plan.rb
Instance Method Summary collapse
- #dequeue ⇒ Object
-
#initialize(join_observer_array, on_next, on_completed) ⇒ ActivePlan
constructor
A new instance of ActivePlan.
- #match ⇒ Object
Constructor Details
#initialize(join_observer_array, on_next, on_completed) ⇒ ActivePlan
Returns a new instance of ActivePlan.
3 4 5 6 7 8 9 10 11 |
# File 'lib/rx/joins/active_plan.rb', line 3 def initialize(join_observer_array, on_next, on_completed) @join_observer_array = join_observer_array @on_next = on_next @on_completed = on_completed @join_observers = {} @join_observer_array.each {|x| @join_observers[x] = x } end |
Instance Method Details
#dequeue ⇒ Object
13 14 15 |
# File 'lib/rx/joins/active_plan.rb', line 13 def dequeue @join_observers.each {|_, v| v.queue.shift } end |
#match ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rx/joins/active_plan.rb', line 17 def match has_values = true @join_observer_array.each {|v| if v.queue.length == 0 has_values = false break end } if has_values first_values = [] is_completed = false @join_observer_array.each {|v| first_values.push v.queue[0] is_completed = true if v.queue[0].on_completed? } if is_completed @on_completed.call else dequeue values = [] first_values.each {|v| values.push v.value } @on_next.call(*values) end end end |