Module: Rx::Observer

Included in:
AsyncSubject, BehaviorSubject, CheckedObserver, MockObserver, ObserverBase, ReplaySubject, Subject, Subject::AnonymousSubject
Defined in:
lib/rx/core/observer.rb,
lib/rx/core/notification.rb,
lib/rx/core/checked_observer.rb,
lib/rx/core/async_lock_observer.rb,
lib/rx/core/observe_on_observer.rb,
lib/rx/core/synchronized_observer.rb

Overview

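Module mixed into all observers. It provides factory methods for building observers and instance methods for wrapping an existing observer.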

Class Method Details

.allow_reentrancy(observer, gate = Monitor.new) ⇒ Object

Synchronizes access to the observer so that its callback methods cannot be called concurrently by multiple threads, using the given gate object as a Monitor-based lock. Passing a shared gate is useful when coordinating multiple observers that access shared state; if no gate is given, a new Monitor is created. Note that reentrant observer callbacks on the same thread are still possible.



# File 'lib/rx/core/synchronized_observer.rb', line 14

def allow_reentrancy(observer, gate = Monitor.new)
  SynchronizedObserver.new(observer, gate)
end
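
The following is a minimal, self-contained sketch of the behavior described above, not the library's implementation. `GatedObserver` is a hypothetical stand-in for `SynchronizedObserver` that only handles `on_next`. It shows two observers sharing one Monitor gate, and a callback that calls back into the gate on the same thread without deadlocking, because Monitor is reentrant.

```ruby
require 'monitor'

# Hypothetical stand-in for SynchronizedObserver: each callback runs while
# holding the gate, so only one thread at a time can be inside a callback.
class GatedObserver
  def initialize(gate = Monitor.new, &on_next)
    @gate = gate
    @on_next = on_next
  end

  def on_next(value)
    @gate.synchronize { @on_next.call(value) }
  end
end

gate = Monitor.new # one gate shared by both observers
log = []

inner = GatedObserver.new(gate) { |x| log << "inner: #{x}" }
outer = GatedObserver.new(gate) do |x|
  log << "outer: #{x}"
  inner.on_next(x * 2) # nested call on the same thread re-enters the gate
end

outer.on_next(3)
puts log.inspect
```

If the gate were a plain Mutex instead of a Monitor, the nested call would raise a ThreadError, which is the practical meaning of "reentrant callbacks on the same thread are still possible".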

.configure {|config| ... } ⇒ Object

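Configures a new observer instance. The block receives a configuration object on which the on_next, on_error, and on_completed handlers can be set.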

Yields:

  • (config)


# File 'lib/rx/core/observer.rb', line 53

def configure
  config = ObserverConfiguration.new
  yield config if block_given?
  ObserverBase.new config
end
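
To illustrate the configure-with-a-block pattern without depending on the gem, here is a small sketch. `SketchConfig`, `SketchConfiguredObserver`, and `SketchObserver` are hypothetical stand-ins for `ObserverConfiguration`, `ObserverBase`, and `Observer`. Skipping handlers that were never set is an assumption of the sketch, not a statement about the library's defaults.

```ruby
# Hypothetical stand-in for ObserverConfiguration: stores one block per event.
class SketchConfig
  attr_reader :handlers

  def initialize
    @handlers = {}
  end

  def on_next(&block)
    @handlers[:on_next] = block
  end

  def on_error(&block)
    @handlers[:on_error] = block
  end

  def on_completed(&block)
    @handlers[:on_completed] = block
  end
end

# Hypothetical stand-in for ObserverBase: dispatches to the configured blocks.
class SketchConfiguredObserver
  def initialize(config)
    @handlers = config.handlers
  end

  def on_next(value)
    dispatch(:on_next, value)
  end

  def on_error(error)
    dispatch(:on_error, error)
  end

  def on_completed
    dispatch(:on_completed)
  end

  private

  # Unset handlers are skipped here; that default is an assumption of this sketch.
  def dispatch(name, *args)
    handler = @handlers[name]
    handler.call(*args) if handler
  end
end

module SketchObserver
  # Same shape as the method above: build a config, yield it, wrap it.
  def self.configure
    config = SketchConfig.new
    yield config if block_given?
    SketchConfiguredObserver.new(config)
  end
end

seen = []
observer = SketchObserver.configure do |o|
  o.on_next { |x| seen << x }
  o.on_completed { seen << :done }
end
observer.on_next(42)
observer.on_completed
puts seen.inspect
```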

.create(on_next = nil, on_error = nil, on_completed = nil) ⇒ Object

Creates an observer from the given callables. Each of on_next, on_error, and on_completed is optional; only the handlers that are passed get registered.



# File 'lib/rx/core/observer.rb', line 59

def create(on_next = nil, on_error = nil, on_completed = nil)
  configure do |o|
    o.on_next(&on_next) if on_next
    o.on_error(&on_error) if on_error
    o.on_completed(&on_completed) if on_completed
  end
end

.from_notifier ⇒ Object

Creates an observer from a notification callback.



# File 'lib/rx/core/notification.rb', line 12

def from_notifier
  raise ArgumentError.new 'Block required' unless block_given?

  configure do |o|
    o.on_next      {|x| yield Notification.create_on_next(x) }
    o.on_error     {|err| yield Notification.create_on_error(err) }
    o.on_completed { yield Notification.create_on_completed }
  end
end
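
As a rough illustration of the idea, the sketch below funnels all three observer callbacks into a single block that receives a notification value. `SketchNotification` and `NotifierObserver` are hypothetical names; the library's `Notification` class is richer than this Struct.

```ruby
# Hypothetical, simplified notification value.
SketchNotification = Struct.new(:kind, :value)

# Hypothetical stand-in for the observer returned by from_notifier:
# every callback is turned into a notification and handed to one block.
class NotifierObserver
  def initialize(&notifier)
    raise ArgumentError, 'Block required' unless notifier

    @notifier = notifier
  end

  def on_next(value)
    @notifier.call(SketchNotification.new(:on_next, value))
  end

  def on_error(error)
    @notifier.call(SketchNotification.new(:on_error, error))
  end

  def on_completed
    @notifier.call(SketchNotification.new(:on_completed, nil))
  end
end

observer = NotifierObserver.new do |n|
  puts "#{n.kind}: #{n.value.inspect}"
end
observer.on_next(1)
observer.on_completed
```

A single block is convenient when the events are being logged, recorded, or forwarded as data rather than handled by three separate callbacks.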

.prevent_reentrancy(observer, gate = AsyncLock.new) ⇒ Object

Synchronizes access to the observer so that its callback methods cannot be called concurrently, using the given asynchronous lock to protect against both concurrent and reentrant access. Passing a shared lock is useful when coordinating multiple observers that access shared state; if no lock is given, a new AsyncLock is created.



# File 'lib/rx/core/async_lock_observer.rb', line 13

def prevent_reentrancy(observer, gate = AsyncLock.new)
  AsyncLockObserver.new(observer, gate)
end
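
One way to picture how an asynchronous lock can rule out reentrancy is a work queue: the first caller becomes the owner and drains the queue, while any call made during a callback is queued instead of nested. The sketch below assumes that model. `SketchAsyncLock` and `QueuedObserver` are hypothetical stand-ins, and unlike a production lock this one does not handle a callback that raises.

```ruby
# Hypothetical stand-in for AsyncLock: queue the action; only the first
# caller (the owner) runs queued actions, one after another.
class SketchAsyncLock
  def initialize
    @queue = []
    @running = false
    @mutex = Mutex.new
  end

  def wait(&action)
    owner = @mutex.synchronize do
      @queue << action
      next false if @running # someone else is draining; just leave the work

      @running = true
    end
    return unless owner

    loop do
      work = @mutex.synchronize do
        item = @queue.shift
        @running = false if item.nil?
        item
      end
      break if work.nil?

      work.call # the mutex is not held while the callback runs
    end
  end
end

# Hypothetical stand-in for AsyncLockObserver (on_next only).
class QueuedObserver
  def initialize(gate = SketchAsyncLock.new, &on_next)
    @gate = gate
    @on_next = on_next
  end

  def on_next(value)
    @gate.wait { @on_next.call(value) }
  end
end

order = []
observer = QueuedObserver.new do |x|
  order << "start #{x}"
  observer.on_next(x + 1) if x < 2 # reentrant call is queued, not nested
  order << "end #{x}"
end
observer.on_next(1)
puts order.inspect # prints ["start 1", "end 1", "start 2", "end 2"]
```

With a Monitor-based gate the same program would interleave the calls (start 1, start 2, end 2, end 1), which is the difference between allowing and preventing reentrancy.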

Instance Method Details

#as_observer ⇒ Object

Hides the identity of an observer.



# File 'lib/rx/core/observer.rb', line 37

def as_observer
  Observer.configure do |o|
    o.on_next(&method(:on_next))
    o.on_error(&method(:on_error))
    o.on_completed(&method(:on_completed))
  end
end
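
A short sketch of what hiding the identity means in practice: the wrapper forwards the three callbacks through bound method objects but exposes nothing else from the original. `HiddenObserver` and `CountingObserver` are hypothetical classes written for this illustration.

```ruby
# Hypothetical wrapper that exposes only the three observer callbacks.
class HiddenObserver
  def initialize(on_next, on_error, on_completed)
    @on_next = on_next
    @on_error = on_error
    @on_completed = on_completed
  end

  def on_next(value)
    @on_next.call(value)
  end

  def on_error(error)
    @on_error.call(error)
  end

  def on_completed
    @on_completed.call
  end
end

# Hypothetical observer with extra public API that callers should not see.
class CountingObserver
  attr_reader :count

  def initialize
    @count = 0
  end

  def on_next(_value)
    @count += 1
  end

  def on_error(error)
    raise error
  end

  def on_completed; end

  def reset!
    @count = 0
  end

  # Mirrors the shape of the method above: forward via bound methods.
  def as_observer
    HiddenObserver.new(method(:on_next), method(:on_error), method(:on_completed))
  end
end

original = CountingObserver.new
hidden = original.as_observer
hidden.on_next(:a)
puts original.count              # prints 1
puts hidden.respond_to?(:reset!) # prints false
```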

#checked ⇒ Object

Checks access to the observer for grammar violations. This includes multiple on_error or on_completed calls, as well as reentrancy in any of the observer methods. If a violation is detected, an error is raised from the offending observer method call.



# File 'lib/rx/core/checked_observer.rb', line 11

def checked
  CheckedObserver.new(self)
end
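
The grammar checks described above can be pictured as a three-state machine (idle, busy, done). The sketch below is an illustration under that assumption, not the code of `CheckedObserver`; `GrammarCheckedObserver`, `RecordingObserver`, and the error messages are hypothetical.

```ruby
# Hypothetical inner observer that records what it receives.
class RecordingObserver
  attr_reader :events

  def initialize
    @events = []
  end

  def on_next(value)
    @events << [:next, value]
  end

  def on_error(error)
    @events << [:error, error]
  end

  def on_completed
    @events << [:completed]
  end
end

# Hypothetical checker: rejects calls after termination and nested calls.
class GrammarCheckedObserver
  def initialize(inner)
    @inner = inner
    @state = :idle
  end

  def on_next(value)
    guard { @inner.on_next(value) }
  end

  def on_error(error)
    guard(terminal: true) { @inner.on_error(error) }
  end

  def on_completed
    guard(terminal: true) { @inner.on_completed }
  end

  private

  def guard(terminal: false)
    raise 'Reentrancy detected' if @state == :busy
    raise 'Observer already terminated' if @state == :done

    @state = :busy
    begin
      yield
    ensure
      # Terminal calls close the observer; on_next returns it to idle.
      @state = terminal ? :done : :idle
    end
  end
end

checked = GrammarCheckedObserver.new(RecordingObserver.new)
checked.on_next(1)
checked.on_completed
begin
  checked.on_next(2)
rescue RuntimeError => e
  puts e.message # prints "Observer already terminated"
end
```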

#notify_on(scheduler) ⇒ Object

Schedules the invocation of observer methods on the given scheduler.



# File 'lib/rx/core/observe_on_observer.rb', line 10

def notify_on(scheduler)
  ObserveOnObserver.new(scheduler, self, nil)
end
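
To make the effect of scheduling concrete, the sketch below uses a hand-driven scheduler so the deferral is visible: callbacks reach the inner observer only when the scheduler runs its queue. `ManualScheduler`, its `schedule` and `run_pending` methods, `ScheduledObserver`, and `SinkObserver` are all hypothetical; the library's scheduler interface may differ.

```ruby
# Hypothetical scheduler: collects actions and runs them on demand.
class ManualScheduler
  def initialize
    @pending = []
  end

  def schedule(&action)
    @pending << action
  end

  def run_pending
    @pending.shift.call until @pending.empty?
  end
end

# Hypothetical inner observer that records what it receives.
class SinkObserver
  attr_reader :events

  def initialize
    @events = []
  end

  def on_next(value)
    @events << [:next, value]
  end

  def on_error(error)
    @events << [:error, error]
  end

  def on_completed
    @events << [:completed]
  end
end

# Hypothetical stand-in for ObserveOnObserver: each callback is scheduled
# rather than invoked directly.
class ScheduledObserver
  def initialize(scheduler, inner)
    @scheduler = scheduler
    @inner = inner
  end

  def on_next(value)
    @scheduler.schedule { @inner.on_next(value) }
  end

  def on_error(error)
    @scheduler.schedule { @inner.on_error(error) }
  end

  def on_completed
    @scheduler.schedule { @inner.on_completed }
  end
end

scheduler = ManualScheduler.new
sink = SinkObserver.new
observer = ScheduledObserver.new(scheduler, sink)

observer.on_next(1)
observer.on_completed
puts sink.events.inspect # prints []
scheduler.run_pending
puts sink.events.inspect # prints [[:next, 1], [:completed]]
```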

#to_notifier ⇒ Object

Creates a notification callback from an observer.



# File 'lib/rx/core/observer.rb', line 46

def to_notifier
  lambda {|n| n.accept self}
end
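
The lambda above relies on each notification knowing how to dispatch itself to an observer through `accept`. A minimal sketch of that double dispatch follows; `SketchOnNext`, `SketchOnCompleted`, and `ListObserver` are hypothetical stand-ins for the library's notification and observer classes.

```ruby
# Hypothetical notifications: each one calls the matching observer method.
class SketchOnNext
  def initialize(value)
    @value = value
  end

  def accept(observer)
    observer.on_next(@value)
  end
end

class SketchOnCompleted
  def accept(observer)
    observer.on_completed
  end
end

# Hypothetical observer exposing the same to_notifier shape as above.
class ListObserver
  attr_reader :items, :done

  def initialize
    @items = []
    @done = false
  end

  def on_next(value)
    @items << value
  end

  def on_completed
    @done = true
  end

  def to_notifier
    lambda { |n| n.accept(self) }
  end
end

observer = ListObserver.new
notify = observer.to_notifier
notify.call(SketchOnNext.new(10))
notify.call(SketchOnNext.new(20))
notify.call(SketchOnCompleted.new)
puts observer.items.inspect # prints [10, 20]
puts observer.done          # prints true
```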