Class: Rx::BehaviorSubject
- Inherits:
-
Object
- Object
- Rx::BehaviorSubject
- Includes:
- Observable, Observer
- Defined in:
- lib/rx/subjects/behavior_subject.rb
Overview
Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
Defined Under Namespace
Classes: InnerSubscription
Instance Attribute Summary collapse
-
#gate ⇒ Object
readonly
Returns the value of attribute gate.
-
#observers ⇒ Object
readonly
Returns the value of attribute observers.
-
#unsubscribed ⇒ Object
readonly
Returns the value of attribute unsubscribed.
Instance Method Summary collapse
-
#has_observers? ⇒ Boolean
Indicates whether the subject has observers subscribed to it.
-
#initialize(value) ⇒ BehaviorSubject
constructor
A new instance of BehaviorSubject.
-
#on_completed ⇒ Object
Notifies all subscribed observers about the end of the sequence.
-
#on_error(error) ⇒ Object
Notifies all subscribed observers with the error.
-
#on_next(value) ⇒ Object
Notifies all subscribed observers with the value.
-
#subscribe(observer) ⇒ Object
Subscribes an observer to the subject.
-
#unsubscribe ⇒ Object
Unsubscribe all observers and release resources.
-
#value ⇒ Object
Gets the current value or throws an exception.
Methods included from Observer
allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier
Methods included from Observable
#_subscribe, #add_ref, #all?, #amb, amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, combine_latest, #combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, create, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, repeat_infinitely, #repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, #zip, zip
Constructor Details
#initialize(value) ⇒ BehaviorSubject
Returns a new instance of BehaviorSubject.
18 19 20 21 22 23 24 25 |
# File 'lib/rx/subjects/behavior_subject.rb', line 18 def initialize(value) @value = value @observers = [] @gate = Mutex.new @unsubscribed = false @stopped = false @error = nil end |
Instance Attribute Details
#gate ⇒ Object (readonly)
Returns the value of attribute gate.
16 17 18 |
# File 'lib/rx/subjects/behavior_subject.rb', line 16 def gate @gate end |
#observers ⇒ Object (readonly)
Returns the value of attribute observers.
16 17 18 |
# File 'lib/rx/subjects/behavior_subject.rb', line 16 def observers @observers end |
#unsubscribed ⇒ Object (readonly)
Returns the value of attribute unsubscribed.
16 17 18 |
# File 'lib/rx/subjects/behavior_subject.rb', line 16 def unsubscribed @unsubscribed end |
Instance Method Details
#has_observers? ⇒ Boolean
Indicates whether the subject has observers subscribed to it.
28 29 30 |
# File 'lib/rx/subjects/behavior_subject.rb', line 28 def has_observers? observers && observers.length > 0 end |
#on_completed ⇒ Object
Notifies all subscribed observers about the end of the sequence.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rx/subjects/behavior_subject.rb', line 42 def on_completed os = nil @gate.synchronize do self.check_unsubscribed unless @stopped os = @observers.clone @observers = [] @stopped = true end end os.each {|o| observer.on_completed } if os end |
#on_error(error) ⇒ Object
Notifies all subscribed observers with the error.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/rx/subjects/behavior_subject.rb', line 58 def on_error(error) raise 'error cannot be nil' unless error os = nil @gate.synchronize do self.check_unsubscribed unless @stopped os = @observers.clone @observers = [] @stopped = true @error = error end end os.each {|o| observer.on_error error } if os end |
#on_next(value) ⇒ Object
Notifies all subscribed observers with the value.
77 78 79 80 81 82 83 84 85 86 |
# File 'lib/rx/subjects/behavior_subject.rb', line 77 def on_next(value) os = nil @gate.synchronize do self.check_unsubscribed @value = value os = @observers.clone unless @stopped end os.each {|o| o.on_next value } if os end |
#subscribe(observer) ⇒ Object
Subscribes an observer to the subject.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/rx/subjects/behavior_subject.rb', line 89 def subscribe(observer) raise 'observer cannot be nil' unless observer err = nil gate.synchronize do self.check_unsubscribed unless @stopped observers.push(observer) observer.on_next(@value) return InnerSubscription.new(self, observer) end err = @error end if err observer.on_next err else observer.on_completed end Subscription.empty end |
#unsubscribe ⇒ Object
Unsubscribe all observers and release resources.
115 116 117 118 119 120 121 122 |
# File 'lib/rx/subjects/behavior_subject.rb', line 115 def unsubscribe gate.synchronize do @unsubscribed = true @observers = nil @error = nil @value = nil end end |
#value ⇒ Object
Gets the current value or throws an exception.
33 34 35 36 37 38 39 |
# File 'lib/rx/subjects/behavior_subject.rb', line 33 def value gate.synchronize do self.check_unsubscribed raise @error if @error @value end end |