Class: Rx::BehaviorSubject

Inherits:
Object
  • Object
show all
Includes:
Observable, Observer
Defined in:
lib/rx/subjects/behavior_subject.rb

Overview

Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.

Defined Under Namespace

Classes: InnerSubscription

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Methods included from Observable

#_subscribe, #add_ref, #all?, #amb, amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, combine_latest, #combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, create, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, repeat_infinitely, #repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, #zip, zip

Constructor Details

#initialize(value) ⇒ BehaviorSubject

Returns a new instance of BehaviorSubject.



18
19
20
21
22
23
24
25
# File 'lib/rx/subjects/behavior_subject.rb', line 18

# Builds a subject seeded with an initial value.
#
# Starts with an empty observer list, a fresh Mutex, no stored error, and
# both the stopped and unsubscribed flags cleared.
#
# @param value [Object] stored as the subject's current value
def initialize(value)
  @gate = Mutex.new
  @observers = []
  @value = value
  @error = nil
  @stopped = @unsubscribed = false
end

Instance Attribute Details

#gate ⇒ Object (readonly)

Returns the value of attribute gate.



16
17
18
# File 'lib/rx/subjects/behavior_subject.rb', line 16

# Reader for the Mutex that the subject's methods synchronize on.
#
# @return [Mutex] the value of the @gate instance variable
def gate; @gate; end

#observers ⇒ Object (readonly)

Returns the value of attribute observers.



16
17
18
# File 'lib/rx/subjects/behavior_subject.rb', line 16

# Reader for the list of currently registered observers.
#
# @return [Array, nil] the value of the @observers instance variable
#   (set to nil by unsubscribe)
def observers; @observers; end

#unsubscribed ⇒ Object (readonly)

Returns the value of attribute unsubscribed.



16
17
18
# File 'lib/rx/subjects/behavior_subject.rb', line 16

# Reader for the flag that unsubscribe sets to true.
#
# @return [Boolean] the value of the @unsubscribed instance variable
def unsubscribed; @unsubscribed; end

Instance Method Details

#has_observers? ⇒ Boolean

Indicates whether the subject has observers subscribed to it.

Returns:

  • (Boolean)


28
29
30
# File 'lib/rx/subjects/behavior_subject.rb', line 28

# Indicates whether the subject has observers subscribed to it.
#
# Always answers with a strict Boolean, including when the observer list has
# been cleared to nil (as unsubscribe does); previously that case leaked nil.
#
# @return [Boolean] true when at least one observer is registered
def has_observers?
  !observers.nil? && !observers.empty?
end

#on_completed ⇒ Object

Notifies all subscribed observers about the end of the sequence.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rx/subjects/behavior_subject.rb', line 42

# Notifies all subscribed observers about the end of the sequence.
#
# The first call marks the subject as stopped and empties the observer list;
# once stopped, further calls notify nobody.
#
# @return [Array, nil] the observers that were notified, or nil if the subject
#   was already stopped
def on_completed
  os = nil
  @gate.synchronize do
    # Defined elsewhere in the class; presumably raises once unsubscribed.
    check_unsubscribed

    unless @stopped
      # Snapshot under the lock so the callbacks run after it is released.
      os = @observers.dup
      @observers = []
      @stopped = true
    end
  end

  os.each { |o| o.on_completed } if os
end

#on_error(error) ⇒ Object

Notifies all subscribed observers with the error.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rx/subjects/behavior_subject.rb', line 58

# Notifies all subscribed observers with the error.
#
# The first call stores the error, marks the subject as stopped and empties
# the observer list; once stopped, further calls notify nobody.
#
# @param error [Object] the error to propagate; must not be nil
# @raise [RuntimeError] if +error+ is nil or false
# @return [Array, nil] the observers that were notified, or nil if the subject
#   was already stopped
def on_error(error)
  raise 'error cannot be nil' unless error

  os = nil
  @gate.synchronize do
    # Defined elsewhere in the class; presumably raises once unsubscribed.
    check_unsubscribed

    unless @stopped
      # Snapshot under the lock so the callbacks run after it is released.
      os = @observers.dup
      @observers = []
      @stopped = true
      @error = error
    end
  end

  os.each { |o| o.on_error(error) } if os
end

#on_next(value) ⇒ Object

Notifies all subscribed observers with the value.



77
78
79
80
81
82
83
84
85
86
# File 'lib/rx/subjects/behavior_subject.rb', line 77

# Notifies all subscribed observers with the value.
#
# While the subject is active the value is stored as the current value and
# pushed to every observer. Once the subject has stopped, the call is ignored
# entirely, so a late notification no longer overwrites the last value.
#
# @param value [Object] the new current value
# @return [Array, nil] the observers that were notified, or nil if the subject
#   was already stopped
def on_next(value)
  os = nil
  @gate.synchronize do
    # Defined elsewhere in the class; presumably raises once unsubscribed.
    check_unsubscribed

    unless @stopped
      @value = value
      # Snapshot under the lock so the callbacks run after it is released.
      os = @observers.dup
    end
  end

  os.each { |o| o.on_next(value) } if os
end

#subscribe(observer) ⇒ Object

Subscribes an observer to the subject.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/rx/subjects/behavior_subject.rb', line 89

# Subscribes an observer to the subject.
#
# While the subject is active the observer is registered and immediately
# receives the current value. If the subject has already stopped, the observer
# is not registered: it receives the stored error via on_error, or on_completed
# when there is no stored error.
#
# @param observer [#on_next, #on_error, #on_completed] the observer to attach
# @raise [RuntimeError] if +observer+ is nil or false
# @return [InnerSubscription, Object] an InnerSubscription for an active
#   subject, otherwise the result of Subscription.empty
def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  err = nil
  gate.synchronize do
    # Defined elsewhere in the class; presumably raises once unsubscribed.
    check_unsubscribed

    unless @stopped
      observers.push(observer)
      # Replay the current value while still holding the lock.
      observer.on_next(@value)
      return InnerSubscription.new(self, observer)
    end

    err = @error
  end

  # Terminated subject: replay the terminal notification through the
  # matching channel (errors go to on_error, not on_next).
  if err
    observer.on_error(err)
  else
    observer.on_completed
  end

  Subscription.empty
end

#unsubscribe ⇒ Object

Unsubscribe all observers and release resources.



115
116
117
118
119
120
121
122
# File 'lib/rx/subjects/behavior_subject.rb', line 115

# Unsubscribes all observers and releases the subject's resources.
#
# Under the gate, flags the subject as unsubscribed and drops its references
# to the observer list, the stored error and the current value.
#
# @return [nil]
def unsubscribe
  gate.synchronize do
    @unsubscribed = true
    # Chained assignment clears all three references; it evaluates to nil,
    # which becomes the method's return value.
    @observers = @error = @value = nil
  end
end

#value ⇒ Object

Gets the current value, or raises the stored error if the subject has terminated with one.



33
34
35
36
37
38
39
# File 'lib/rx/subjects/behavior_subject.rb', line 33

# Gets the current value, or raises the stored error if there is one.
#
# Runs under the gate. check_unsubscribed is defined elsewhere in the class;
# presumably it raises once the subject has been unsubscribed.
#
# @return [Object] the current value
def value
  gate.synchronize do
    self.check_unsubscribed

    failure = @error
    raise failure if failure

    @value
  end
end