Class: OFlow::Actors::Timer
- Inherits:
-
OFlow::Actor
- Object
- OFlow::Actor
- OFlow::Actors::Timer
- Defined in:
- lib/oflow/actors/timer.rb
Constant Summary collapse
- MAX_SLEEP =
1.0
Instance Attribute Summary collapse
-
#count ⇒ Object
readonly
The number of times the timer has fired or shipped.
-
#label ⇒ Object
readonly
Label used for the Tracker and for the trigger content.
-
#pending ⇒ Object
readonly
Time of next or pending trigger.
-
#period ⇒ Object
readonly
How long to wait between each trigger.
-
#repeat ⇒ Object
readonly
How many times to repeat before stopping.
-
#start ⇒ Object
readonly
When to trigger the first event.
-
#stop ⇒ Object
readonly
The stop time.
-
#with_tracker ⇒ Object
readonly
Boolean flag indicating a tracker should be added to the trigger content if true.
Attributes inherited from OFlow::Actor
Instance Method Summary collapse
-
#initialize(task, options = {}) ⇒ Timer
constructor
A new instance of Timer.
-
#perform(op, box) ⇒ Object
The loop in the Task containing this Actor is the thread used for the timer.
- #set_label(v) ⇒ Object
- #set_options(options) ⇒ Object
- #set_period(v) ⇒ Object
- #set_repeat(v) ⇒ Object
- #set_start(v) ⇒ Object
- #set_stop(v) ⇒ Object
- #set_with_tracker(v) ⇒ Object
Methods inherited from OFlow::Actor
#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options = {}) ⇒ Timer
Returns a new instance of Timer.
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/oflow/actors/timer.rb', line 32
# Creates the Timer, applies the supplied options, and queues an :init
# request on the task.
#
# @param task [Object] the Task that owns this Actor; it must respond to
#   receive(op, box)
# @param options [Hash] timer options read by set_options (:start, :stop,
#   :period, :repeat, :with_tracker, :label)
def initialize(task, options={})
  @count = 0
  @pending = nil
  @stop = nil
  @period = nil
  @repeat = nil
  set_options(options)
  # The first trigger is due at the start time established by set_options.
  @pending = @start
  # Bare super forwards task and options to the parent initializer.
  super
  task.receive(:init, nil)
end
Instance Attribute Details
#count ⇒ Object (readonly)
The number of times the timer has fired or shipped.
25 26 27 |
# File 'lib/oflow/actors/timer.rb', line 25 def count @count end |
#label ⇒ Object (readonly)
Label used for the Tracker and for the trigger content.
23 24 25 |
# File 'lib/oflow/actors/timer.rb', line 23 def label @label end |
#pending ⇒ Object (readonly)
Time of next or pending trigger.
30 31 32 |
# File 'lib/oflow/actors/timer.rb', line 30 def pending @pending end |
#period ⇒ Object (readonly)
How long to wait between each trigger. nil indicates as fast as possible.
19 20 21 |
# File 'lib/oflow/actors/timer.rb', line 19 def period @period end |
#repeat ⇒ Object (readonly)
How many times to repeat before stopping. nil means go forever.
21 22 23 |
# File 'lib/oflow/actors/timer.rb', line 21 def repeat @repeat end |
#start ⇒ Object (readonly)
When to trigger the first event. nil means start now.
14 15 16 |
# File 'lib/oflow/actors/timer.rb', line 14 def start @start end |
#stop ⇒ Object (readonly)
The stop time. If nil then there is no stopping unless the repeat limit kicks in.
17 18 19 |
# File 'lib/oflow/actors/timer.rb', line 17 def stop @stop end |
#with_tracker ⇒ Object (readonly)
Boolean flag indicating a tracker should be added to the trigger content if true.
28 29 30 |
# File 'lib/oflow/actors/timer.rb', line 28 def with_tracker @with_tracker end |
Instance Method Details
#perform(op, box) ⇒ Object
The loop in the Task containing this Actor is the thread used for the timer. Mostly the perform() method sleeps, but it will be woken when a new request is placed on the Task queue, so it exits if there is a request on the queue, even if it has not triggered a ship(), knowing that it will be re-entered.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/oflow/actors/timer.rb', line 49 def perform(op, box) op = op.to_sym unless op.nil? case op when :stop set_stop(box.nil? ? nil : box.contents) when :start old = @start set_start(box.nil? ? nil : box.contents) @pending = @start if @start < old when :period old = @period set_period(box.nil? ? nil : box.contents) if old.nil? || @pending.nil? || @pending.nil? @pending = nil else @pending = @pending - old + @period end when :repeat set_repeat(box.nil? ? nil : box.contents) when :label set_label(box.nil? ? nil : box.contents) when :with_tracker set_with_tracker(box.nil? ? nil : box.contents) end while true now = Time.now() # If past stop time then it is done. A future change in options can # restart the timer. return if !@stop.nil? && @stop < now # Has repeat number been exceeded? return if !@repeat.nil? && @repeat <= @count # If there is nothing pending the timer has completed. return if @pending.nil? # If the Task is blocked or shutting down. return if Task::CLOSING == task.state || Task::BLOCKED == task.state if @pending <= now # Skip if stopped but do not increment counter. unless Task::STOPPED == task.state @count += 1 now = Time.now() tracker = @with_tracker ? Tracker.create(@label) : nil box = Box.new([@label, @count, now.utc()], tracker) task.links.each_key do |key| begin task.ship(key, box) rescue BlockedError => e task.warn("Failed to ship timer #{box.contents} to #{key}. Task blocked.") rescue BusyError => e task.warn("Failed to ship timer #{box.contents} to #{key}. Task busy.") end end end if @period.nil? || @period == 0 @pending = now else @pending += period end end # If there is a request waiting then return so it can be handled. It # will come back here to allow more timer processing. return if 0 < task.queue_count() if Task::STOPPED == task.state sleep(0.1) else now = Time.now() if now < @pending wait_time = @pending - now wait_time = MAX_SLEEP if MAX_SLEEP < wait_time sleep(wait_time) end end end end |
#set_label(v) ⇒ Object
186 187 188 189 |
# File 'lib/oflow/actors/timer.rb', line 186
# Stores the label, converting any non-nil value to a String.
#
# @param v [Object, nil] the new label; nil is stored unchanged
# @return [String, nil] the stored label
def set_label(v)
  @label = v.nil? ? v : v.to_s
end
#set_options(options) ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/oflow/actors/timer.rb', line 125
# Applies the timer options from a Hash by delegating to the individual
# setters.
#
# :start and :with_tracker are always applied, so a missing key is passed on
# as nil. :stop, :period and :repeat are applied only when the key is present.
# The label is stored as options[:label].to_s, so a missing label becomes "".
#
# @param options [Hash] option values keyed by Symbol
# @raise [ConfigError] when one of the setters rejects its value
def set_options(options)
  set_start(options[:start]) # if nil let start get set to now
  set_stop(options[:stop]) if options.has_key?(:stop)
  set_period(options[:period]) if options.has_key?(:period)
  set_repeat(options[:repeat]) if options.has_key?(:repeat)
  set_with_tracker(options[:with_tracker])
  @label = options[:label].to_s
end
#set_period(v) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/oflow/actors/timer.rb', line 160
# Validates and stores the interval between triggers.
#
# @param v [Numeric, String] the interval; a String is stripped and
#   converted with to_f
# @return [Numeric] the stored period
# @raise [ConfigError] if v is neither a Numeric nor a String, or if the
#   resulting value is not greater than 0.0
def set_period(v)
  interval =
    case v
    when Numeric
      v
    when String
      v.strip.to_f
    else
      raise ConfigError.new("Expected period to be a Numeric, not a #{v.class}.")
    end
  if 0.0 >= interval
    raise ConfigError.new("period must be greater than 0.0.")
  end
  @period = interval
end
#set_repeat(v) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/oflow/actors/timer.rb', line 173
# Validates and stores the number of times the timer may trigger.
#
# Integer is used for the type test because Fixnum was deprecated in Ruby 2.4
# and removed in Ruby 3.2, where referencing it raises NameError.
#
# @param v [Integer, String, nil] the repeat count; a String is stripped and
#   converted with to_i, nil is stored as nil
# @return [Integer, nil] the stored repeat count
# @raise [ConfigError] if v is of another type, or if the count is not
#   greater than 0
def set_repeat(v)
  r =
    case v
    when nil
      nil
    when Integer
      v
    when String
      v.strip().to_i
    else
      raise ConfigError.new("Expected repeat to be a Fixnum, not a #{v.class}.")
    end
  # NOTE(review): the message says "greater than or equal" but the check
  # rejects 0 as well; confirm which one is intended before changing either.
  raise ConfigError.new("repeat must be greater than or equal 0.0 or nil") if !r.nil? && 0.0 >= r
  @repeat = r
end
#set_start(v) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/oflow/actors/timer.rb', line 134
# Validates and stores the time of the first trigger.
#
# The stored value is always a Time. A Date or DateTime is converted with
# to_time because comparing a Date with a Time (as the timer loop does with
# Time.now) raises ArgumentError.
#
# @param v [String, Numeric, Time, Date, nil] a String is parsed with
#   DateTime.parse and then shifted by the parsed value's gmtoff; a Numeric is
#   an offset in seconds from now; nil means now
# @return [Time] the stored start time
# @raise [ConfigError] if v is of any other type
def set_start(v)
  @start =
    case v
    when String
      parsed = DateTime.parse(v).to_time
      parsed - parsed.gmtoff
    when Numeric
      Time.now() + v
    when nil
      Time.now()
    when Time
      v
    when Date
      # Date covers DateTime as well; normalise so Time comparisons work.
      v.to_time
    else
      raise ConfigError.new("Expected start to be a Time or Numeric, not a #{v.class}.")
    end
end
#set_stop(v) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/oflow/actors/timer.rb', line 148
# Validates and stores the time after which the timer stops triggering.
#
# The stored value is a Time or nil. A Date or DateTime is converted with
# to_time because comparing a Date with a Time (as the timer loop does with
# Time.now) raises ArgumentError.
#
# @param v [String, Numeric, Time, Date, nil] a String is parsed with
#   DateTime.parse and then shifted by the parsed value's gmtoff; a Numeric is
#   an offset in seconds from now; nil clears the stop time
# @return [Time, nil] the stored stop time
# @raise [ConfigError] if v is of any other type
def set_stop(v)
  @stop =
    case v
    when String
      parsed = DateTime.parse(v).to_time
      parsed - parsed.gmtoff
    when Numeric
      Time.now() + v
    when nil, Time
      v
    when Date
      # Date covers DateTime as well; normalise so Time comparisons work.
      v.to_time
    else
      raise ConfigError.new("Expected stop to be a Time or Numeric, not a #{v.class}.")
    end
end
#set_with_tracker(v) ⇒ Object
191 192 193 194 195 196 197 |
# File 'lib/oflow/actors/timer.rb', line 191
# Validates and stores the flag that decides whether a tracker is attached to
# each trigger.
#
# @param v [Boolean, nil] the new flag; nil is treated as false
# @return [Boolean] the stored flag
# @raise [ConfigError] if v is neither true, false nor nil
def set_with_tracker(v)
  flag = v.nil? ? false : v
  case flag
  when true, false
    @with_tracker = flag
  else
    raise ConfigError.new("Expected with_tracker to be a boolean, not a #{flag.class}.")
  end
end