Class: Resque::Durable::BackgroundHeartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/durable/background_heartbeat.rb

Overview

Creates a background thread to regularly heartbeat the queue audit.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat

Returns a new instance of BackgroundHeartbeat.



9
10
11
12
13
14
15
16
17
# File 'lib/resque/durable/background_heartbeat.rb', line 9

def initialize(queue_audit, interval)
  @queue_audit  = queue_audit
  @last_timeout = nil
  @interval     = interval
  @mutex        = Mutex.new
  @cv           = ConditionVariable.new
  @stop         = false
  @thread       = nil
end

Class Method Details

.exit_now!Object

only a separate method for easy stubbing



21
22
23
# File 'lib/resque/durable/background_heartbeat.rb', line 21

def exit_now!
  abort
end

Instance Method Details

#heartbeat!Object



33
34
35
36
37
38
39
40
41
# File 'lib/resque/durable/background_heartbeat.rb', line 33

def heartbeat!
  @last_timeout ||= @queue_audit.timeout_at
  @last_timeout = @queue_audit.optimistic_heartbeat!(@last_timeout)
rescue StandardError => e
  @queue_audit.logger.error("Exception in BackgroundHeartbeat thread: #{e.class.name}: #{e.message}")
  self.class.exit_now!
ensure
  ActiveRecord::Base.clear_active_connections!
end

#signal_stop!Object

Signal the ‘heartbeat` thread to stop looping immediately. Safe to be call from any thread.



75
76
77
78
79
80
81
# File 'lib/resque/durable/background_heartbeat.rb', line 75

def signal_stop!
  return unless @thread
  @mutex.synchronize do
    @stop = true
    @cv.broadcast
  end
end

#start!Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resque/durable/background_heartbeat.rb', line 43

def start!
  raise "Thread is already running!" if @thread
  @stop = false

  # Perform immediately to reduce heartbeat race condition opportunities
  heartbeat!

  @thread = Thread.new do
    while !@stop
      heartbeat!

      @mutex.synchronize do
        end_at = monotonic_now + @interval
        while !@stop && monotonic_now < end_at
          sleep_for = end_at - monotonic_now
          @cv.wait(@mutex, sleep_for)
        end
      end
    end
  end
end

#stop_and_wait!Object



65
66
67
68
69
70
71
72
# File 'lib/resque/durable/background_heartbeat.rb', line 65

def stop_and_wait!
  return unless @thread
  signal_stop!
  # Prevent deadlock if called by the `heartbeat` thread, which can't wait for itself to die.
  return if @thread == Thread.current
  @thread.join
  @thread = nil
end

#with_heartbeatObject



26
27
28
29
30
31
# File 'lib/resque/durable/background_heartbeat.rb', line 26

def with_heartbeat
  start!
  yield
ensure
  stop_and_wait!
end