Class: OFlow::Actors::Timer

Inherits:
OFlow::Actor show all
Defined in:
lib/oflow/actors/timer.rb

Constant Summary collapse

MAX_SLEEP =
1.0

Instance Attribute Summary collapse

Attributes inherited from OFlow::Actor

#task

Instance Method Summary collapse

Methods inherited from OFlow::Actor

#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread

Constructor Details

#initialize(task, options = {}) ⇒ Timer

Returns a new instance of Timer.



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/oflow/actors/timer.rb', line 35

def initialize(task, options={})
  @count = 0
  @pending = nil
  @stop = nil
  @period = nil
  @repeat = nil
	@on_start = false
  set_options(options)
  @pending = @start
  super
  task.receive(:init, nil)
end

Instance Attribute Details

#countObject (readonly)

The number of time the timer has fired or shipped.



26
27
28
# File 'lib/oflow/actors/timer.rb', line 26

def count
  @count
end

#labelObject (readonly)

Label for the Tracker is used and for trigger content.



24
25
26
# File 'lib/oflow/actors/timer.rb', line 24

def label
  @label
end

#on_startObject (readonly)

Flag indicating a trigger should be fired on start as well.



33
34
35
# File 'lib/oflow/actors/timer.rb', line 33

def on_start
  @on_start
end

#pendingObject (readonly)

Time of next or pending trigger.



31
32
33
# File 'lib/oflow/actors/timer.rb', line 31

def pending
  @pending
end

#periodObject (readonly)

How long to wait between each trigger. nil indicates as fast as possible,



20
21
22
# File 'lib/oflow/actors/timer.rb', line 20

def period
  @period
end

#repeatObject (readonly)

How many time to repeat before stopping. nil mean go forever.



22
23
24
# File 'lib/oflow/actors/timer.rb', line 22

def repeat
  @repeat
end

#startObject (readonly)

When to trigger the first event. nil means start now.



14
15
16
# File 'lib/oflow/actors/timer.rb', line 14

def start
  @start
end

#stopObject (readonly)

The stop time. If nil then there is no stopping unless the repeat limit kicks in.



17
18
19
# File 'lib/oflow/actors/timer.rb', line 17

def stop
  @stop
end

#with_trackerObject (readonly)

Boolean flag indicating a tracker should be added to the trigger content if true.



29
30
31
# File 'lib/oflow/actors/timer.rb', line 29

def with_tracker
  @with_tracker
end

Instance Method Details

#perform(op, box) ⇒ Object

The loop in the Task containing this Actor is the thread used for the timer. Mostly the perform() method sleeps but it will be woken when a new request is placed on the Task queue so it exits if there is a request on the queue even if it has not triggered a ship() know that it will be re-entered.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/oflow/actors/timer.rb', line 53

def perform(op, box)
  op = op.to_sym unless op.nil?
  case op
  when :stop
    set_stop(box.nil? ? nil : box.contents)
  when :start
    old = @start
    set_start(box.nil? ? nil : box.contents)
    @pending = @start if @start < old
  when :period
    old = @period
    set_period(box.nil? ? nil : box.contents)
    if old.nil? || @pending.nil? || @pending.nil?
      @pending = nil
    else
      @pending = @pending - old + @period
    end
  when :repeat
    set_repeat(box.nil? ? nil : box.contents)
  when :label
    set_label(box.nil? ? nil : box.contents)
  when :with_tracker
    set_with_tracker(box.nil? ? nil : box.contents)
  end
	if @on_start
	  trigger(Time.now())
	end
  while true
    now = Time.now()
    # If past stop time then it is done. A future change in options can
    # restart the timer.
    return if !@stop.nil? && @stop < now
    # Has repeat number been exceeded?
    return if !@repeat.nil? && @repeat <= @count
    # If there is nothing pending the timer has completed.
    return if @pending.nil?
    # If the Task is blocked or shutting down.
    return if Task::CLOSING == task.state || Task::BLOCKED == task.state

    if @pending <= now
      # Skip if stopped but do not increment counter.
      unless Task::STOPPED == task.state
        @count += 1
        now = Time.now()
 trigger(now)
      end
      if @period.nil? || @period == 0
        @pending = now
      else
 diff = now - @pending
        @pending += @period * diff.to_i/@period.to_i
 @pending += @period if @pending <= now
      end
    end
    # If there is a request waiting then return so it can be handled. It
    # will come back here to allow more timer processing.
    return if 0 < task.queue_count()

    if Task::STOPPED == task.state
      sleep(0.1)
    else
      now = Time.now()
      if now < @pending
        wait_time = @pending - now
        wait_time = MAX_SLEEP if MAX_SLEEP < wait_time
        sleep(wait_time)
      end
    end
  end
end

#set_label(v) ⇒ Object



205
206
207
208
# File 'lib/oflow/actors/timer.rb', line 205

def set_label(v)
  v = v.to_s unless v.nil?
  @label = v
end

#set_on_start(v) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
# File 'lib/oflow/actors/timer.rb', line 210

def set_on_start(v)
	if v.is_a?(TrueClass)
    @on_start = true
	elsif v.is_a?(FalseClass)
	  @on_start = false
	elsif v.nil?
	  # no change
	else
    @on_start = ('true' == v.to_s.strip.downcase)
	end
end

#set_options(options) ⇒ Object



138
139
140
141
142
143
144
145
146
# File 'lib/oflow/actors/timer.rb', line 138

def set_options(options)
  set_start(options[:start]) # if nil let start get set to now
  set_stop(options[:stop]) if options.has_key?(:stop)
  set_period(options[:period]) if options.has_key?(:period)
  set_repeat(options[:repeat]) if options.has_key?(:repeat)
  set_with_tracker(options[:with_tracker])
  set_on_start(options[:on_start])
  @label = options[:label].to_s
end

#set_period(v) ⇒ Object

Raises:



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/oflow/actors/timer.rb', line 179

def set_period(v)
  p = 0.0
  if v.kind_of?(Numeric)
    p = v
  elsif v.is_a?(String)
    p = v.strip().to_f
  else
    raise ConfigError.new("Expected period to be a Numeric, not a #{v.class}.")
  end
  raise ConfigError.new("period must be greater than 0.0.") if 0.0 >= p
  @period = p
end

#set_repeat(v) ⇒ Object

Raises:



192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/oflow/actors/timer.rb', line 192

def set_repeat(v)
  r = nil
  if v.kind_of?(Fixnum)
    r = v
  elsif v.is_a?(String)
    r = v.strip().to_i
  elsif !v.nil?
    raise ConfigError.new("Expected repeat to be a Fixnum, not a #{v.class}.")
  end
  raise ConfigError.new("repeat must be greater than or equal 0.0 or nil") if !r.nil? && 0.0 >= r
  @repeat = r
end

#set_start(v) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/oflow/actors/timer.rb', line 148

def set_start(v)
  if v.is_a?(String)
    begin
      v = DateTime.parse(v).to_time
      v = v - v.localtime.gmtoff
    rescue Exception
      v = Time.now() + v.to_i
    end
  elsif v.is_a?(Numeric)
    v = Time.now() + v
  elsif v.nil?
    v = Time.now()
  elsif !v.kind_of?(Time) && !v.kind_of?(Date)
    raise ConfigError.new("Expected start to be a Time or Numeric, not a #{v.class}.")
  end
  @start = v
  @pending = @start
end

#set_stop(v) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/oflow/actors/timer.rb', line 167

def set_stop(v)
  if v.is_a?(String)
    v = DateTime.parse(v).to_time
    v = v - v.gmtoff
  elsif v.is_a?(Numeric)
    v = Time.now() + v
  elsif !v.nil? && !v.kind_of?(Time) && !v.kind_of?(Date)
    raise ConfigError.new("Expected stop to be a Time or Numeric, not a #{v.class}.")
  end
  @stop = v
end

#set_with_tracker(v) ⇒ Object



222
223
224
225
226
227
228
# File 'lib/oflow/actors/timer.rb', line 222

def set_with_tracker(v)
  v = false if v.nil?
  unless true == v || false == v
    raise ConfigError.new("Expected with_tracker to be a boolean, not a #{v.class}.")
  end
  @with_tracker = v
end

#trigger(now) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/oflow/actors/timer.rb', line 124

def trigger(now)
  tracker = @with_tracker ? Tracker.create(@label) : nil
  box = Box.new([@label, @count, now.utc()], tracker)
  task.links.each_key do |key|
    begin
      task.ship(key, box)
    rescue BlockedError
      task.warn("Failed to ship timer #{box.contents} to #{key}. Task blocked.")
    rescue BusyError
      task.warn("Failed to ship timer #{box.contents} to #{key}. Task busy.")
    end
  end
end