Class: Rx::Plan
- Inherits:
- Object
- Inheritance hierarchy: Object → Rx::Plan
- Defined in:
- lib/rx/joins/plan.rb
Instance Method Summary
- #activate(external_subscriptions, observer, deactivate) ⇒ Object
- #initialize(expression, selector) ⇒ Plan (constructor): returns a new instance of Plan.
- #plan_create_observer(external_subscriptions, observable, on_error) ⇒ Object
Constructor Details
#initialize(expression, selector) ⇒ Plan
Returns a new instance of Plan.
3 4 5 6 |
# File 'lib/rx/joins/plan.rb', line 3
# Builds a plan from the two collaborators it needs later.
#
# @param expression [#patterns] stored in @expression; #activate iterates its patterns
# @param selector [#call] stored in @selector; #activate calls it with the matched values
def initialize(expression, selector)
  @expression, @selector = expression, selector
end
Instance Method Details
#activate(external_subscriptions, observer, deactivate) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/rx/joins/plan.rb', line 8
# Creates an ActivePlan backed by one join observer per pattern of the
# expression, and registers that plan with each of those join observers.
#
# @param external_subscriptions [#[], #[]=] passed through to #plan_create_observer
# @param observer [#on_next, #on_error] receives the selector's result, or the
#   error the selector raised (never both for the same invocation)
# @param deactivate [#call] called with the active plan by the plan's teardown callback
# @return [ActivePlan] the newly created plan
def activate(external_subscriptions, observer, deactivate)
  join_observers = @expression.patterns.map do |pattern|
    plan_create_observer(external_subscriptions, pattern, observer.method(:on_error))
  end

  # Declared before the lambdas so the teardown lambda closes over this local
  # and sees the plan once it has been assigned below.
  active_plan = nil

  on_match = lambda do |*args|
    begin
      result = @selector.call(*args)
    rescue => e
      observer.on_error e
      # Stop here: falling through would also emit on_next with a nil result
      # right after the error was reported.
      next
    end
    observer.on_next result
  end

  on_teardown = lambda do
    join_observers.each { |join_observer| join_observer.remove_active_plan(active_plan) }
    deactivate.call(active_plan)
  end

  active_plan = ActivePlan.new(join_observers, on_match, on_teardown)
  join_observers.each { |join_observer| join_observer.add_active_plan(active_plan) }
  active_plan
end
#plan_create_observer(external_subscriptions, observable, on_error) ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/rx/joins/plan.rb', line 34
# Returns the join observer stored for +observable+, creating and storing a
# new JoinObserver when none is present yet.
#
# @param external_subscriptions [#[], #[]=] lookup from observable to its join observer
# @param observable [Object] key to look up; also handed to JoinObserver.new
# @param on_error [Object] handed to JoinObserver.new when a new observer is built
# @return [Object] the existing entry, or the freshly created JoinObserver
def plan_create_observer(external_subscriptions, observable, on_error)
  external_subscriptions[observable] ||= JoinObserver.new(observable, on_error)
end