Class: MessageBus::TimerThread
- Inherits:
-
Object
- Object
- MessageBus::TimerThread
- Defined in:
- lib/message_bus/timer_thread.rb
Defined Under Namespace
Classes: Cancelable, CancelableEvery
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
Instance Method Summary collapse
- #every(delay, &block) ⇒ Object
-
#initialize ⇒ TimerThread
constructor
A new instance of TimerThread.
- #on_error(&block) ⇒ Object
-
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after the given delay (in seconds).
- #stop ⇒ Object
Constructor Details
#initialize ⇒ TimerThread
Returns a new instance of TimerThread.
25 26 27 28 29 30 31 32 |
# File 'lib/message_bus/timer_thread.rb', line 25
# Builds an idle timer: an empty job list, a mutex guarding it, a default
# error handler, and a background worker thread running do_work.
#
# The error handler is installed *before* the worker thread is spawned so the
# thread can never observe an unset @on_error.
#
# @return [TimerThread] a new instance of TimerThread
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  # Set by #queue whenever a job is inserted at the head of @jobs.
  @next = nil

  # Default handler: report what failed (class and message) as well as where.
  # Array() keeps the handler safe for exceptions that carry no backtrace.
  @on_error = lambda do |e|
    STDERR.puts "Exception while processing Timer:\n #{e.class}: #{e.message}\n #{Array(e.backtrace).join("\n")}"
  end

  @thread = Thread.new { do_work }
end
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
4 5 6 |
# File 'lib/message_bus/timer_thread.rb', line 4
# Read-only accessor for the pending job list.
#
# @return [Array] the live @jobs array itself (not a copy); #queue stores
#   its entries as [run_at, block] pairs
def jobs
  @jobs
end
Instance Method Details
#every(delay, &block) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/message_bus/timer_thread.rb', line 46
# Runs the given block repeatedly: it is queued with +delay+, and after each
# run (successful or not) it is queued again with the same +delay+.
#
# NOTE(review): the re-queue is unconditional; whether cancelling the returned
# handle while the block is executing stops the cycle depends on
# CancelableEvery, which is not visible here -- confirm.
#
# @param delay [Numeric] delay in seconds handed to #queue before every run
# @return [CancelableEvery] handle whose +current+ is the most recently
#   queued job
def every(delay, &block)
  handle = CancelableEvery.new

  # Declared up front so the lambda below closes over the local variable.
  tick = nil
  rearm = -> { handle.current = queue(delay, &tick) }

  tick = proc do
    begin
      block.call
    ensure
      # Schedule the next run even when the block raised.
      rearm.call
    end
  end

  rearm.call
  handle
end
#on_error(&block) ⇒ Object
91 92 93 |
# File 'lib/message_bus/timer_thread.rb', line 91
# Replaces the error handler stored in @on_error with the given block.
# The default handler takes the exception as its single argument; presumably
# the worker loop (not visible here) calls the handler the same way -- confirm.
#
# @return [Proc, nil] the handler that was just installed (nil when called
#   without a block)
def on_error(&handler)
  @on_error = handler
end
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after the given delay (in seconds).
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/message_bus/timer_thread.rb', line 60
# Queues a block to run after a certain delay (in seconds).
#
# The job is stored as a [run_at, block] pair and inserted into @jobs
# directly after the last job scheduled strictly earlier, so the list stays
# ordered by run time. The worker thread is restarted if it is no longer
# alive and woken if it is sleeping.
#
# @param delay [Numeric] seconds from now until the block should run
# @return [Cancelable] handle wrapping the queued job
def queue(delay = 0, &block)
  queue_time = Time.now.to_f + delay
  job = [queue_time, block]

  @mutex.synchronize do
    # Position after the last job that runs strictly before this one
    # (0 when there is none).
    last_earlier = @jobs.rindex { |run_at, _| run_at < queue_time }
    i = last_earlier ? last_earlier + 1 : 0

    @jobs.insert(i, job)
    # A new head job changes the earliest scheduled run time.
    @next = queue_time if i == 0
  end

  # Check, lock, re-check: only one caller restarts a dead worker.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  worker = @thread
  if worker.status == "sleep"
    begin
      worker.wakeup
    rescue ThreadError
      # Thread#wakeup raises on a dead thread; the worker may have exited
      # between the status check and the call. Anything else is unexpected.
      raise if worker.alive?
    end
  end

  Cancelable.new(job)
end
#stop ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/message_bus/timer_thread.rb', line 34
# Marks the timer as stopped and blocks until the worker thread is dead.
#
# Each pass wakes the worker under the mutex -- presumably so the worker loop
# (not visible here) re-checks @stopped -- and then yields with `sleep 0`
# before looking again. Returns immediately when there is no live worker.
#
# @return [nil]
def stop
  @stopped = true
  running = true
  while running
    @mutex.synchronize do
      running = @thread && @thread.alive?

      if running
        begin
          @thread.wakeup
        rescue ThreadError
          # Thread#wakeup raises on a dead thread. The worker exiting between
          # the alive? check and the call is the outcome we are waiting for,
          # so only re-raise when the thread is in fact still alive.
          raise if @thread.alive?
        end
      end
    end
    sleep 0
  end
end