Class: OFlow::Actors::Timer

Inherits:
OFlow::Actor show all
Defined in:
lib/oflow/actors/timer.rb

Constant Summary collapse

MAX_SLEEP =
1.0

Instance Attribute Summary collapse

Attributes inherited from OFlow::Actor

#task

Instance Method Summary collapse

Methods inherited from OFlow::Actor

#inputs, #options, #outputs, #with_own_thread

Constructor Details

#initialize(task, options = {}) ⇒ Timer

Returns a new instance of Timer.



30
31
32
33
34
35
36
37
38
# File 'lib/oflow/actors/timer.rb', line 30

def initialize(task, options={})
  @count = 0
  @pending = nil
  set_options(options)
  @start = Time.now() if @start.nil?
  @pending = @start
  super
  task.receive(:init, nil)
end

Instance Attribute Details

#countObject (readonly)

The number of time the timer has fired or shipped.



23
24
25
# File 'lib/oflow/actors/timer.rb', line 23

def count
  @count
end

#labelObject (readonly)

Label for the Tracker is used and for trigger content.



21
22
23
# File 'lib/oflow/actors/timer.rb', line 21

def label
  @label
end

#pendingObject (readonly)

Time of next or pending trigger.



28
29
30
# File 'lib/oflow/actors/timer.rb', line 28

def pending
  @pending
end

#periodObject (readonly)

How long to wait between each trigger. nil indicates as fast as possible,



17
18
19
# File 'lib/oflow/actors/timer.rb', line 17

def period
  @period
end

#repeatObject (readonly)

How many time to repeat before stopping. nil mean go forever.



19
20
21
# File 'lib/oflow/actors/timer.rb', line 19

def repeat
  @repeat
end

#startObject (readonly)

When to trigger the first event. nil means start now.



12
13
14
# File 'lib/oflow/actors/timer.rb', line 12

def start
  @start
end

#stopObject (readonly)

The stop time. If nil then there is not stopping unless the repeat limit kicks in.



15
16
17
# File 'lib/oflow/actors/timer.rb', line 15

def stop
  @stop
end

#with_trackerObject (readonly)

Boolean flag indicating a tracker should be added to the trigger content if true.



26
27
28
# File 'lib/oflow/actors/timer.rb', line 26

def with_tracker
  @with_tracker
end

Instance Method Details

#perform(op, box) ⇒ Object

The loop in the Task containing this Actor is the thread used for the timer. Mostly the perform() method sleeps but it will be woken when a new request is placed on the Task queue so it exits if there is a request on the queue even if it has not triggered a ship() know that it will be re-entered.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/oflow/actors/timer.rb', line 45

def perform(op, box)
  op = op.to_sym unless op.nil?
  case op
  when :stop
    # TBD if no arg (or earlier than now) then stop now else set to new stop time
  when :start
    # TBD if stopped then start if no arg, if arg then set start time
  when :period
    # TBD
  when :repeat
    # TBD
  when :label
    # TBD
  when :with_tracker
    # TBD
  end
  while true
    now = Time.now()

    # If past stop time then it is done. A future change in options can
    # restart the timer.
    return if !@stop.nil? && @stop < now
    # Has repeat number been exceeded?
    return if !@repeat.nil? && @repeat <= @count
    # If there is nothing pending the timer has completed.
    return if @pending.nil?
    # If the Task is blocked or shutting down.
    return if Task::CLOSING == task.state || Task::BLOCKED == task.state

    if @pending <= now
      # Skip if stopped but do not increment counter.
      unless Task::STOPPED == task.state
        @count += 1
        now = Time.now()
        tracker = @with_tracker ? Tracker.new(@label) : nil
        box = Box.new([@label, @count, now.utc()], tracker)
        task.links.each_key do |key|
          begin
            task.ship(key, box)
          rescue BlockedError => e
            task.warn("Failed to ship timer #{box.contents} to #{key}. Task blocked.")
          rescue BusyError => e
            task.warn("Failed to ship timer #{box.contents} to #{key}. Task busy.")
          end
        end
      end
      if @period.nil? || @period == 0
        @pending = now
      else
        @pending += period
      end
    end
    # If there is a request waiting then return so it can be handled. It
    # will come back here to allow more timer processing.
    return if 0 < task.queue_count()

    if Task::STOPPED == task.state
      sleep(0.1)
    else
      now = Time.now()
      if now < @pending
        wait_time = @pending - now
        wait_time = MAX_SLEEP if MAX_SLEEP < wait_time
        sleep(wait_time)
      end
    end
  end
end

#set_options(options) ⇒ Object



114
115
116
117
118
119
120
121
122
# File 'lib/oflow/actors/timer.rb', line 114

def set_options(options)
  @start = options[:start]
  @stop = options[:stop]
  @period = options[:period]
  @repeat = options[:repeat]
  @label = options[:label]
  @with_tracker = options[:with_tracker]
  # TBD check values for type and range
end