Class: RxRuby::ScheduledObserver

Inherits:
ObserverBase show all
Defined in:
lib/rx_ruby/core/scheduled_observer.rb

Direct Known Subclasses

ObserveOnObserver

Instance Method Summary collapse

Methods inherited from ObserverBase

#dispose, #fail, #on_completed, #on_error, #on_next

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Constructor Details

#initialize(scheduler, observer) ⇒ ScheduledObserver

Returns a new instance of ScheduledObserver.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 11

# Builds an observer that queues notifications for +observer+ and hands them
# to +scheduler+ for delivery.
#
# @param scheduler [Object] stored in @scheduler; used by #ensure_active to
#   schedule #run
# @param observer [Object] stored in @observer; receives the queued
#   on_next / on_error / on_completed calls
def initialize(scheduler, observer)
  @scheduler = scheduler
  @observer = observer

  # Queue state; @gate guards @queue together with the two flags below.
  @gate = Monitor.new
  @queue = []
  @acquired = false
  @faulted = false
  @subscriber = SerialSubscription.new

  # Route each notification kind to the matching *_core method of this object.
  handlers = ObserverConfiguration.new.tap do |cfg|
    cfg.on_next(&method(:on_next_core))
    cfg.on_error(&method(:on_error_core))
    cfg.on_completed(&method(:on_completed_core))
  end

  super(handlers)
end

Instance Method Details

#ensure_active(n = 0) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 40

# Schedules #run on @scheduler when there is queued work and no run is
# already in flight.
#
# @param n [Integer] accepted but not used by this method
# @return [Object, nil] the value returned by the scheduler when a run was
#   scheduled, otherwise nil
def ensure_active(n = 0)
  became_owner = @gate.synchronize do
    # Nothing to do once faulted or while the queue is empty.
    next false if @faulted || @queue.empty?

    already_acquired = @acquired
    @acquired = true
    !already_acquired
  end

  return unless became_owner

  # Scheduling happens outside the gate; only the caller that flipped
  # @acquired gets here.
  @subscriber.subscription = @scheduler.schedule_recursive_with_state(nil, method(:run))
end

#on_completed_core ⇒ Object



36
37
38
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 36

# Queues a deferred call to @observer.on_completed; the call itself is not
# made here.
#
# @return [Array] the queue, as returned from the synchronized block
def on_completed_core
  deliver = -> { @observer.on_completed }
  @gate.synchronize { @queue << deliver }
end

#on_error_core(error) ⇒ Object



32
33
34
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 32

# Queues a deferred call to @observer.on_error with +error+; the call itself
# is not made here.
#
# @param error [Object] the error to pass to the observer later
# @return [Array] the queue, as returned from the synchronized block
def on_error_core(error)
  deliver = -> { @observer.on_error(error) }
  @gate.synchronize { @queue << deliver }
end

#on_next_core(value) ⇒ Object



28
29
30
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 28

# Queues a deferred call to @observer.on_next with +value+; the call itself
# is not made here.
#
# @param value [Object] the element to pass to the observer later
# @return [Array] the queue, as returned from the synchronized block
def on_next_core(value)
  deliver = -> { @observer.on_next(value) }
  @gate.synchronize { @queue << deliver }
end

#run(state, recurse) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 53

# Executes one queued notification and then asks to be invoked again.
#
# This is the action #ensure_active hands to the scheduler. Each invocation
# takes at most one item off the queue; when the queue is empty it clears
# @acquired and returns without recursing.
#
# @param state [Object] passed through unchanged to +recurse+
# @param recurse [#call] invoked with +state+ after the work item succeeds
# @return [Object, nil] the result of +recurse.call+, or nil when the queue
#   was empty
# @raise [StandardError] re-raises whatever the work item raised, after
#   dropping the remaining queue and setting @faulted
def run(state, recurse)
  work = nil
  @gate.synchronize do
    if @queue.empty?
      # Drained: give up ownership so a later #ensure_active can reschedule.
      @acquired = false
      return
    end

    work = @queue.shift
  end

  begin
    # Called outside the gate so the gate is not held while the observer runs.
    work.call
  rescue
    # @queue and @faulted are read and written under @gate everywhere else
    # (on_*_core, #ensure_active), so the fault transition takes it too.
    @gate.synchronize do
      @queue = []
      @faulted = true
    end

    raise
  end

  recurse.call state
end

#unsubscribe ⇒ Object



76
77
78
79
# File 'lib/rx_ruby/core/scheduled_observer.rb', line 76

# Runs the inherited unsubscribe, then unsubscribes @subscriber, which holds
# the subscription assigned by #ensure_active.
#
# @return [Object] whatever @subscriber.unsubscribe returns
def unsubscribe
  super()
  @subscriber.unsubscribe
end