Class: Rx::ActivePlan

Inherits:
Object
  • Object
show all
Defined in:
lib/rx/joins/active_plan.rb

Instance Method Summary collapse

Constructor Details

#initialize(join_observer_array, on_next, on_completed) ⇒ ActivePlan

Returns a new instance of ActivePlan.



3
4
5
6
7
8
9
10
11
# File 'lib/rx/joins/active_plan.rb', line 3

def initialize(join_observer_array, on_next, on_completed)
  @join_observer_array = join_observer_array
  @on_next = on_next
  @on_completed = on_completed
  @join_observers = {}
  @join_observer_array.each {|x|
    @join_observers[x] = x
  }
end

Instance Method Details

#dequeueObject



13
14
15
# File 'lib/rx/joins/active_plan.rb', line 13

def dequeue
  @join_observers.each {|_, v| v.queue.shift }
end

#matchObject



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/rx/joins/active_plan.rb', line 17

def match
  has_values = true
  @join_observer_array.each {|v|
    if v.queue.length == 0
      has_values = false
      break
    end
  }
  if has_values
    first_values = []
    is_completed = false
    @join_observer_array.each {|v|
      first_values.push v.queue[0]
      is_completed = true if v.queue[0].on_completed?
    }
    if is_completed
      @on_completed.call
    else
      dequeue
      values = []
      first_values.each {|v|
        values.push v.value
      }
      @on_next.call(*values)
    end
  end
end