Class: Resque::Durable::BackgroundHeartbeat
- Inherits:
-
Object
- Object
- Resque::Durable::BackgroundHeartbeat
- Defined in:
- lib/resque/durable/background_heartbeat.rb
Overview
Creates a background thread to regularly heartbeat the queue audit.
Class Method Summary collapse
-
.exit_now! ⇒ Object
only a separate method for easy stubbing.
Instance Method Summary collapse
- #heartbeat! ⇒ Object
-
#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat
constructor
A new instance of BackgroundHeartbeat.
-
#signal_stop! ⇒ Object
Signal the
heartbeatthread to stop looping immediately. - #start! ⇒ Object
- #stop_and_wait! ⇒ Object
- #with_heartbeat ⇒ Object
Constructor Details
#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat
Returns a new instance of BackgroundHeartbeat.
9 10 11 12 13 14 15 16 |
# File 'lib/resque/durable/background_heartbeat.rb', line 9 def initialize(queue_audit, interval) @queue_audit = queue_audit @last_timeout = nil @interval = interval @mutex = Mutex.new @stop = false @thread = nil end |
Class Method Details
.exit_now! ⇒ Object
only a separate method for easy stubbing
20 21 22 |
# File 'lib/resque/durable/background_heartbeat.rb', line 20 def exit_now! abort end |
Instance Method Details
#heartbeat! ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/resque/durable/background_heartbeat.rb', line 32 def heartbeat! @last_timeout ||= @queue_audit.timeout_at @last_timeout = @queue_audit.optimistic_heartbeat!(@last_timeout) rescue StandardError => e @queue_audit.logger.error("Exception in BackgroundHeartbeat thread: #{e.class.name}: #{e.message}") self.class.exit_now! ensure ActiveRecord::Base.clear_active_connections! end |
#signal_stop! ⇒ Object
Signal the heartbeat thread to stop looping immediately. Safe to be call from any thread.
73 74 75 76 77 78 79 |
# File 'lib/resque/durable/background_heartbeat.rb', line 73 def signal_stop! return unless @thread @mutex.synchronize do @stop = true @thread.wakeup end end |
#start! ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/resque/durable/background_heartbeat.rb', line 42 def start! raise "Thread is already running!" if @thread @stop = false # Perform immediately to reduce heartbeat race condition opportunities heartbeat! @thread = Thread.new do while !@stop heartbeat! @mutex.synchronize do @mutex.sleep(@interval) end end end end |
#stop_and_wait! ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/resque/durable/background_heartbeat.rb', line 60 def stop_and_wait! return unless @thread # Prevent deadlock if called by the `heartbeat` thread, which can't wait for itself to die. return signal_stop! if @thread == Thread.current while @thread.alive? signal_stop! sleep 0.01 end @thread.join @thread = nil end |
#with_heartbeat ⇒ Object
25 26 27 28 29 30 |
# File 'lib/resque/durable/background_heartbeat.rb', line 25 def with_heartbeat start! yield ensure stop_and_wait! end |