Class: Rx::JoinObserver
- Inherits: ObserverBase
  - Object
  - ObserverBase
  - Rx::JoinObserver
- Defined in: lib/rx/joins/join_observer.rb
Instance Attribute Summary
- #queue ⇒ Object (readonly): Returns the value of attribute queue.
Instance Method Summary
- #add_active_plan(active_plan) ⇒ Object
- #initialize(source, on_error) ⇒ JoinObserver (constructor): A new instance of JoinObserver.
- #remove_active_plan(active_plan) ⇒ Object
- #subscribe ⇒ Object
- #unsubscribe ⇒ Object
Methods inherited from ObserverBase
#dispose, #fail, #on_completed, #on_error, #on_next
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Constructor Details
#initialize(source, on_error) ⇒ JoinObserver
Returns a new instance of JoinObserver.
    # File 'lib/rx/joins/join_observer.rb', line 5
    def initialize(source, on_error)
      super Observer.configure {|o|
        o.on_next {|notification|
          if !@is_disposed
            if notification.on_error?
              @on_error.call(notification.exception)
              next
            end

            @queue.push notification
            @active_plans.dup.each {|v| v.match }
          end
        }
      }

      @source = source
      @on_error = on_error
      @queue = []
      @active_plans = []
      @subscription = SingleAssignmentSubscription.new
      @is_disposed = false
    end
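The on_next block configured in the constructor does three things: it ignores notifications once the observer is disposed, routes error notifications to the on_error callable, and otherwise queues the notification and asks every active plan to match. The following is a minimal plain-Ruby sketch of that flow, not the library's implementation. FakeNotification, RecordingPlan, SketchJoinObserver and its handle method are hypothetical stand-ins, chosen so the example runs without the rx gem.

```ruby
# Hypothetical stand-ins; none of these are the real Rx classes.
FakeNotification = Struct.new(:kind, :value, :exception) do
  def on_error?
    kind == :error
  end
end

# Counts how often it is asked to match. A real active plan would
# inspect the queues of its join observers at this point.
class RecordingPlan
  attr_reader :match_count

  def initialize
    @match_count = 0
  end

  def match
    @match_count += 1
  end
end

class SketchJoinObserver
  attr_reader :queue

  def initialize(on_error)
    @on_error = on_error
    @queue = []
    @active_plans = []
    @is_disposed = false
  end

  def add_active_plan(plan)
    @active_plans.push plan
  end

  # Models the on_next block; `return` plays the role of `next` here
  # because this sketch uses a method instead of a block.
  def handle(notification)
    return if @is_disposed

    if notification.on_error?
      @on_error.call(notification.exception)
      return
    end

    @queue.push notification
    # Iterate over a copy so a plan can be removed while matching.
    @active_plans.dup.each { |plan| plan.match }
  end
end

errors = []
plan = RecordingPlan.new
observer = SketchJoinObserver.new(->(e) { errors << e })
observer.add_active_plan(plan)

observer.handle(FakeNotification.new(:next, 1, nil))
observer.handle(FakeNotification.new(:error, nil, RuntimeError.new("boom")))
```

After these two calls the queue holds only the value notification, the plan has been asked to match once, and the error has gone to the callable without being queued.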
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/rx/joins/join_observer.rb', line 4
    def queue
      @queue
    end
Instance Method Details
#add_active_plan(active_plan) ⇒ Object
    # File 'lib/rx/joins/join_observer.rb', line 28
    def add_active_plan(active_plan)
      @active_plans.push active_plan
    end
#remove_active_plan(active_plan) ⇒ Object
    # File 'lib/rx/joins/join_observer.rb', line 36
    def remove_active_plan(active_plan)
      if idx = @active_plans.index(active_plan)
        @active_plans.delete_at idx
      end
      self.unsubscribe if @active_plans.length == 0
    end
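As the listing shows, remove_active_plan deletes only the first matching entry (index followed by delete_at, where Array#delete would drop every equal element) and unsubscribes once no plans remain. A small sketch of that bookkeeping, using a hypothetical SketchPlanRegistry class and symbols in place of real plan objects:

```ruby
# Hypothetical stand-in that keeps only the plan bookkeeping.
class SketchPlanRegistry
  attr_reader :unsubscribed

  def initialize
    @active_plans = []
    @unsubscribed = false
  end

  def add_active_plan(plan)
    @active_plans.push plan
  end

  def remove_active_plan(plan)
    # Remove the first occurrence only, if the plan is registered.
    idx = @active_plans.index(plan)
    @active_plans.delete_at(idx) if idx
    # With no plans left there is nothing to feed, so unsubscribe.
    unsubscribe if @active_plans.empty?
  end

  def unsubscribe
    @unsubscribed = true
  end
end

registry = SketchPlanRegistry.new
registry.add_active_plan(:plan_a)
registry.add_active_plan(:plan_b)

registry.remove_active_plan(:plan_a)
still_subscribed = !registry.unsubscribed

registry.remove_active_plan(:plan_b)
```

Removing :plan_a leaves the registry subscribed; removing :plan_b, the last plan, triggers the unsubscribe.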
#subscribe ⇒ Object
    # File 'lib/rx/joins/join_observer.rb', line 32
    def subscribe
      @subscription.subscription = @source.materialize.subscribe(@config)
    end
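subscribe attaches the configured observer to @source.materialize and not to @source directly. In Rx, materialize delivers every event, including errors and completion, as a notification value through on_next, which would explain why the constructor's handler checks notification.on_error?. The sketch below only models that idea in plain Ruby; MaterializedEvent and sketch_materialize are hypothetical names, not part of the library.

```ruby
# Hypothetical value object standing in for an Rx notification.
MaterializedEvent = Struct.new(:kind, :payload)

# Turns a list of values plus an optional error into the sequence of
# events that a materialized source would deliver through on_next.
def sketch_materialize(values, error = nil)
  events = values.map { |value| MaterializedEvent.new(:next, value) }
  terminal = error ? MaterializedEvent.new(:error, error) : MaterializedEvent.new(:completed, nil)
  events << terminal
end

kinds = sketch_materialize([1, 2]).map(&:kind)
failed_kinds = sketch_materialize([1], RuntimeError.new("boom")).map(&:kind)
```

In both cases the terminal event arrives as an ordinary element, so a single handler can decide what to do with it.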
#unsubscribe ⇒ Object
    # File 'lib/rx/joins/join_observer.rb', line 43
    def unsubscribe
      super
      if !@is_disposed
        @is_disposed = true
        @subscription.unsubscribe
      end
    end
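The @is_disposed flag makes the method safe to call more than once: the underlying subscription is released only on the first call. A minimal sketch of that guard, assuming hypothetical CountingSubscription and SketchDisposable classes and omitting the call to super:

```ruby
# Hypothetical subscription that records how often it is released.
class CountingSubscription
  attr_reader :unsubscribe_calls

  def initialize
    @unsubscribe_calls = 0
  end

  def unsubscribe
    @unsubscribe_calls += 1
  end
end

# Hypothetical stand-in reproducing only the dispose guard.
class SketchDisposable
  def initialize(subscription)
    @subscription = subscription
    @is_disposed = false
  end

  def unsubscribe
    unless @is_disposed
      @is_disposed = true
      @subscription.unsubscribe
    end
  end
end

subscription = CountingSubscription.new
disposable = SketchDisposable.new(subscription)
disposable.unsubscribe
disposable.unsubscribe
```

The second call is a no-op, so the inner subscription sees exactly one unsubscribe.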