Class: Rx::JoinObserver
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dispose, #fail, #on_completed, #on_error, #on_next
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Constructor Details
#initialize(source, on_error) ⇒ JoinObserver
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/rx/joins/join_observer.rb', line 5
# Builds the observer configuration and sets up the join bookkeeping.
#
# The on_next handler installed here receives notification objects (it
# queries them with +on_error?+ and +exception+). It only runs after
# construction, so it can safely read the instance variables assigned below
# the +super+ call.
#
# @param source   stored in @source for later use by this object
# @param on_error callable (responds to +call+) invoked with
#   +notification.exception+ when an error notification arrives
def initialize(source, on_error)
  config = Observer.configure do |o|
    o.on_next do |notification|
      # Once disposed, incoming notifications are dropped.
      next if @is_disposed

      # Error notifications are handed to the callback and never queued.
      if notification.on_error?
        @on_error.call(notification.exception)
        next
      end

      @queue << notification
      # Walk a copy of the plan list: @active_plans itself may be mutated
      # while matching (presumably via remove_active_plan — confirm).
      @active_plans.dup.each(&:match)
    end
  end
  super(config)

  @source = source
  @on_error = on_error
  @queue = []
  @active_plans = []
  @subscription = SingleAssignmentSubscription.new
  @is_disposed = false
end
|
Instance Attribute Details
#queue ⇒ Object
Returns the value of attribute queue.
4
5
6
|
# File 'lib/rx/joins/join_observer.rb', line 4
# Reader for the notification queue held in @queue.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
|
Instance Method Details
#add_active_plan(active_plan) ⇒ Object
28
29
30
|
# File 'lib/rx/joins/join_observer.rb', line 28
# Appends a plan to the end of the @active_plans list.
#
# @param active_plan the plan object to register
# @return [Array] the @active_plans array after the append
def add_active_plan(active_plan)
  @active_plans << active_plan
end
|
#remove_active_plan(active_plan) ⇒ Object
36
37
38
39
40
41
|
# File 'lib/rx/joins/join_observer.rb', line 36
# Removes the first occurrence of +active_plan+ from @active_plans, then
# unsubscribes if no plans remain.
#
# The emptiness check runs whether or not the plan was found, so calling
# this with an already-empty list also triggers +unsubscribe+.
#
# @param active_plan the plan object to deregister
# @return the result of +unsubscribe+ when the list is empty, otherwise nil
def remove_active_plan(active_plan)
  position = @active_plans.index(active_plan)
  # Delete by index so that only one occurrence is removed.
  @active_plans.delete_at(position) unless position.nil?
  unsubscribe if @active_plans.empty?
end
|
#subscribe ⇒ Object
32
33
34
|
# File 'lib/rx/joins/join_observer.rb', line 32
# Subscribes @config to the materialized form of @source and stores the
# resulting subscription in @subscription.
#
# @return whatever +@source.materialize.subscribe(@config)+ returned
def subscribe
  materialized = @source.materialize
  @subscription.subscription = materialized.subscribe(@config)
end
|
#unsubscribe ⇒ Object
43
44
45
46
47
48
49
|
# File 'lib/rx/joins/join_observer.rb', line 43
# Runs the inherited +unsubscribe+, then releases @subscription the first
# time this is called.
#
# The inherited implementation is invoked on every call; the @is_disposed
# flag only guards the release of @subscription.
#
# @return the result of +@subscription.unsubscribe+ on the first call,
#   nil on later calls
def unsubscribe
  super
  return if @is_disposed

  @is_disposed = true
  @subscription.unsubscribe
end
|