Class: Rx::Subject
- Inherits:
- Object (Rx::Subject < Object)
- Includes:
- Observable, Observer
- Defined in:
- lib/rx/subjects/subject.rb,
lib/rx/subjects/subject_extensions.rb
Overview
Represents an object that is both an observer and an observable sequence, and provides a class method for creating subjects.
Defined Under Namespace
Classes: AnonymousSubject, InnerSubscription
Class Method Summary
- .create(observer, observable) ⇒ Object
  Creates a subject from the specified observer and observable.
Instance Method Summary
- #has_observers? ⇒ Boolean
  Indicates whether the subject has observers subscribed to it.
- #initialize ⇒ Subject (constructor)
  Returns a new instance of Subject.
- #on_completed ⇒ Object
  Notifies all subscribed observers about the end of the sequence.
- #on_error(error) ⇒ Object
  Notifies all subscribed observers with the error.
- #on_next(value) ⇒ Object
  Notifies all subscribed observers with the value.
- #subscribe(observer) ⇒ Object
  Subscribes an observer to the subject.
- #unsubscribe ⇒ Object
  Unsubscribes all observers and releases resources.
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Methods included from Observable
#_subscribe, #add_ref, #all?, amb, #amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, #combine_latest, combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, #repeat_infinitely, repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, zip, #zip
Constructor Details
#initialize ⇒ Subject
Returns a new instance of Subject.
    # File 'lib/rx/subjects/subject.rb', line 17

    def initialize
      @observers = []
      @gate = Mutex.new
      @disposed = false
      @stopped = false
      @error = nil
    end
Class Method Details
.create(observer, observable) ⇒ Object
Creates a subject from the specified observer and observable.
    # File 'lib/rx/subjects/subject_extensions.rb', line 12

    def self.create(observer, observable)
      AnonymousSubject.new(observer, observable)
    end
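The source of AnonymousSubject is not shown on this page, so the following is only a sketch of what a subject built from an observer and an observable could look like. It assumes that notifications pushed into the subject are forwarded to `observer` and that subscriptions are forwarded to `observable`. `ForwardingSketch`, `Sink`, and `FixedSource` are hypothetical names introduced for illustration and are not part of Rx.

```ruby
# Hypothetical composite, NOT the real AnonymousSubject: it assumes observer
# calls go to `observer` and subscriptions go to `observable`.
class ForwardingSketch
  def initialize(observer, observable)
    @observer = observer
    @observable = observable
  end

  def on_next(value)
    @observer.on_next(value)
  end

  def on_error(error)
    @observer.on_error(error)
  end

  def on_completed
    @observer.on_completed
  end

  def subscribe(observer)
    @observable.subscribe(observer)
  end
end

# Hypothetical observer that records every notification it receives.
class Sink
  attr_reader :received

  def initialize
    @received = []
  end

  def on_next(value)
    @received << value
  end

  def on_error(error)
    @received << error
  end

  def on_completed
    @received << :done
  end
end

# Hypothetical observable that replays fixed values to each subscriber.
class FixedSource
  def initialize(values)
    @values = values
  end

  def subscribe(observer)
    @values.each { |v| observer.on_next(v) }
    observer.on_completed
  end
end

sink = Sink.new
subject = ForwardingSketch.new(sink, FixedSource.new([10, 20]))

subject.on_next(:in)    # input side: reaches the wrapped observer
out = Sink.new
subject.subscribe(out)  # output side: served by the wrapped observable

p sink.received  # [:in]
p out.received   # [10, 20, :done]
```

Under this assumption the two halves are independent: values pushed in are not automatically seen by subscribers unless the supplied observer and observable are connected elsewhere.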
Instance Method Details
#has_observers? ⇒ Boolean
Indicates whether the subject has observers subscribed to it.
    # File 'lib/rx/subjects/subject.rb', line 26

    def has_observers?
      @observers && @observers.length > 0
    end
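Although the signature is documented as returning Boolean, the expression in the source can also evaluate to nil, because #unsubscribe sets @observers to nil. The snippet below evaluates the same expression against the three states the observer list can be in. The lambda `check` is a hypothetical helper used only so the example runs on its own.

```ruby
# Same expression as has_observers?, applied to each possible state of the
# @observers instance variable.
check = ->(observers) { observers && observers.length > 0 }

p check.call([])           # false (fresh subject, nobody subscribed)
p check.call([:observer])  # true  (at least one observer)
p check.call(nil)          # nil   (after unsubscribe)
```

The result is always usable in a conditional, but callers comparing it with `== false` would miss the nil case.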
#on_completed ⇒ Object
Notifies all subscribed observers about the end of the sequence.
    # File 'lib/rx/subjects/subject.rb', line 31

    def on_completed
      os = nil
      @gate.synchronize do
        check_disposed

        unless @stopped
          # Snapshot and clear the list so completion is delivered only once.
          os = @observers.clone
          @observers = []
          @stopped = true
        end
      end

      # Notify outside the lock.
      os.each {|o| o.on_completed } if os
    end
#on_error(error) ⇒ Object
Notifies all subscribed observers with the error.
    # File 'lib/rx/subjects/subject.rb', line 47

    def on_error(error)
      raise 'error cannot be nil' unless error

      os = nil
      @gate.synchronize do
        check_disposed

        unless @stopped
          # Snapshot and clear the list, then remember the error so that
          # later subscribers can receive it.
          os = @observers.clone
          @observers = []
          @stopped = true
          @error = error
        end
      end

      # Notify outside the lock.
      os.each {|o| o.on_error error } if os
    end
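One consequence of the @stopped guard is that only the first terminal notification reaches observers; later calls to on_error or on_completed find the subject already stopped and do nothing. The sketch below copies that logic into a hypothetical stand-in, `TerminalSketch`, so it can run without the rx gem. It omits check_disposed, and `EventLog` is likewise an illustrative name.

```ruby
# Hypothetical stand-in mirroring the terminal-notification logic shown above.
class TerminalSketch
  def initialize
    @observers = []
    @gate = Mutex.new
    @stopped = false
    @error = nil
  end

  # Simplified: always registers the observer.
  def subscribe(observer)
    @gate.synchronize { @observers.push(observer) }
  end

  def on_completed
    os = nil
    @gate.synchronize do
      unless @stopped
        os = @observers.clone
        @observers = []
        @stopped = true
      end
    end
    os.each { |o| o.on_completed } if os
  end

  def on_error(error)
    raise 'error cannot be nil' unless error

    os = nil
    @gate.synchronize do
      unless @stopped
        os = @observers.clone
        @observers = []
        @stopped = true
        @error = error
      end
    end
    os.each { |o| o.on_error(error) } if os
  end
end

# Hypothetical observer that logs terminal events.
class EventLog
  attr_reader :events

  def initialize
    @events = []
  end

  def on_completed
    @events << :completed
  end

  def on_error(error)
    @events << [:error, error.message]
  end
end

subject = TerminalSketch.new
log = EventLog.new
subject.subscribe(log)

subject.on_error(StandardError.new('first'))
subject.on_error(StandardError.new('second'))  # ignored: already stopped
subject.on_completed                           # ignored: already stopped

p log.events  # [[:error, "first"]]
```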
#on_next(value) ⇒ Object
Notifies all subscribed observers with the value.
    # File 'lib/rx/subjects/subject.rb', line 66

    def on_next(value)
      os = nil
      @gate.synchronize do
        check_disposed
        # Snapshot the observers; os stays nil once the subject has stopped.
        os = @observers.clone unless @stopped
      end

      # Deliver outside the lock.
      os.each {|o| o.on_next value } if os
    end
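The effect of on_next is that every observer subscribed at the time of the call receives the value, while observers that subscribe later see only subsequent values. A minimal sketch of that behavior follows. `SketchSubject` and `Recorder` are hypothetical stand-ins that copy only the relevant logic from the source above, so the example runs without the rx gem. Disposal checks and subscription objects are left out.

```ruby
# Hypothetical stand-in mirroring only the on_next/subscribe logic.
class SketchSubject
  def initialize
    @observers = []
    @gate = Mutex.new
    @stopped = false
  end

  # Simplified: registers the observer and returns nothing useful.
  def subscribe(observer)
    @gate.synchronize { @observers.push(observer) }
    nil
  end

  def on_next(value)
    os = nil
    # Copy the observer list while holding the lock...
    @gate.synchronize { os = @observers.clone unless @stopped }
    # ...then notify outside it, so observer code never runs under the lock.
    os.each { |o| o.on_next(value) } if os
  end
end

# Hypothetical observer that records the values it receives.
class Recorder
  attr_reader :values

  def initialize
    @values = []
  end

  def on_next(value)
    @values << value
  end
end

subject = SketchSubject.new
first = Recorder.new
subject.subscribe(first)
subject.on_next(1)

second = Recorder.new
subject.subscribe(second)  # joins late, so it misses the value 1
subject.on_next(2)

p first.values   # [1, 2]
p second.values  # [2]
```

Notifying from a copy of the list means an observer can subscribe or unsubscribe during delivery without mutating the array being iterated.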
#subscribe(observer) ⇒ Object
Subscribes an observer to the subject.
    # File 'lib/rx/subjects/subject.rb', line 77

    def subscribe(observer)
      raise 'observer cannot be nil' unless observer

      @gate.synchronize do
        check_disposed

        if !@stopped
          # Still active: register the observer and return its subscription.
          @observers.push(observer)
          return InnerSubscription.new(self, observer)
        elsif @error
          # Terminated with an error: replay it to the late subscriber.
          observer.on_error @error
          return Subscription.empty
        else
          # Completed normally: signal completion immediately.
          observer.on_completed
          return Subscription.empty
        end
      end
    end
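The three branches of subscribe mean that an observer arriving after the subject has terminated is not registered; it is immediately told how the sequence ended. The sketch below reproduces just that branching. `ReplaySketch`, its `stop` helper, and `LateObserver` are hypothetical names, and the symbols returned from `subscribe` stand in for the real subscription objects.

```ruby
# Hypothetical stand-in mirroring the branching in subscribe shown above.
class ReplaySketch
  def initialize
    @observers = []
    @gate = Mutex.new
    @stopped = false
    @error = nil
  end

  # Hypothetical helper standing in for on_completed / on_error: it only
  # records the terminal state that subscribe inspects.
  def stop(error = nil)
    @gate.synchronize do
      @stopped = true
      @error = error
    end
  end

  def subscribe(observer)
    raise 'observer cannot be nil' unless observer

    @gate.synchronize do
      if !@stopped
        @observers.push(observer)
        return :subscribed
      elsif @error
        observer.on_error(@error)
        return :empty
      else
        observer.on_completed
        return :empty
      end
    end
  end
end

# Hypothetical observer that logs terminal notifications.
class LateObserver
  attr_reader :log

  def initialize
    @log = []
  end

  def on_error(error)
    @log << [:error, error.message]
  end

  def on_completed
    @log << [:completed]
  end
end

failed = ReplaySketch.new
failed.stop(RuntimeError.new('boom'))
after_error = LateObserver.new
failed.subscribe(after_error)

finished = ReplaySketch.new
finished.stop
after_completion = LateObserver.new
finished.subscribe(after_completion)

p after_error.log       # [[:error, "boom"]]
p after_completion.log  # [[:completed]]
```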
#unsubscribe ⇒ Object
Unsubscribes all observers and releases resources.
    # File 'lib/rx/subjects/subject.rb', line 97

    def unsubscribe
      @gate.synchronize do
        @disposed = true
        # Drop the observer list; has_observers? is falsy from here on.
        @observers = nil
      end
    end