Class: OFlow::Actors::Timer

Inherits:
OFlow::Actor show all
Defined in:
lib/oflow/actors/timer.rb

Constant Summary collapse

MAX_SLEEP =
1.0

Instance Attribute Summary collapse

Attributes inherited from OFlow::Actor

#task

Instance Method Summary collapse

Methods inherited from OFlow::Actor

#inputs, #options, #outputs, #with_own_thread

Constructor Details

#initialize(task, options = {}) ⇒ Timer

Returns a new instance of Timer.



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/oflow/actors/timer.rb', line 30

def initialize(task, options={})
  @count = 0
  @pending = nil
  @stop = nil
  @period = nil
  @repeat = nil
  set_options(options)
  @pending = @start
  super
  task.receive(:init, nil)
end

Instance Attribute Details

#countObject (readonly)

The number of time the timer has fired or shipped.



23
24
25
# File 'lib/oflow/actors/timer.rb', line 23

def count
  @count
end

#labelObject (readonly)

Label for the Tracker is used and for trigger content.



21
22
23
# File 'lib/oflow/actors/timer.rb', line 21

def label
  @label
end

#pendingObject (readonly)

Time of next or pending trigger.



28
29
30
# File 'lib/oflow/actors/timer.rb', line 28

def pending
  @pending
end

#periodObject (readonly)

How long to wait between each trigger. nil indicates as fast as possible,



17
18
19
# File 'lib/oflow/actors/timer.rb', line 17

def period
  @period
end

#repeatObject (readonly)

How many time to repeat before stopping. nil mean go forever.



19
20
21
# File 'lib/oflow/actors/timer.rb', line 19

def repeat
  @repeat
end

#startObject (readonly)

When to trigger the first event. nil means start now.



12
13
14
# File 'lib/oflow/actors/timer.rb', line 12

def start
  @start
end

#stopObject (readonly)

The stop time. If nil then there is not stopping unless the repeat limit kicks in.



15
16
17
# File 'lib/oflow/actors/timer.rb', line 15

def stop
  @stop
end

#with_trackerObject (readonly)

Boolean flag indicating a tracker should be added to the trigger content if true.



26
27
28
# File 'lib/oflow/actors/timer.rb', line 26

def with_tracker
  @with_tracker
end

Instance Method Details

#perform(op, box) ⇒ Object

The loop in the Task containing this Actor is the thread used for the timer. Mostly the perform() method sleeps but it will be woken when a new request is placed on the Task queue so it exits if there is a request on the queue even if it has not triggered a ship() know that it will be re-entered.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/oflow/actors/timer.rb', line 47

def perform(op, box)
  op = op.to_sym unless op.nil?
  case op
  when :stop
    set_stop(box.nil? ? nil : box.contents)
  when :start
    old = @start
    set_start(box.nil? ? nil : box.contents)
    @pending = @start if @start < old
  when :period
    old = @period
    set_period(box.nil? ? nil : box.contents)
    if old.nil? || @pending.nil? || @pending.nil?
      @pending = nil
    else
      @pending = @pending - old + @period
    end
  when :repeat
    set_repeat(box.nil? ? nil : box.contents)
  when :label
    set_label(box.nil? ? nil : box.contents)
  when :with_tracker
    set_with_tracker(box.nil? ? nil : box.contents)
  end
  while true
    now = Time.now()

    # If past stop time then it is done. A future change in options can
    # restart the timer.
    return if !@stop.nil? && @stop < now
    # Has repeat number been exceeded?
    return if !@repeat.nil? && @repeat <= @count
    # If there is nothing pending the timer has completed.
    return if @pending.nil?
    # If the Task is blocked or shutting down.
    return if Task::CLOSING == task.state || Task::BLOCKED == task.state

    if @pending <= now
      # Skip if stopped but do not increment counter.
      unless Task::STOPPED == task.state
        @count += 1
        now = Time.now()
        tracker = @with_tracker ? Tracker.create(@label) : nil
        box = Box.new([@label, @count, now.utc()], tracker)
        task.links.each_key do |key|
          begin
            task.ship(key, box)
          rescue BlockedError => e
            task.warn("Failed to ship timer #{box.contents} to #{key}. Task blocked.")
          rescue BusyError => e
            task.warn("Failed to ship timer #{box.contents} to #{key}. Task busy.")
          end
        end
      end
      if @period.nil? || @period == 0
        @pending = now
      else
        @pending += period
      end
    end
    # If there is a request waiting then return so it can be handled. It
    # will come back here to allow more timer processing.
    return if 0 < task.queue_count()

    if Task::STOPPED == task.state
      sleep(0.1)
    else
      now = Time.now()
      if now < @pending
        wait_time = @pending - now
        wait_time = MAX_SLEEP if MAX_SLEEP < wait_time
        sleep(wait_time)
      end
    end
  end
end

#set_label(v) ⇒ Object



169
170
171
172
# File 'lib/oflow/actors/timer.rb', line 169

def set_label(v)
  v = v.to_s unless v.nil?
  @label = v
end

#set_options(options) ⇒ Object



124
125
126
127
128
129
130
131
# File 'lib/oflow/actors/timer.rb', line 124

def set_options(options)
  set_start(options[:start]) # if nil let start get set to now
  set_stop( options[:stop]) if options.has_key?(:stop)
  set_period(options[:period]) if options.has_key?(:period)
  set_repeat(options[:repeat]) if options.has_key?(:repeat)
  set_with_tracker(options[:with_tracker])
  @label = options[:label].to_s
end

#set_period(v) ⇒ Object



155
156
157
158
159
160
# File 'lib/oflow/actors/timer.rb', line 155

def set_period(v)
  unless v.nil? || v.kind_of?(Numeric)
    raise ConfigError.new("Expected period to be a Numeric, not a #{v.class}.")
  end
  @period = v
end

#set_repeat(v) ⇒ Object



162
163
164
165
166
167
# File 'lib/oflow/actors/timer.rb', line 162

def set_repeat(v)
  unless v.nil? || v.kind_of?(Fixnum)
    raise ConfigError.new("Expected repeat to be a Fixnum, not a #{v.class}.")
  end
  @repeat = v
end

#set_start(v) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/oflow/actors/timer.rb', line 133

def set_start(v)
  now = Time.now()
  if v.is_a?(Numeric)
    v = now + v
  elsif v.nil?
    v = Time.now()
  elsif !v.kind_of?(Time) && !v.kind_of?(Date)
    raise ConfigError.new("Expected start to be a Time or Numeric, not a #{v.class}.")
  end
  @start = v
end

#set_stop(v) ⇒ Object



145
146
147
148
149
150
151
152
153
# File 'lib/oflow/actors/timer.rb', line 145

def set_stop(v)
  now = Time.now()
  if v.is_a?(Numeric)
    v = now + v
  elsif !v.nil? && !v.kind_of?(Time) && !v.kind_of?(Date)
    raise ConfigError.new("Expected stop to be a Time or Numeric, not a #{v.class}.")
  end
  @stop = v
end

#set_with_tracker(v) ⇒ Object



174
175
176
177
178
179
180
# File 'lib/oflow/actors/timer.rb', line 174

def set_with_tracker(v)
  v = false if v.nil?
  unless true == v || false == v
    raise ConfigError.new("Expected with_tracker to be a boolean, not a #{v.class}.")
  end
  @with_tracker = v
end