Class: Rx::ObserveOnObserver
Instance Method Summary
collapse
#ensure_active, #run
#dispose, #fail, #on_completed, #on_error, #on_next
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Constructor Details
#initialize(scheduler, observer, cancel = nil) ⇒ ObserveOnObserver
Returns a new instance of ObserveOnObserver.
17
18
19
20
21
|
# File 'lib/rx/core/observe_on_observer.rb', line 17
def initialize(scheduler, observer, cancel = nil)
@cancel = cancel
super(scheduler, observer)
end
|
Instance Method Details
#on_completed_core ⇒ Object
33
34
35
36
|
# File 'lib/rx/core/observe_on_observer.rb', line 33
def on_completed_core
ensure_active
super
end
|
#on_error_core(error) ⇒ Object
28
29
30
31
|
# File 'lib/rx/core/observe_on_observer.rb', line 28
def on_error_core(error)
ensure_active
super(error)
end
|
#on_next_core(value) ⇒ Object
23
24
25
26
|
# File 'lib/rx/core/observe_on_observer.rb', line 23
def on_next_core(value)
ensure_active
super(value)
end
|
#unsubscribe ⇒ Object
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/rx/core/observe_on_observer.rb', line 38
def unsubscribe
super
cancel = nil
Mutex.new.synchronize do
cancel = @cancel
@cancel = nil
end
canel.unsubscribe if cancel
end
|