Class: Rx::CurrentThreadScheduler

Inherits:
LocalScheduler show all
Includes:
Singleton
Defined in:
lib/rx/concurrency/current_thread_scheduler.rb

Overview

Represents an object that schedules units of work on the platform’s default scheduler.

Constant Summary collapse

@@thread_local_queue =
nil

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from LocalScheduler

#now, #schedule_absolute_with_state, #schedule_with_state

Methods included from Scheduler

normalize, now, #schedule, #schedule_absolute, #schedule_recursive, #schedule_recursive_absolute, #schedule_recursive_absolute_with_state, #schedule_recursive_relative, #schedule_recursive_relative_with_state, #schedule_recursive_with_state, #schedule_relative

Class Method Details

.queueObject



54
55
56
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 54

def queue
  @@thread_local_queue
end

.queue=(new_queue) ⇒ Object



58
59
60
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 58

def queue=(new_queue)
  @@thread_local_queue = new_queue
end

.run_trampoline(queue) ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 62

def run_trampoline(queue)
  while item = queue.shift
    unless item.cancelled?
      wait = item.due_time - Scheduler.now.to_i
      sleep wait if wait > 0
      item.invoke unless item.cancelled?
    end
  end
end

.schedule_required?Boolean

Gets a value that indicates whether the caller must call a Schedule method.

Returns:

  • (Boolean)


20
21
22
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 20

def self.schedule_required?
  @@thread_local_queue.nil?
end

Instance Method Details

#schedule_relative_with_state(state, due_time, action) ⇒ Object

Schedules an action to be executed after dueTime.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 25

def schedule_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  dt = self.now.to_i + Scheduler.normalize(due_time)
  si = ScheduledItem.new self, state, dt, &action

  local_queue = self.class.queue

  unless local_queue
    local_queue = PriorityQueue.new
    local_queue.push si

    self.class.queue = local_queue

    begin
      self.class.run_trampoline local_queue
    ensure
      self.class.queue = nil
    end
  else
    local_queue.push si
  end

  Subscription.create { si.cancel }
end