Class: Rx::ConnectableObservable

Inherits:
AnonymousObservable show all
Defined in:
lib/rx/linq/connectable_observable.rb

Instance Method Summary collapse

Methods included from Observable

#_subscribe, #add_ref, #all?, amb, #amb, #and, #any?, #as_observable, #average, #buffer_with_count, #buffer_with_time, case, #combine_latest, combine_latest, #concat, concat, #concat_all, #concat_map, #concat_map_observer, #contains, #contains?, #count, create, #debounce, #default_if_empty, defer, #delay, #delay_with_selector, #dematerialize, #distinct, #distinct_until_changed, #do, #element_at, #element_at_or_default, empty, #empty?, #ensures, #enumerator_repeat_infinitely, #enumerator_repeat_times, #final, #first, #first_or_default, #flat_map, #flat_map_with_index, for, fork_join, from, from_array, generate, #group_join, if, #ignore_elements, interval, just, #last, #last_or_default, #latest, #map, #map_with_index, #materialize, #max, #max_by, #merge, #merge_all, merge_all, merge_concurrent, #merge_concurrent, #min, #min_by, #multicast, never, #none?, #observe_on, of, of_array, of_enumerable, of_enumerator, on_error_resume_next, #on_error_resume_next, pairs, #pluck, #publish, raise_error, range, #reduce, repeat, #repeat, repeat_infinitely, #repeat_infinitely, rescue_error, #rescue_error, #retry, #retry_infinitely, #scan, #select, #select_with_index, #sequence_eql?, #single, #single_or_default, #skip, #skip_last, #skip_until, #skip_while, #skip_while_with_index, start, #start_with, #subscribe, #subscribe_on, #subscribe_on_completed, #subscribe_on_error, #subscribe_on_next, #sum, #synchronize, #take, #take_last, #take_last_buffer, #take_until, #take_while, #take_while_with_index, #tap, #time_interval, timer, #timestamp, #to_a, to_async, #to_h, using, when, while, #window_with_count, #window_with_time, #zip, zip

Constructor Details

#initialize(source, subject) ⇒ ConnectableObservable

Returns a new instance of ConnectableObservable.



3
4
5
6
7
8
9
10
# File 'lib/rx/linq/connectable_observable.rb', line 3

def initialize(source, subject)
  @has_subscription = false
  @subscription = nil
  @source_observable = source.as_observable
  @subject = subject

  super(&subject.method(:subscribe))
end

Instance Method Details

#connectObject



12
13
14
15
16
17
18
# File 'lib/rx/linq/connectable_observable.rb', line 12

def connect
  unless @has_subscription
    @has_subscription = true
    @subscription = CompositeSubscription.new [@source_observable.subscribe(@subject), Subscription.create { @has_subscription = false }]
  end
  @subscription
end

#ref_countObject



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/rx/linq/connectable_observable.rb', line 20

def ref_count
  count = 0
  AnonymousObservable.new do |observer|
    count += 1
    should_connect = true if count == 1
    connectable_subscription = self.connect if should_connect
    Subscription.create {
      @subscription.unsubscribe
      count -= 1
      connectable_subscription.unsubscribe if count == 0
    }
  end
end