Class: Rx::AutoDetachObserver

Inherits:
ObserverBase show all
Defined in:
lib/rx/core/auto_detach_observer.rb

Instance Method Summary collapse

Methods inherited from ObserverBase

#dispose, #fail, #on_completed, #on_error, #on_next

Methods included from Observer

allow_reentrancy, #as_observer, #checked, configure, create, from_notifier, #notify_on, prevent_reentrancy, #to_notifier

Constructor Details

#initialize(observer) ⇒ AutoDetachObserver

Returns a new instance of AutoDetachObserver.



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/rx/core/auto_detach_observer.rb', line 36

# Wraps +observer+ and registers this object's +*_core+ methods as the
# notification handlers handed to the parent constructor.
#
# @param observer [#on_next, #on_error, #on_completed] the observer that
#   the +*_core+ handlers forward notifications to
def initialize(observer)
  @m = SingleAssignmentSubscription.new
  @observer = observer

  handlers = ObserverConfiguration.new
  # Each event is routed to the method of the same name with a "_core"
  # suffix (on_next -> on_next_core, and so on).
  [:on_next, :on_error, :on_completed].each do |event|
    handlers.public_send(event, &method(:"#{event}_core"))
  end

  super(handlers)
end

Instance Method Details

#on_completed_core ⇒ Object



28
29
30
31
32
33
34
# File 'lib/rx/core/auto_detach_observer.rb', line 28

# Forwards the completion notification to the wrapped observer and then
# calls +unsubscribe+, whether or not the observer's handler raised.
#
# @return [Object] whatever the wrapped observer's +on_completed+ returns
def on_completed_core
  @observer.on_completed
ensure
  # Runs on every exit path, including an exception from the observer.
  unsubscribe
end

#on_error_core(error) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/rx/core/auto_detach_observer.rb', line 20

# Forwards +error+ to the wrapped observer and then calls +unsubscribe+,
# whether or not the observer's handler raised.
#
# @param error [Object] the error notification to pass along
# @return [Object] whatever the wrapped observer's +on_error+ returns
def on_error_core(error)
  @observer.on_error(error)
ensure
  # Runs on every exit path, including an exception from the observer.
  unsubscribe
end

#on_next_core(value) ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'lib/rx/core/auto_detach_observer.rb', line 10

# Forwards +value+ to the wrapped observer. If that call does not finish
# normally, +unsubscribe+ is invoked before control leaves this method.
#
# @param value [Object] the element to pass along
# @return [true] when the wrapped observer's +on_next+ returns normally
def on_next_core(value)
  delivered = false
  @observer.on_next(value)
  delivered = true
ensure
  # A flag is used instead of +rescue+ so that any abnormal exit from the
  # observer (exception, throw, ...) triggers the unsubscribe.
  unsubscribe if !delivered
end

#subscription=(new_subscription) ⇒ Object



48
49
50
# File 'lib/rx/core/auto_detach_observer.rb', line 48

# Stores +new_subscription+ on the subscription holder kept in +@m+.
#
# @param new_subscription [Object] the subscription to hand to +@m+
# @return [Object] +new_subscription+
def subscription=(new_subscription)
  holder = @m
  holder.subscription = new_subscription
end

#unsubscribe ⇒ Object



52
53
54
55
# File 'lib/rx/core/auto_detach_observer.rb', line 52

# Runs the inherited +unsubscribe+ first, then unsubscribes the
# subscription holder kept in +@m+.
#
# @return [Object] the result of +@m.unsubscribe+
def unsubscribe
  super()
  @m.unsubscribe
end