Class: MessageBus::TimerThread
- Inherits:
-
Object
- Object
- MessageBus::TimerThread
- Defined in:
- lib/message_bus/timer_thread.rb
Defined Under Namespace
Classes: Cancelable, CancelableEvery
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
Instance Method Summary collapse
- #every(delay, &block) ⇒ Object
-
#initialize ⇒ TimerThread
constructor
A new instance of TimerThread.
- #on_error(&block) ⇒ Object
-
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after the given delay (in seconds).
- #stop ⇒ Object
Constructor Details
#initialize ⇒ TimerThread
Returns a new instance of TimerThread.
32 33 34 35 36 37 38 39 |
# File 'lib/message_bus/timer_thread.rb', line 32

# Sets up an empty job list, the mutex guarding it, and starts the worker
# thread that runs +do_work+.
#
# The default error handler writes the exception's backtrace to STDERR; it
# can be replaced through #on_error. Note that the handler is assigned after
# the worker thread has been started.
def initialize
  @stopped = false
  @jobs    = []
  @mutex   = Mutex.new
  @next    = nil
  @thread  = Thread.new { do_work }
  @on_error = lambda do |e|
    STDERR.puts "Exception while processing Timer:\n #{e.backtrace.join("\n")}"
  end
end
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
4 5 6 |
# File 'lib/message_bus/timer_thread.rb', line 4

# Returns the value of attribute jobs.
#
# This is the live internal array (not a copy); #queue inserts into it while
# holding the instance mutex, whereas this reader takes no lock.
#
# @return [Array] the pending jobs, each a +[run_at, block]+ pair
def jobs
  @jobs
end
Instance Method Details
#every(delay, &block) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/message_bus/timer_thread.rb', line 53

# Runs +block+ repeatedly, waiting +delay+ seconds before each run.
#
# The next run is scheduled from an +ensure+ clause, so the timer keeps
# going even when +block+ raises. The returned handle's +current+ always
# points at the most recently queued run.
#
# @param delay [Numeric] seconds to wait before each invocation
# @return [CancelableEvery] handle tracking the currently queued run
def every(delay, &block)
  handle = CancelableEvery.new
  reschedule = nil

  tick = proc do
    begin
      block.call
    ensure
      # Re-arm regardless of whether the block succeeded.
      reschedule.call
    end
  end

  reschedule = -> { handle.current = queue(delay, &tick) }
  reschedule.call
  handle
end
#on_error(&block) ⇒ Object
98 99 100 |
# File 'lib/message_bus/timer_thread.rb', line 98

# Replaces the error handler with the given block.
#
# Calling this without a block stores +nil+ as the handler.
#
# @return [Proc, nil] the handler that was stored
def on_error(&block)
  @on_error = block
end
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after the given delay (in seconds).
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/message_bus/timer_thread.rb', line 67

# Schedules +block+ to run once +delay+ seconds from now.
#
# The job is stored as a +[run_at, block]+ pair, where +run_at+ is a
# monotonic-clock timestamp, and is inserted right after the last job
# scheduled strictly earlier, scanning from the tail of the list.
#
# @param delay [Numeric] seconds to wait before running the block
# @return [Cancelable] handle wrapping the queued job
def queue(delay = 0, &block)
  run_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay
  entry = [run_at, block]

  @mutex.synchronize do
    # Search from the end: insert after the last job due before this one,
    # or at the front when there is none.
    last_earlier = @jobs.rindex { |scheduled_at, _| scheduled_at < run_at }
    slot = last_earlier ? last_earlier + 1 : 0
    @jobs.insert(slot, entry)
    # A new head of the list means a new earliest deadline.
    @next = run_at if slot == 0
  end

  # Restart the worker if it has died; re-check under the lock so that only
  # one caller creates the replacement thread.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  # Wake a sleeping worker so it can take the new job into account.
  @thread.wakeup if @thread.status == "sleep"

  Cancelable.new(entry)
end
#stop ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/message_bus/timer_thread.rb', line 41

# Stops the timer and blocks until the worker thread has exited.
#
# Sets the stop flag (presumably polled by the worker loop, which is not
# shown here), then keeps waking the worker until it is no longer alive.
# Returns immediately when there is no worker thread or it is already dead.
#
# @return [nil]
# @raise [ThreadError] only if waking the worker fails while it is still alive
def stop
  @stopped = true

  loop do
    running = @mutex.synchronize do
      alive = @thread && @thread.alive?

      if alive
        begin
          @thread.wakeup
        rescue ThreadError
          # The worker may finish between the alive? check and wakeup, in
          # which case wakeup raises on the dead thread. That is exactly the
          # outcome we are waiting for, so only re-raise genuine failures.
          raise if @thread.alive?
        end
      end

      alive
    end

    break unless running

    # Yield to the scheduler so the worker gets a chance to run and exit.
    sleep 0
  end
end