Class: MessageBus::TimerThread
- Inherits:
-
Object
- Object
- MessageBus::TimerThread
- Defined in:
- lib/message_bus/timer_thread.rb
Defined Under Namespace
Classes: Cancelable, CancelableEvery
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
Instance Method Summary collapse
- #every(delay, &block) ⇒ Object
-
#initialize ⇒ TimerThread
constructor
A new instance of TimerThread.
- #on_error(&block) ⇒ Object
-
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after a certain delay (in seconds).
- #stop ⇒ Object
Constructor Details
#initialize ⇒ TimerThread
Returns a new instance of TimerThread.
24 25 26 27 28 29 30 31 |
# File 'lib/message_bus/timer_thread.rb', line 24
# Builds an idle timer: no jobs queued, not stopped, and a freshly started
# worker thread running +do_work+.
#
# The default error handler writes the exception class, message and backtrace
# to STDERR. It can be swapped out later through #on_error.
def initialize
  @stopped = false
  @jobs = []
  @mutex = Mutex.new
  @next = nil
  # Install the handler before the worker thread exists, so the thread can
  # never observe a nil @on_error.
  @on_error = lambda do |e|
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() turns that into an empty list instead of crashing the handler.
    trace = Array(e.backtrace).join("\n")
    STDERR.puts "Exception while processing Timer:\n #{e.class}: #{e.message}\n #{trace}"
  end
  @thread = Thread.new { do_work }
end
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
3 4 5 |
# File 'lib/message_bus/timer_thread.rb', line 3
# Read-only accessor for the job list.
#
# Hands back the object stored in @jobs itself, not a copy.
#
# @return [Object] the current value of @jobs
def jobs
  @jobs
end
Instance Method Details
#every(delay, &block) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/message_bus/timer_thread.rb', line 45
# Runs the given block repeatedly, waiting +delay+ seconds (via #queue)
# before the first run and again after each run.
#
# @param delay [Numeric] delay, in seconds, handed to #queue for every run
# @return [CancelableEvery] handle whose +current+ always holds the value
#   returned by the most recent #queue call
def every(delay, &block)
  handle = CancelableEvery.new
  rearm = nil

  tick = proc do
    begin
      block.call
    ensure
      # Schedule the next run even if the block raised, so a single failure
      # does not end the series.
      rearm.call
    end
  end

  rearm = lambda { handle.current = queue(delay, &tick) }
  rearm.call
  handle
end
#on_error(&block) ⇒ Object
87 88 89 |
# File 'lib/message_bus/timer_thread.rb', line 87
# Replaces the error handler stored in @on_error with the given block.
#
# @yieldparam e [Exception] presumably the exception raised by a job; the
#   call site is not visible here, confirm against do_work
# @return [Proc, nil] the block that was installed
def on_error(&block)
  @on_error = block
end
#queue(delay = 0, &block) ⇒ Object
Queues a block to run after a certain delay (in seconds).
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/message_bus/timer_thread.rb', line 59
# Queues a block to run after a certain delay (in seconds).
#
# Jobs are stored in @jobs as [run_at, block] pairs, ordered by run_at with
# the soonest first. Jobs that share the same run_at keep their submission
# order (first queued, first in the list).
#
# @param delay [Numeric] seconds from now at which the block becomes due
# @return [Cancelable] handle wrapping the [run_at, block] entry just queued
def queue(delay = 0, &block)
  queue_time = Time.new.to_f + delay
  job = [queue_time, block]

  @mutex.synchronize do
    # Insert right after the last job that is due no later than this one.
    # Using <= rather than < is what keeps equal-time jobs in FIFO order.
    last_not_later = @jobs.rindex { |run_at, _| run_at <= queue_time }
    index = last_not_later ? last_not_later + 1 : 0
    @jobs.insert(index, job)
    # The new job is now the earliest one, so record its due time.
    @next = queue_time if index == 0
  end

  # Restart the worker if it has died. The check is repeated under the lock so
  # two concurrent callers cannot both spawn a thread.
  unless @thread.alive?
    @mutex.synchronize do
      @thread = Thread.new { do_work } unless @thread.alive?
    end
  end

  # Wake a sleeping worker, presumably so it can take the new job into
  # account (do_work is not visible here).
  @thread.wakeup if @thread.status == "sleep"

  Cancelable.new(job)
end
#stop ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/message_bus/timer_thread.rb', line 33
# Flags the timer as stopped and blocks until the worker thread has exited.
#
# While the worker is alive it is woken on every pass; presumably the worker
# loop checks @stopped and returns (do_work is not visible here).
#
# @return [nil]
def stop
  @stopped = true
  running = true
  while running
    @mutex.synchronize do
      running = @thread && @thread.alive?
      if running
        begin
          @thread.wakeup
        rescue ThreadError
          # Thread#wakeup raises when the thread is already dead. If it died
          # between the alive? check and the wakeup, that is the outcome we
          # are waiting for; anything else is a real error.
          raise if @thread.alive?
        end
      end
    end
    # Yield so the worker gets a chance to run and exit.
    sleep 0
  end
end