Class: MessageBus::TimerThread
- Inherits:
-
Object
- Object
- MessageBus::TimerThread
- Defined in:
- lib/message_bus/timer_thread.rb
Defined Under Namespace
Classes: Cancelable, CancelableEvery
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
Instance Method Summary collapse
- #every(delay, &block) ⇒ Object
-
#initialize ⇒ TimerThread
constructor
A new instance of TimerThread.
- #on_error(&block) ⇒ Object
-
#queue(delay = 0, &block) ⇒ Object
queue a block to run after a certain delay (in seconds).
- #stop ⇒ Object
Constructor Details
#initialize ⇒ TimerThread
Returns a new instance of TimerThread.
33 34 35 36 37 38 39 40 |
# File 'lib/message_bus/timer_thread.rb', line 33 def initialize @stopped = false @jobs = [] @mutex = Mutex.new @next = nil @thread = Thread.new { do_work } @on_error = lambda { |e| STDERR.puts "Exception while processing Timer:\n #{e.backtrace.join("\n")}" } end |
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
4 5 6 |
# File 'lib/message_bus/timer_thread.rb', line 4 def jobs @jobs end |
Instance Method Details
#every(delay, &block) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/message_bus/timer_thread.rb', line 54 def every(delay, &block) result = CancelableEvery.new do_work = proc do begin block.call ensure result.current = queue(delay, &do_work) end end result.current = queue(delay, &do_work) result end |
#on_error(&block) ⇒ Object
99 100 101 |
# File 'lib/message_bus/timer_thread.rb', line 99 def on_error(&block) @on_error = block end |
#queue(delay = 0, &block) ⇒ Object
queue a block to run after a certain delay (in seconds)
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/message_bus/timer_thread.rb', line 68 def queue(delay = 0, &block) queue_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay job = [queue_time, block] @mutex.synchronize do i = @jobs.length while i > 0 i -= 1 current, _ = @jobs[i] if current < queue_time i += 1 break end end @jobs.insert(i, job) @next = queue_time if i == 0 end unless @thread.alive? @mutex.synchronize do @thread = Thread.new { do_work } unless @thread.alive? end end if @thread.status == "sleep" @thread.wakeup end Cancelable.new(job) end |
#stop ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/message_bus/timer_thread.rb', line 42 def stop @stopped = true running = true while running @mutex.synchronize do running = @thread && @thread.alive? @thread.wakeup if running end sleep 0 end end |