Class: Rx::SynchronizedObserver

Inherits:
ObserverBase show all
Defined in:
lib/rx/core/synchronized_observer.rb

Instance Method Summary collapse

Methods inherited from ObserverBase

#dispose, #fail, #on_completed, #on_error, #on_next, #unsubscribe

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Constructor Details

#initialize(observer, gate) ⇒ SynchronizedObserver

Returns a new instance of SynchronizedObserver.



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rx/core/synchronized_observer.rb', line 34

def initialize(observer, gate)
  @observer = observer
  @gate = gate

  config = ObserverConfiguration.new
  config.on_next(&method(:on_next_core))
  config.on_error(&method(:on_error_core))
  config.on_completed(&method(:on_completed_core))

  super(config)
end

Instance Method Details

#on_completed_coreObject



30
31
32
# File 'lib/rx/core/synchronized_observer.rb', line 30

def on_completed_core
  @gate.synchronize { @observer.on_completed }
end

#on_error_core(error) ⇒ Object



26
27
28
# File 'lib/rx/core/synchronized_observer.rb', line 26

def on_error_core(error)
  @gate.synchronize { @observer.on_error error }
end

#on_next_core(value) ⇒ Object



22
23
24
# File 'lib/rx/core/synchronized_observer.rb', line 22

def on_next_core(value)
  @gate.synchronize { @observer.on_next value }
end