Module: Rx::Observer
- Included in:
- AsyncSubject, BehaviorSubject, CheckedObserver, MockObserver, ObserverBase, ReplaySubject, Subject, Subject::AnonymousSubject
- Defined in:
- lib/rx/core/observer.rb,
lib/rx/core/notification.rb,
lib/rx/core/checked_observer.rb,
lib/rx/core/async_lock_observer.rb,
lib/rx/core/observe_on_observer.rb,
lib/rx/core/synchronized_observer.rb
Overview
Module for all Observers
Class Method Summary collapse
-
.allow_reentrancy(observer, gate = Monitor.new) ⇒ Object
Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a Monitor based lock.
-
.configure {|config| ... } ⇒ Object
Configures a new instance of an Observer.
- .create(on_next = nil, on_error = nil, on_completed = nil) ⇒ Object
-
.from_notifier ⇒ Object
Creates an observer from a notification callback.
-
.prevent_reentrancy(observer, gate = AsyncLock.new) ⇒ Object
Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access.
Instance Method Summary collapse
-
#as_observer ⇒ Object
Hides the identity of an observer.
-
#checked ⇒ Object
Checks access to the observer for grammar violations.
-
#notify_on(scheduler) ⇒ Object
Schedules the invocation of observer methods on the given scheduler.
-
#to_notifier ⇒ Object
Creates a notification callback from an observer.
Class Method Details
.allow_reentrancy(observer, gate = Monitor.new) ⇒ Object
Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a Monitor based lock. This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common gate object if given. Notice reentrant observer callbacks on the same thread are still possible.
14 15 16 |
# File 'lib/rx/core/synchronized_observer.rb', line 14 def allow_reentrancy(observer, gate = Monitor.new) SynchronizedObserver.new(observer, gate) end |
.configure {|config| ... } ⇒ Object
Configures a new instance of an Observer
53 54 55 56 57 |
# File 'lib/rx/core/observer.rb', line 53 def configure config = ObserverConfiguration.new yield config if block_given? ObserverBase.new config end |
.create(on_next = nil, on_error = nil, on_completed = nil) ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/rx/core/observer.rb', line 59 def create(on_next = nil, on_error = nil, on_completed = nil) configure do |o| o.on_next(&on_next) if on_next o.on_error(&on_error) if on_error o.on_completed(&on_completed) if on_completed end end |
.from_notifier ⇒ Object
Creates an observer from a notification callback.
12 13 14 15 16 17 18 19 20 |
# File 'lib/rx/core/notification.rb', line 12 def from_notifier raise ArgumentError.new 'Block required' unless block_given? configure do |o| o.on_next {|x| yield Notification.create_on_next(x) } o.on_error {|err| yield Notification.create_on_error(err) } o.on_completed { yield Notification.create_on_completed } end end |
.prevent_reentrancy(observer, gate = AsyncLock.new) ⇒ Object
Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access. This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock.
13 14 15 |
# File 'lib/rx/core/async_lock_observer.rb', line 13 def prevent_reentrancy(observer, gate = AsyncLock.new) AsyncLockObserver.new(observer, gate) end |
Instance Method Details
#as_observer ⇒ Object
Hides the identity of an observer.
37 38 39 40 41 42 43 |
# File 'lib/rx/core/observer.rb', line 37 def as_observer Observer.configure do |o| o.on_next(&method(:on_next)) o.on_error(&method(:on_error)) o.on_completed(&method(:on_completed)) end end |
#checked ⇒ Object
Checks access to the observer for grammar violations. This includes checking for multiple on_error or on_completed calls, as well as reentrancy in any of the observer methods. If a violation is detected, an error is thrown from the offending observer method call.
11 12 13 |
# File 'lib/rx/core/checked_observer.rb', line 11 def checked CheckedObserver.new(self) end |
#notify_on(scheduler) ⇒ Object
Schedules the invocation of observer methods on the given scheduler.
10 11 12 |
# File 'lib/rx/core/observe_on_observer.rb', line 10 def notify_on(scheduler) ObserveOnObserver.new(scheduler, self, nil) end |
#to_notifier ⇒ Object
Creates a notification callback from an observer.
46 47 48 |
# File 'lib/rx/core/observer.rb', line 46 def to_notifier lambda {|n| n.accept self} end |