Class: Resque::Durable::BackgroundHeartbeat
- Inherits:
-
Object
- Object
- Resque::Durable::BackgroundHeartbeat
- Defined in:
- lib/resque/durable/background_heartbeat.rb
Overview
Creates a background thread to regularly heartbeat the queue audit.
Class Method Summary collapse
-
.exit_now! ⇒ Object
only a separate method for easy stubbing.
Instance Method Summary collapse
- #heartbeat! ⇒ Object
-
#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat
constructor
A new instance of BackgroundHeartbeat.
-
#signal_stop! ⇒ Object
Signal the ‘heartbeat` thread to stop looping immediately.
- #start! ⇒ Object
- #stop_and_wait! ⇒ Object
- #with_heartbeat ⇒ Object
Constructor Details
#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat
Returns a new instance of BackgroundHeartbeat.
9 10 11 12 13 14 15 16 17 |
# File 'lib/resque/durable/background_heartbeat.rb', line 9 def initialize(queue_audit, interval) @queue_audit = queue_audit @last_timeout = nil @interval = interval @mutex = Mutex.new @cv = ConditionVariable.new @stop = false @thread = nil end |
Class Method Details
.exit_now! ⇒ Object
only a separate method for easy stubbing
21 22 23 |
# File 'lib/resque/durable/background_heartbeat.rb', line 21 def exit_now! abort end |
Instance Method Details
#heartbeat! ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/resque/durable/background_heartbeat.rb', line 33 def heartbeat! @last_timeout ||= @queue_audit.timeout_at @last_timeout = @queue_audit.optimistic_heartbeat!(@last_timeout) rescue StandardError => e @queue_audit.logger.error("Exception in BackgroundHeartbeat thread: #{e.class.name}: #{e.}") self.class.exit_now! ensure ActiveRecord::Base.clear_active_connections! end |
#signal_stop! ⇒ Object
Signal the ‘heartbeat` thread to stop looping immediately. Safe to be call from any thread.
75 76 77 78 79 80 81 |
# File 'lib/resque/durable/background_heartbeat.rb', line 75 def signal_stop! return unless @thread @mutex.synchronize do @stop = true @cv.broadcast end end |
#start! ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/resque/durable/background_heartbeat.rb', line 43 def start! raise "Thread is already running!" if @thread @stop = false # Perform immediately to reduce heartbeat race condition opportunities heartbeat! @thread = Thread.new do while !@stop heartbeat! @mutex.synchronize do end_at = monotonic_now + @interval while !@stop && monotonic_now < end_at sleep_for = end_at - monotonic_now @cv.wait(@mutex, sleep_for) end end end end end |
#stop_and_wait! ⇒ Object
65 66 67 68 69 70 71 72 |
# File 'lib/resque/durable/background_heartbeat.rb', line 65 def stop_and_wait! return unless @thread signal_stop! # Prevent deadlock if called by the `heartbeat` thread, which can't wait for itself to die. return if @thread == Thread.current @thread.join @thread = nil end |
#with_heartbeat ⇒ Object
26 27 28 29 30 31 |
# File 'lib/resque/durable/background_heartbeat.rb', line 26 def with_heartbeat start! yield ensure stop_and_wait! end |