Class: Rx::Plan

Inherits:
Object
  • Object
show all
Defined in:
lib/rx/joins/plan.rb

Instance Method Summary collapse

Constructor Details

#initialize(expression, selector) ⇒ Plan

Returns a new instance of Plan.



3
4
5
6
# File 'lib/rx/joins/plan.rb', line 3

def initialize(expression, selector)
  @expression = expression
  @selector = selector
end

Instance Method Details

#activate(external_subscriptions, observer, deactivate) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/rx/joins/plan.rb', line 8

def activate(external_subscriptions, observer, deactivate)
  join_observers = []
  @expression.patterns.each {|pat|
    join_observers.push plan_create_observer(external_subscriptions, pat, observer.method(:on_error))
  }

  active_plan = ActivePlan.new(join_observers, lambda {|*args|
    begin
      result = @selector.call(*args)
    rescue => e
      observer.on_error e
    end
    observer.on_next result
  },
  lambda {
    join_observers.each {|v|
      v.remove_active_plan(active_plan)
    }
    deactivate.call(active_plan)
  })
  join_observers.each {|v|
    v.add_active_plan(active_plan)
  }
  return active_plan
end

#plan_create_observer(external_subscriptions, observable, on_error) ⇒ Object



34
35
36
37
38
39
40
41
42
# File 'lib/rx/joins/plan.rb', line 34

def plan_create_observer(external_subscriptions, observable, on_error)
  entry = external_subscriptions[observable]
  if !entry
    observer = JoinObserver.new(observable, on_error)
    external_subscriptions[observable] = observer
    return observer
  end
  entry
end