Class: Rx::Subject

Inherits:
Object
  • Object
show all
Includes:
Observable, Observer
Defined in:
lib/rx/subjects/subject.rb,
lib/rx/subjects/subject_extensions.rb

Overview

Provides a set of static methods for creating subjects.

Defined Under Namespace

Classes: AnonymousSubject, InnerSubscription

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Methods included from Observable

#_subscribe, #add_ref, #all?, amb, #amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, #combine_latest, combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, #repeat_infinitely, repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, zip, #zip

Constructor Details

#initializeSubject

Returns a new instance of Subject.



17
18
19
20
21
22
23
# File 'lib/rx/subjects/subject.rb', line 17

def initialize
  @observers = []
  @gate = Mutex.new
  @disposed = false
  @stopped = false
  @error = nil
end

Class Method Details

.create(observer, observable) ⇒ Object

Creates a subject from the specified observer and observable.



12
13
14
# File 'lib/rx/subjects/subject_extensions.rb', line 12

def self.create(observer, observable)
  AnonymousSubject.new(observer, observable)
end

Instance Method Details

#has_observers?Boolean

Indicates whether the subject has observers subscribed to it.

Returns:

  • (Boolean)


26
27
28
# File 'lib/rx/subjects/subject.rb', line 26

def has_observers?
  @observers && @observers.length > 0
end

#on_completedObject

Notifies all subscribed observers about the end of the sequence.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/rx/subjects/subject.rb', line 31

def on_completed
  os = nil
  @gate.synchronize do 
    check_disposed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
    end 
  end

  os.each {|o| o.on_completed } if os
end

#on_error(error) ⇒ Object

Notifies all subscribed observers with the error.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rx/subjects/subject.rb', line 47

def on_error(error)
  raise 'error cannot be nil' unless error

  os = nil
  @gate.synchronize do
    check_disposed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
      @error = error
    end         
  end

  os.each {|o| o.on_error error } if os
end

#on_next(value) ⇒ Object

Notifies all subscribed observers with the value.



66
67
68
69
70
71
72
73
74
# File 'lib/rx/subjects/subject.rb', line 66

def on_next(value) 
  os = nil
  @gate.synchronize do 
    check_disposed
    os = @observers.clone unless @stopped
  end

  os.each {|o| o.on_next value } if os      
end

#subscribe(observer) ⇒ Object

Subscribes an observer to the subject.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/rx/subjects/subject.rb', line 77

def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  @gate.synchronize do
    check_disposed

    if !@stopped
      @observers.push(observer)
      return InnerSubscription.new(self, observer)
    elsif @error
      observer.on_error @error
      return Subscription.empty
    else
      observer.on_completed
      return Subscription.empty
    end
  end
end

#unsubscribeObject

Unsubscribe all observers and release resources.



97
98
99
100
101
102
# File 'lib/rx/subjects/subject.rb', line 97

def unsubscribe
  @gate.synchronize do
    @disposed = true
    @observers = nil
  end
end