Class: Rx::AsyncSubject

Inherits:
Object
  • Object
show all
Includes:
Observable, Observer
Defined in:
lib/rx/subjects/async_subject.rb

Overview

Represents the result of an asynchronous operation. Each notification is broadcasted to all subscribed observers.

Defined Under Namespace

Classes: InnerSubscription

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Methods included from Observable

#_subscribe, #add_ref, #all?, #amb, amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, combine_latest, #combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, create, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, repeat_infinitely, #repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, #zip, zip

Constructor Details

#initializeAsyncSubject

Returns a new instance of AsyncSubject.



19
20
21
22
23
24
25
26
27
# File 'lib/rx/subjects/async_subject.rb', line 19

def initialize
  @observers = []
  @gate = Mutex.new
  @unsubscribed = false
  @stopped = false
  @error = nil
  @value = nil
  @has_value = false
end

Instance Attribute Details

#gateObject (readonly)

Returns the value of attribute gate.



17
18
19
# File 'lib/rx/subjects/async_subject.rb', line 17

def gate
  @gate
end

#observersObject (readonly)

Returns the value of attribute observers.



17
18
19
# File 'lib/rx/subjects/async_subject.rb', line 17

def observers
  @observers
end

#unsubscribedObject (readonly)

Returns the value of attribute unsubscribed.



17
18
19
# File 'lib/rx/subjects/async_subject.rb', line 17

def unsubscribed
  @unsubscribed
end

Instance Method Details

#has_observers?Boolean

Indicates whether the subject has observers subscribed to it.

Returns:

  • (Boolean)


30
31
32
# File 'lib/rx/subjects/async_subject.rb', line 30

def has_observers?
  observers && observers.length > 0
end

#on_completedObject

Notifies all subscribed observers about the end of the sequence.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/rx/subjects/async_subject.rb', line 35

def on_completed
  os = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
      v = @value
      hv = @has_value
    end
  end

  if os
    if hv
      os.each do |o|
        o.on_next @value
        o.on_completed
      end
    else
      os.each {|o| o.on_completed }
    end
  end
end

#on_error(error) ⇒ Object

Notifies all subscribed observers with the error.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/rx/subjects/async_subject.rb', line 65

def on_error(error)
  raise 'error cannot be nil' unless error

  os = nil
  gate.synchronize do
    check_unsubscribed

    unless @stopped
      os = observers.clone
      @observers = []
      @stopped = true
      @error = error
    end
  end

  os.each {|o| o.on_error error } if os
end

#on_next(value) ⇒ Object

Notifies all subscribed observers with the value.



84
85
86
87
88
89
90
91
92
# File 'lib/rx/subjects/async_subject.rb', line 84

def on_next(value)
  gate.synchronize do
    check_unsubscribed
    unless @stopped
      @value = value
      @has_value = true
    end
  end
end

#subscribe(observer) ⇒ Object

Subscribes an observer to the subject.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rx/subjects/async_subject.rb', line 95

def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  err = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed

    if !@stopped
      observers.push(observer)
      return InnerSubscription.new(self, observer)
    end

    err = @error
    v = @value
    hv = @has_value
  end

  if err
    observer.on_next err
  elsif hv
    observer.on_next v
    observer.on_completed
  else
    observer.on_completed
  end

  Subscription.empty
end

#unsubscribeObject

Unsubscribe all observers and release resources.



128
129
130
131
132
133
134
135
# File 'lib/rx/subjects/async_subject.rb', line 128

def unsubscribe
  gate.synchronize do
    @unsubscribed = true
    @observers = nil
    @error = nil
    @value = nil
  end
end