Class: Rx::VirtualTimeScheduler
- Inherits:
-
Object
- Object
- Rx::VirtualTimeScheduler
- Includes:
- Scheduler
- Defined in:
- lib/rx/concurrency/virtual_time_scheduler.rb
Overview
Base class for virtual time schedulers using a priority queue for scheduled items.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#clock ⇒ Object
readonly
Returns the value of attribute clock.
Instance Method Summary collapse
-
#advance_by(time) ⇒ Object
Advances the scheduler’s clock by the specified relative time, running all work scheduled for that timespan.
-
#advance_to(time) ⇒ Object
Advances the scheduler’s clock to the specified time, running all work till that point.
-
#enabled? ⇒ Boolean
Gets whether the scheduler is enabled to run work.
-
#get_next ⇒ Object
Gets the next scheduled item to be executed.
-
#initialize(initial_clock) ⇒ VirtualTimeScheduler
constructor
A new instance of VirtualTimeScheduler.
- #invoke(scheduler, action) ⇒ Object
-
#now ⇒ Object
Gets the scheduler’s notion of current time.
-
#schedule_at_absolute(due_time, action) ⇒ Object
Schedules an action to be executed at due_time.
-
#schedule_at_absolute_with_state(state, due_time, action) ⇒ Object
(also: #schedule_absolute_with_state)
Schedules an action to be executed at due_time.
-
#schedule_at_relative(due_time, action) ⇒ Object
Schedules an action to be executed once due_time has elapsed, measured relative to the scheduler’s current clock.
-
#schedule_at_relative_with_state(state, due_time, action) ⇒ Object
(also: #schedule_relative_with_state)
Schedules an action, together with its state, to be executed once due_time has elapsed, measured relative to the scheduler’s current clock.
-
#schedule_with_state(state, action) ⇒ Object
Schedules an action to be executed.
-
#sleep(time) ⇒ Object
Advances the scheduler’s clock by the specified relative time without running any scheduled work.
-
#start ⇒ Object
Starts the virtual time scheduler.
-
#stop ⇒ Object
Stops the virtual time scheduler.
Methods included from Scheduler
normalize, now, #schedule, #schedule_absolute, #schedule_recursive, #schedule_recursive_absolute, #schedule_recursive_absolute_with_state, #schedule_recursive_relative, #schedule_recursive_relative_with_state, #schedule_recursive_with_state, #schedule_relative
Constructor Details
#initialize(initial_clock) ⇒ VirtualTimeScheduler
Returns a new instance of VirtualTimeScheduler.
16 17 18 19 20 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 16

# Creates a scheduler that is not running, has an empty priority queue,
# and whose clock starts at +initial_clock+ coerced with +to_i+.
def initialize(initial_clock)
  @enabled = false
  @queue = PriorityQueue.new
  @clock = initial_clock.to_i
end
Instance Attribute Details
#clock ⇒ Object (readonly)
Returns the value of attribute clock.
14 15 16 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 14

# Read-only accessor for the scheduler's current virtual time.
def clock
  @clock
end
Instance Method Details
#advance_by(time) ⇒ Object
Advances the scheduler’s clock by the specified relative time, running all work scheduled for that timespan.
130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 130

# Moves the clock forward by the relative amount +time+ by delegating to
# advance_to with the computed absolute target.
#
# Raises 'Time is out of range' when the target lies before the current
# clock, returns without doing anything when the target equals the clock,
# and raises 'Cannot advance while running' when the scheduler is enabled.
def advance_by(time)
  target = @clock + time
  ordering = target <=> clock

  raise 'Time is out of range' if ordering < 0
  return if ordering == 0
  raise 'Cannot advance while running' if @enabled

  advance_to(target)
end
#advance_to(time) ⇒ Object
Advances the scheduler’s clock to the specified time, running all work till that point.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 102

# Advances the clock to the absolute +time+, invoking every item returned
# by get_next whose due_time is less than or equal to +time+.
#
# Raises 'Time is out of range' when +time+ is earlier than the clock,
# returns immediately when it equals the clock, and raises
# 'Cannot advance while running' when the scheduler is already enabled.
# On normal completion the clock is set to +time+.
def advance_to(time)
  due_to_clock = time <=> clock
  raise 'Time is out of range' if due_to_clock < 0
  return if due_to_clock == 0
  raise 'Cannot advance while running' if @enabled

  @enabled = true
  begin
    # An invoked item may call stop, which clears @enabled and ends the loop.
    while @enabled
      next_item = get_next
      if next_item && next_item.due_time <= time
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end
    end
  ensure
    # If an item raises, do not leave the scheduler flagged as running;
    # otherwise every later advance_to would raise and start would no-op.
    @enabled = false
  end

  @clock = time
end
#enabled? ⇒ Boolean
Gets whether the scheduler is enabled to run work.
28 29 30 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 28

# Reports the value of the @enabled flag, which start and advance_to set
# while they are draining the queue and which stop clears.
def enabled?
  @enabled
end
#get_next ⇒ Object
Gets the next scheduled item to be executed.
153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 153

# Returns the item at the head of the queue without removing it, after
# first discarding any cancelled items found ahead of it. Returns nil
# once the queue has nothing left to peek at.
def get_next
  while (head = @queue.peek)
    return head unless head.cancelled?

    @queue.shift
  end

  nil
end
#invoke(scheduler, action) ⇒ Object
165 166 167 168 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 165

# Adapter with the (scheduler, state) shape used by the *_with_state
# schedulers: schedule_at_absolute and schedule_at_relative pass the
# caller's action in the state slot and this method as the action.
# Calls +action+ with no arguments and returns Subscription.empty;
# +scheduler+ is not used.
def invoke(scheduler, action)
  action.call
  Subscription.empty
end
#now ⇒ Object
Gets the scheduler’s notion of current time.
23 24 25 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 23

# The scheduler's current time, which is simply the virtual clock value.
def now
  clock
end
#schedule_at_absolute(due_time, action) ⇒ Object
Schedules an action to be executed at due_time.
78 79 80 81 82 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 78

# Schedules the argument-less +action+ for the absolute +due_time+.
# The action is passed along as the state and the invoke method as the
# callback, so the stateful scheduling path can be reused.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
def schedule_at_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  trampoline = method(:invoke)
  schedule_at_absolute_with_state(action, due_time, trampoline)
end
#schedule_at_absolute_with_state(state, due_time, action) ⇒ Object Also known as: schedule_absolute_with_state
Schedules an action to be executed at due_time.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 85

# Queues +action+ to run at the absolute +due_time+ with +state+.
#
# The action is wrapped so that, when the item runs, it first removes
# itself from the queue and then calls +action+ with the scheduler and
# state it is given. Returns a subscription whose block cancels the
# queued item.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
def schedule_at_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  # Declared before the lambda so the closure can refer to the item
  # that is only created afterwards.
  item = nil
  runner = lambda do |scheduler, item_state|
    @queue.delete item
    action.call(scheduler, item_state)
  end

  item = ScheduledItem.new(self, state, due_time, &runner)
  @queue.push item

  Subscription.create { item.cancel }
end
#schedule_at_relative(due_time, action) ⇒ Object
Schedules an action to be executed once due_time has elapsed, measured relative to the scheduler’s current clock.
63 64 65 66 67 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 63

# Schedules the argument-less +action+ to run +due_time+ after the
# current clock. The action is passed along as the state and the invoke
# method as the callback, so the stateful relative path can be reused.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
def schedule_at_relative(due_time, action)
  raise 'action cannot be nil' unless action

  trampoline = method(:invoke)
  schedule_at_relative_with_state(action, due_time, trampoline)
end
#schedule_at_relative_with_state(state, due_time, action) ⇒ Object Also known as: schedule_relative_with_state
Schedules an action, together with its state, to be executed once due_time has elapsed, measured relative to the scheduler’s current clock.
70 71 72 73 74 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 70

# Schedules +action+ with +state+ to run +due_time+ after the current
# clock, by converting the offset to an absolute time and delegating.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
def schedule_at_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  absolute_due = @clock + due_time
  schedule_at_absolute_with_state(state, absolute_due, action)
end
#schedule_with_state(state, action) ⇒ Object
Schedules an action to be executed.
57 58 59 60 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 57

# Schedules +action+ with +state+ to run at the current clock value,
# i.e. as an absolute item due "now" in virtual time.
#
# Raises 'action cannot be nil' when +action+ is nil or false.
def schedule_with_state(state, action)
  raise 'action cannot be nil' unless action

  due_now = clock
  schedule_at_absolute_with_state(state, due_now, action)
end
#sleep(time) ⇒ Object
Advances the scheduler’s clock by the specified relative time without running any scheduled work.
143 144 145 146 147 148 149 150 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 143

# Moves the clock forward by +time+ without touching the queue, so no
# scheduled work runs. Returns the new clock value.
#
# Raises 'Time is out of range' when the resulting time would be
# earlier than the current clock.
def sleep(time)
  target = @clock + time
  raise 'Time is out of range' if (target <=> @clock) < 0

  @clock = target
end
#start ⇒ Object
Starts the virtual time scheduler.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 33

# Runs queued items in the order get_next yields them until no item is
# left or an item calls stop. Before each item is invoked the clock is
# moved up to that item's due_time (the clock is never moved backwards).
# Does nothing when the scheduler is already enabled. Returns nil.
def start
  return if @enabled

  @enabled = true
  begin
    # An invoked item may call stop, which clears @enabled and ends the loop.
    while @enabled
      next_item = get_next
      if next_item
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end
    end
  ensure
    # If an item raises, do not leave the scheduler flagged as running;
    # otherwise later start calls would silently do nothing and
    # advance_to would raise 'Cannot advance while running'.
    @enabled = false
  end
end
#stop ⇒ Object
Stops the virtual time scheduler.
52 53 54 |
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 52

# Clears the @enabled flag. The loops in start and advance_to re-check
# this flag after every item, so they exit once the current item returns.
def stop
  @enabled = false
end