Class: MessageBus::TimerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/message_bus/timer_thread.rb

Defined Under Namespace

Classes: Cancelable, CancelableEvery

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TimerThread

Returns a new instance of TimerThread.



33
34
35
36
37
38
39
40
# File 'lib/message_bus/timer_thread.rb', line 33

# Builds a timer with an empty job list and starts its background worker
# thread (which runs do_work).
#
# The default error handler is assigned BEFORE the worker thread is created,
# so the thread can never observe an unset @on_error.
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  @next = nil
  # Default handler: print the exception's backtrace to STDERR. It can be
  # replaced through #on_error.
  @on_error = lambda { |e| STDERR.puts "Exception while processing Timer:\n #{e.backtrace.join("\n")}" }
  # Start the worker last so that every instance variable above is already in
  # place when do_work begins executing.
  @thread = Thread.new { do_work }
end

Instance Attribute Details

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



4
5
6
# File 'lib/message_bus/timer_thread.rb', line 4

# Reader for the timer's job list.
#
# Returns the very Array object held in @jobs (not a copy) and does not take
# @mutex while reading it. Entries added by #queue are two-element arrays of
# the form [run_time, block].
def jobs
  @jobs
end

Instance Method Details

#every(delay, &block) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/message_bus/timer_thread.rb', line 61

# Runs the given block repeatedly, +delay+ seconds apart, by re-queueing it
# through #queue after each run.
#
# The next run is scheduled from an +ensure+ clause, so it is queued even when
# the block raises. The returned CancelableEvery has its +current+ attribute
# updated to the handle of the most recently queued run.
def every(delay, &block)
  repeating = CancelableEvery.new
  tick = nil

  # Queues the next run and records its handle on the returned object.
  schedule_next = -> { repeating.current = queue(delay, &tick) }

  tick = proc do
    begin
      block.call
    ensure
      schedule_next.call
    end
  end

  schedule_next.call
  repeating
end

#on_error(&block) ⇒ Object



106
107
108
# File 'lib/message_bus/timer_thread.rb', line 106

# Replaces the error handler stored in @on_error with the given block and
# returns that block.
#
# The default handler assigned in #initialize takes a single exception
# argument; a replacement presumably receives the same argument, but the call
# site is not visible here — TODO confirm.
# Calling this without a block stores nil in @on_error.
def on_error(&block)
  @on_error = block
end

#queue(delay = 0, &block) ⇒ Object

Queue a block to run after a certain delay (in seconds).



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/message_bus/timer_thread.rb', line 75

# Queue a block to run after a certain delay (in seconds).
#
# The job is stored as a [run_time, block] pair, where run_time is the current
# monotonic clock reading plus +delay+. @jobs is kept ordered by run_time.
#
# @param delay [Numeric] seconds from now at which the block becomes due
# @return [Cancelable] a handle wrapping the queued [run_time, block] pair
# @raise [ThreadError] only if waking the worker fails while it is still alive
def queue(delay = 0, &block)
  queue_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay
  job = [queue_time, block]

  @mutex.synchronize do
    # Search from the back for the last job that is due strictly earlier and
    # insert right after it; with no such job the new one goes to the front.
    last_earlier = @jobs.rindex { |run_at, _| run_at < queue_time }
    i = last_earlier ? last_earlier + 1 : 0
    @jobs.insert(i, job)
    # A new head of the list means the earliest due time has changed.
    @next = queue_time if i == 0
  end

  # Restart the worker if it is gone. The liveness check is repeated under the
  # mutex so that two concurrent callers do not both spawn a thread.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  if @thread.status == "sleep"
    begin
      @thread.wakeup
    rescue ThreadError
      # The thread can die between the status check and the wakeup call, in
      # which case wakeup raises. As in #stop, that is only an error if the
      # thread is in fact still alive.
      raise if @thread.alive?
    end
  end

  Cancelable.new(job)
end

#stop ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/message_bus/timer_thread.rb', line 42

# Flags the timer as stopped and keeps waking the worker thread until it is
# no longer alive.
#
# Each pass takes @mutex, checks whether the thread exists and is alive, and
# if so wakes it. A ThreadError from the wakeup is ignored when the thread
# turns out to be dead and re-raised otherwise. Between passes the caller
# yields with +sleep 0+.
def stop
  @stopped = true

  loop do
    still_alive = @mutex.synchronize do
      alive = @thread && @thread.alive?

      if alive
        begin
          @thread.wakeup
        rescue ThreadError
          raise if @thread.alive?
        end
      end

      alive
    end

    sleep 0
    break unless still_alive
  end
end