Class: RxRuby::ObserveOnObserver

Inherits:
ScheduledObserver show all
Defined in:
lib/rx_ruby/core/observe_on_observer.rb

Instance Method Summary collapse

Methods inherited from ScheduledObserver

#ensure_active, #run

Methods inherited from ObserverBase

#dispose, #fail, #on_completed, #on_error, #on_next

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Constructor Details

#initialize(scheduler, observer, cancel = nil) ⇒ ObserveOnObserver

Returns a new instance of ObserveOnObserver.



17
18
19
20
21
# File 'lib/rx_ruby/core/observe_on_observer.rb', line 17

# Stores the optional cancel handle, then hands +scheduler+ and +observer+
# to the parent constructor.
#
# @param scheduler forwarded unchanged to the parent constructor
# @param observer  forwarded unchanged to the parent constructor
# @param cancel    optional handle kept in @cancel (defaults to nil); it is
#   read back by #unsubscribe
def initialize(scheduler, observer, cancel = nil)
  @cancel = cancel
  super(scheduler, observer)
end

Instance Method Details

#on_completed_core ⇒ Object



33
34
35
36
# File 'lib/rx_ruby/core/observe_on_observer.rb', line 33

# Completion hook: calls the inherited #ensure_active first and then
# delegates to the parent implementation of #on_completed_core.
#
# @return whatever the parent implementation returns
def on_completed_core
  ensure_active
  super()
end

#on_error_core(error) ⇒ Object



28
29
30
31
# File 'lib/rx_ruby/core/observe_on_observer.rb', line 28

# Error hook: calls the inherited #ensure_active first and then passes
# +error+ on to the parent implementation of #on_error_core.
#
# @param error the error to forward to the parent implementation
# @return whatever the parent implementation returns
def on_error_core(error)
  ensure_active
  super # bare super forwards +error+ unchanged
end

#on_next_core(value) ⇒ Object



23
24
25
26
# File 'lib/rx_ruby/core/observe_on_observer.rb', line 23

# Value hook: calls the inherited #ensure_active first and then passes
# +value+ on to the parent implementation of #on_next_core.
#
# @param value the element to forward to the parent implementation
# @return whatever the parent implementation returns
def on_next_core(value)
  ensure_active
  super # bare super forwards +value+ unchanged
end

#unsubscribe ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rx_ruby/core/observe_on_observer.rb', line 38

# Runs the inherited #unsubscribe, then releases the cancel handle that was
# stored in @cancel by the constructor.
#
# The handle is taken out of @cancel and the ivar is reset to nil before the
# handle's own #unsubscribe is called, so a repeated call finds nothing left
# to release and only runs the inherited behaviour.
#
# @return the result of the handle's #unsubscribe, or nil when no handle is set
def unsubscribe
  super

  cancel = nil
  # NOTE(review): this mutex is created on every call, so it cannot exclude
  # other threads; a lock shared by the instance would be needed for that.
  Mutex.new.synchronize do
    cancel = @cancel
    @cancel = nil
  end

  # Previously spelled `canel`, which raised NameError whenever a handle was
  # present and left the handle un-released.
  cancel.unsubscribe if cancel
end