Class: Rx::VirtualTimeScheduler

Inherits:
Object
  • Object
show all
Includes:
Scheduler
Defined in:
lib/rx/concurrency/virtual_time_scheduler.rb

Overview

Base class for virtual time schedulers using a priority queue for scheduled items.

Direct Known Subclasses

HistoricalScheduler, TestScheduler

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Scheduler

normalize, now, #schedule, #schedule_absolute, #schedule_recursive, #schedule_recursive_absolute, #schedule_recursive_absolute_with_state, #schedule_recursive_relative, #schedule_recursive_relative_with_state, #schedule_recursive_with_state, #schedule_relative

Constructor Details

#initialize(initial_clock) ⇒ VirtualTimeScheduler

Returns a new instance of VirtualTimeScheduler.



16
17
18
19
20
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 16

def initialize(initial_clock)
  @clock = initial_clock.to_i
  @queue = PriorityQueue.new
  @enabled = false
end

Instance Attribute Details

#clockObject (readonly)

Returns the value of attribute clock.



14
15
16
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 14

def clock
  @clock
end

Instance Method Details

#advance_by(time) ⇒ Object

Advances the scheduler’s clock by the specified relative time, running all work scheduled for that timespan.



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 130

def advance_by(time)
  dt = @clock + time

  due_to_clock = dt<=>clock
  raise 'Time is out of range' if due_to_clock < 0

  return if due_to_clock == 0
  raise 'Cannot advance while running' if @enabled

  self.advance_to dt
end

#advance_to(time) ⇒ Object

Advances the scheduler’s clock to the specified time, running all work till that point.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 102

def advance_to(time)
  due_to_clock = time<=>clock
  raise 'Time is out of range' if due_to_clock < 0 

  return if due_to_clock == 0

  unless @enabled
    @enabled = true

    begin
      next_item = self.get_next
      if !next_item.nil? && next_item.due_time <= time
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end

    end while @enabled

    @clock = time
  else
    raise 'Cannot advance while running'
  end

end

#enabled?Boolean

Gets whether the scheduler is enabled to run work.

Returns:

  • (Boolean)


28
29
30
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 28

def enabled?
  @enabled
end

#get_nextObject

Gets the next scheduled item to be executed



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 153

def get_next
  while next_item = @queue.peek
    if next_item.cancelled?
      @queue.shift
    else
      return next_item
    end
  end

  return nil
end

#invoke(scheduler, action) ⇒ Object



165
166
167
168
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 165

def invoke(scheduler, action)
  action.call
  Subscription.empty
end

#nowObject

Gets the scheduler’s notion of current time.



23
24
25
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 23

def now
  clock
end

#schedule_at_absolute(due_time, action) ⇒ Object

Schedules an action to be executed at due_time.



78
79
80
81
82
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 78

def schedule_at_absolute(due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_absolute_with_state(action, due_time, method(:invoke))      
end

#schedule_at_absolute_with_state(state, due_time, action) ⇒ Object Also known as: schedule_absolute_with_state

Schedules an action to be executed at due_time.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 85

def schedule_at_absolute_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  si = nil
  run = lambda {|scheduler, state1|
    @queue.delete si
    action.call(scheduler, state1)
  }

  si = ScheduledItem.new(self, state, due_time, &run)
  @queue.push si

  Subscription.create { si.cancel }
end

#schedule_at_relative(due_time, action) ⇒ Object

Schedules an action to be executed at due_time.



63
64
65
66
67
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 63

def schedule_at_relative(due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_relative_with_state(action, due_time, method(:invoke))
end

#schedule_at_relative_with_state(state, due_time, action) ⇒ Object Also known as: schedule_relative_with_state

Schedules an action to be executed at due_time.



70
71
72
73
74
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 70

def schedule_at_relative_with_state(state, due_time, action)
  raise 'action cannot be nil' unless action

  schedule_at_absolute_with_state(state, @clock + due_time, action)
end

#schedule_with_state(state, action) ⇒ Object

Schedules an action to be executed.



57
58
59
60
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 57

def schedule_with_state(state, action)
  raise 'action cannot be nil' unless action
  schedule_at_absolute_with_state(state, clock, action)
end

#sleep(time) ⇒ Object

Advances the scheduler’s clock by the specified relative time.



143
144
145
146
147
148
149
150
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 143

def sleep(time)
  dt = @clock + time

  due_to_clock = dt<=>@clock
  raise 'Time is out of range' if due_to_clock < 0

  @clock = dt
end

#startObject

Starts the virtual time scheduler.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 33

def start
  unless @enabled
    @enabled = true

    begin
      next_item = self.get_next

      unless next_item.nil?
        @clock = next_item.due_time if next_item.due_time > @clock
        next_item.invoke
      else
        @enabled = false
      end

    end while @enabled
  end
end

#stopObject

Stops the virtual time scheduler.



52
53
54
# File 'lib/rx/concurrency/virtual_time_scheduler.rb', line 52

def stop
  @enabled = false
end