Class: Rx::AsyncSubject
- Inherits:
-
Object
- Object
- Rx::AsyncSubject
- Includes:
- Observable, Observer
- Defined in:
- lib/rx/subjects/async_subject.rb
Overview
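Represents the result of an asynchronous operation. Only the last value received before completion, followed by the completion notification (or the error, if the sequence fails), is broadcast to all subscribed observers.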
Defined Under Namespace
Classes: InnerSubscription
Instance Attribute Summary collapse
-
#gate ⇒ Object
readonly
Returns the value of attribute gate.
-
#observers ⇒ Object
readonly
Returns the value of attribute observers.
-
#unsubscribed ⇒ Object
readonly
Returns the value of attribute unsubscribed.
Instance Method Summary collapse
-
#has_observers? ⇒ Boolean
Indicates whether the subject has observers subscribed to it.
-
#initialize ⇒ AsyncSubject
constructor
A new instance of AsyncSubject.
-
#on_completed ⇒ Object
Notifies all subscribed observers about the end of the sequence.
-
#on_error(error) ⇒ Object
Notifies all subscribed observers with the error.
-
#on_next(value) ⇒ Object
Stores the value as the latest result; subscribed observers receive only the last stored value, and only when the sequence completes.
-
#subscribe(observer) ⇒ Object
Subscribes an observer to the subject.
-
#unsubscribe ⇒ Object
Unsubscribe all observers and release resources.
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Methods included from Observable
#_subscribe, #add_ref, #all?, #amb, amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, combine_latest, #combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, create, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, repeat_infinitely, #repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, #zip, zip
Constructor Details
#initialize ⇒ AsyncSubject
Returns a new instance of AsyncSubject.
19 20 21 22 23 24 25 26 27 |
# File 'lib/rx/subjects/async_subject.rb', line 19

# Builds an empty, active subject: no observers, no stored value or error,
# and a fresh Mutex guarding all later state changes.
#
# @return [AsyncSubject] a new instance of AsyncSubject
def initialize
  @gate = Mutex.new
  @observers = []
  @error = @value = nil
  @unsubscribed = @stopped = @has_value = false
end
Instance Attribute Details
#gate ⇒ Object (readonly)
Returns the value of attribute gate.
17 18 19 |
# Reader for @gate, the Mutex created in #initialize; the notification, subscribe and unsubscribe methods wrap their state changes in gate.synchronize.
# File 'lib/rx/subjects/async_subject.rb', line 17 def gate @gate end |
#observers ⇒ Object (readonly)
Returns the value of attribute observers.
17 18 19 |
# Reader for @observers: an Array of subscribed observers while the subject is active, reset to [] on completion/error and set to nil by #unsubscribe.
# File 'lib/rx/subjects/async_subject.rb', line 17 def observers @observers end |
#unsubscribed ⇒ Object (readonly)
Returns the value of attribute unsubscribed.
17 18 19 |
# Reader for @unsubscribed: false after #initialize, set to true by #unsubscribe.
# File 'lib/rx/subjects/async_subject.rb', line 17 def unsubscribed @unsubscribed end |
Instance Method Details
#has_observers? ⇒ Boolean
Indicates whether the subject has observers subscribed to it.
30 31 32 |
# File 'lib/rx/subjects/async_subject.rb', line 30

# Indicates whether the subject has observers subscribed to it.
#
# The observer list is read exactly once so that a concurrent #unsubscribe
# (which sets it to nil) cannot slip in between a nil check and the size check.
#
# @return [Boolean] true when at least one observer is registered; false when
#   the list is empty or has been released by #unsubscribe
def has_observers?
  current = observers
  !current.nil? && !current.empty?
end
#on_completed ⇒ Object
Notifies all subscribed observers about the end of the sequence.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/rx/subjects/async_subject.rb', line 35

# Notifies all subscribed observers about the end of the sequence.
#
# If a value was stored by #on_next, each observer first receives that value
# and then the completion notification; otherwise only completion is sent.
# Calling this on an already stopped subject does nothing.
#
# @return [Array, nil] the observers that were notified, or nil if the
#   subject had already stopped
def on_completed
  os = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed # defined outside this snippet
    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
      # Snapshot the result under the lock; callbacks run after it is released.
      v = @value
      hv = @has_value
    end
  end

  return unless os

  os.each do |o|
    # Emit the snapshot, not @value: an observer callback may call #unsubscribe,
    # which clears @value while later observers are still waiting.
    o.on_next v if hv
    o.on_completed
  end
end
#on_error(error) ⇒ Object
Notifies all subscribed observers with the error.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/rx/subjects/async_subject.rb', line 65

# Notifies all subscribed observers with the error.
#
# The error is stored, the subject is marked as stopped and the observer list
# is emptied while holding the gate; the observers are called afterwards.
# Does nothing if the subject has already stopped.
#
# @param error [Object] the error to propagate; must not be nil
# @return [Array, nil] the observers that were notified, or nil if the
#   subject had already stopped
# @raise [RuntimeError] if error is nil
def on_error(error)
  raise 'error cannot be nil' unless error

  targets = gate.synchronize do
    check_unsubscribed # defined outside this snippet
    next if @stopped

    snapshot = observers.clone
    @observers = []
    @stopped = true
    @error = error
    snapshot
  end

  targets.each { |o| o.on_error error } if targets
end
#on_next(value) ⇒ Object
Stores the value as the latest result; subscribed observers receive only the last stored value, and only when the sequence completes.
84 85 86 87 88 89 90 91 92 |
# File 'lib/rx/subjects/async_subject.rb', line 84

# Records the value as the subject's latest result. No observer is called
# here; the stored value is handed out by #on_completed and #subscribe.
# Ignored once the subject has stopped.
#
# @param value [Object] the value to remember
# @return [true, nil] true when the value was stored, nil when the subject
#   had already stopped
def on_next(value)
  gate.synchronize do
    check_unsubscribed # defined outside this snippet
    next if @stopped

    @value = value
    @has_value = true
  end
end
#subscribe(observer) ⇒ Object
Subscribes an observer to the subject.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/rx/subjects/async_subject.rb', line 95

# Subscribes an observer to the subject.
#
# While the subject is still active the observer is added to the list and a
# subscription handle is returned. Once the subject has stopped, its terminal
# outcome is replayed to the observer immediately: the stored error, or the
# last value followed by completion, or completion alone.
#
# @param observer [#on_next, #on_error, #on_completed] observer to register
# @return [Object] an InnerSubscription while the subject is active,
#   otherwise Subscription.empty
# @raise [RuntimeError] if observer is nil
def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  err = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed # defined outside this snippet

    unless @stopped
      observers.push(observer)
      return InnerSubscription.new(self, observer)
    end

    # Snapshot the terminal state under the lock; callbacks run after release.
    err = @error
    v = @value
    hv = @has_value
  end

  if err
    # A failed subject replays its error through on_error, never as a value.
    observer.on_error err
  else
    observer.on_next v if hv
    observer.on_completed
  end

  Subscription.empty
end
#unsubscribe ⇒ Object
Unsubscribe all observers and release resources.
128 129 130 131 132 133 134 135 |
# File 'lib/rx/subjects/async_subject.rb', line 128

# Unsubscribe all observers and release resources.
#
# Under the gate, flags the subject as unsubscribed and drops its references
# to the observer list, the stored error and the stored value.
#
# @return [nil]
def unsubscribe
  gate.synchronize do
    @unsubscribed = true
    @observers = @error = @value = nil
  end
end