Class: Rx::CurrentThreadScheduler
- Inherits:
-
LocalScheduler
- Object
- LocalScheduler
- Rx::CurrentThreadScheduler
- Includes:
- Singleton
- Defined in:
- lib/rx/concurrency/current_thread_scheduler.rb
Overview
Represents an object that schedules units of work on the platform’s default scheduler.
Constant Summary collapse
- @@thread_local_queue =
nil
Class Method Summary collapse
- .queue ⇒ Object
- .queue=(new_queue) ⇒ Object
- .run_trampoline(queue) ⇒ Object
-
.schedule_required? ⇒ Boolean
Gets a value that indicates whether the caller must call a Schedule method.
Instance Method Summary collapse
-
#schedule_relative_with_state(state, due_time, action) ⇒ Object
Schedules an action to be executed after dueTime.
Methods inherited from LocalScheduler
#now, #schedule_absolute_with_state, #schedule_with_state
Methods included from Scheduler
normalize, now, #schedule, #schedule_absolute, #schedule_recursive, #schedule_recursive_absolute, #schedule_recursive_absolute_with_state, #schedule_recursive_relative, #schedule_recursive_relative_with_state, #schedule_recursive_with_state, #schedule_relative
Class Method Details
.queue ⇒ Object
54 55 56 |
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 54 def queue @@thread_local_queue end |
.queue=(new_queue) ⇒ Object
58 59 60 |
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 58 def queue=(new_queue) @@thread_local_queue = new_queue end |
.run_trampoline(queue) ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 62 def run_trampoline(queue) while item = queue.shift unless item.cancelled? wait = item.due_time - Scheduler.now.to_i sleep wait if wait > 0 item.invoke unless item.cancelled? end end end |
.schedule_required? ⇒ Boolean
Gets a value that indicates whether the caller must call a Schedule method.
20 21 22 |
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 20 def self.schedule_required? @@thread_local_queue.nil? end |
Instance Method Details
#schedule_relative_with_state(state, due_time, action) ⇒ Object
Schedules an action to be executed after dueTime.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rx/concurrency/current_thread_scheduler.rb', line 25 def schedule_relative_with_state(state, due_time, action) raise 'action cannot be nil' unless action dt = self.now.to_i + Scheduler.normalize(due_time) si = ScheduledItem.new self, state, dt, &action local_queue = self.class.queue unless local_queue local_queue = PriorityQueue.new local_queue.push si self.class.queue = local_queue begin self.class.run_trampoline local_queue ensure self.class.queue = nil end else local_queue.push si end Subscription.create { si.cancel } end |