Class: Rx::ImmediateScheduler::AsyncLockScheduler

Inherits:
LocalScheduler show all
Defined in:
lib/rx/concurrency/immediate_scheduler.rb

Instance Method Summary collapse

Methods inherited from LocalScheduler

#now, #schedule_absolute_with_state

Methods included from Scheduler

normalize, now, #schedule, #schedule_absolute, #schedule_recursive, #schedule_recursive_absolute, #schedule_recursive_absolute_with_state, #schedule_recursive_relative, #schedule_recursive_relative_with_state, #schedule_recursive_with_state, #schedule_relative

Constructor Details

#initialize ⇒ AsyncLockScheduler

Returns a new instance of AsyncLockScheduler.



33
34
35
# File 'lib/rx/concurrency/immediate_scheduler.rb', line 33

# Builds the scheduler with no gate yet; the scheduling methods create
# the AsyncLock on first use.
def initialize
  @gate = nil
end

Instance Method Details

#schedule_relative_with_state(state, due_time, action) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rx/concurrency/immediate_scheduler.rb', line 49

# Schedules +action+ to run once +due_time+ has passed, measured from the
# moment this method is called.
#
# A non-positive +due_time+ is delegated to #schedule_with_state. Otherwise
# the work is queued on the shared gate; when the gate runs it, the block
# sleeps for whatever part of +due_time+ has not yet elapsed and then invokes
# the action unless the returned subscription was unsubscribed meanwhile.
#
# @param state [Object] value handed to +action+ as its second argument
# @param due_time [Numeric] delay before running, treated as seconds
#   (it is passed to Kernel#sleep after subtracting the elapsed time)
# @param action [#call] callable invoked as +action.call(scheduler, state)+;
#   its result is stored as the inner subscription
# @return [Object] the SingleAssignmentSubscription wrapping the action's
#   result, or whatever #schedule_with_state returns for a non-positive delay
def schedule_relative_with_state(state, due_time, action)
  return schedule_with_state(state, action) if due_time <= 0

  m = SingleAssignmentSubscription.new

  # Monotonic clock: immune to wall-clock adjustments while we wait.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  @gate ||= AsyncLock.new

  @gate.wait do
    # Sleep only for the time still remaining, not for the time already
    # spent waiting on the gate.
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    remaining = due_time - elapsed
    sleep remaining if remaining > 0
    m.subscription = action.call(self, state) unless m.unsubscribed?
  end

  m
end

#schedule_with_state(state, action) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/rx/concurrency/immediate_scheduler.rb', line 37

# Queues +action+ on the shared gate for execution without any delay.
#
# The gate (an AsyncLock) is created on first use and reused afterwards.
# When the gate runs the queued block, +action+ is invoked as
# +action.call(scheduler, state)+ and its result becomes the inner
# subscription, unless the returned handle was already unsubscribed.
#
# @param state [Object] value handed to +action+ as its second argument
# @param action [#call] callable to run
# @return [Object] the SingleAssignmentSubscription wrapping the action's result
def schedule_with_state(state, action)
  handle = SingleAssignmentSubscription.new
  @gate ||= AsyncLock.new

  @gate.wait do
    next if handle.unsubscribed?

    handle.subscription = action.call(self, state)
  end

  handle
end