Class: Rx::JoinObserver

Inherits:
ObserverBase show all
Defined in:
lib/rx/joins/join_observer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ObserverBase

#dispose, #fail, #on_completed, #on_error, #on_next

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Constructor Details

#initialize(source, on_error) ⇒ JoinObserver

Returns a new instance of JoinObserver.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rx/joins/join_observer.rb', line 5

def initialize(source, on_error)
  super Observer.configure {|o|
    o.on_next {|notification|
      if !@is_disposed
        if notification.on_error?
          @on_error.call(notification.exception)
          next
        end
        @queue.push notification
        @active_plans.dup.each {|v|
          v.match
        }
      end
    }
  }
  @source = source
  @on_error = on_error
  @queue = []
  @active_plans = []
  @subscription = SingleAssignmentSubscription.new
  @is_disposed = false
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



4
5
6
# File 'lib/rx/joins/join_observer.rb', line 4

def queue
  @queue
end

Instance Method Details

#add_active_plan(active_plan) ⇒ Object



28
29
30
# File 'lib/rx/joins/join_observer.rb', line 28

def add_active_plan(active_plan)
  @active_plans.push active_plan
end

#remove_active_plan(active_plan) ⇒ Object



36
37
38
39
40
41
# File 'lib/rx/joins/join_observer.rb', line 36

def remove_active_plan(active_plan)
  if idx = @active_plans.index(active_plan)
    @active_plans.delete_at idx
  end
  self.unsubscribe if @active_plans.length == 0
end

#subscribeObject



32
33
34
# File 'lib/rx/joins/join_observer.rb', line 32

def subscribe
  @subscription.subscription = @source.materialize.subscribe(@config)
end

#unsubscribeObject



43
44
45
46
47
48
49
# File 'lib/rx/joins/join_observer.rb', line 43

def unsubscribe
  super
  if !@is_disposed
    @is_disposed = true
    @subscription.unsubscribe
  end
end