Class: Resque::Durable::BackgroundHeartbeat

Inherits:
Object
  • Object
show all
Defined in:
lib/resque/durable/background_heartbeat.rb

Overview

Creates a background thread to regularly heartbeat the queue audit.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_audit, interval) ⇒ BackgroundHeartbeat

Returns a new instance of BackgroundHeartbeat.



9
10
11
12
13
14
15
16
# File 'lib/resque/durable/background_heartbeat.rb', line 9

# Builds a heartbeat helper that is idle until started.
#
# @param queue_audit the audit object that will be heartbeated; this class
#   calls `timeout_at`, `optimistic_heartbeat!` and `logger` on it
# @param interval the value handed to `Mutex#sleep` between heartbeats
#   (presumably seconds, as that is the unit `Mutex#sleep` takes)
def initialize(queue_audit, interval)
  # Collaborators supplied by the caller.
  @queue_audit = queue_audit
  @interval = interval

  # Mutable state: nothing has been heartbeated and no thread exists yet.
  @last_timeout = nil
  @stop = false
  @thread = nil

  # Guards the stop flag and is what the background thread sleeps on.
  @mutex = Mutex.new
end

Class Method Details

.exit_now!Object

Aborts the process. This is a separate method only so that it can be stubbed easily in tests.



20
21
22
# File 'lib/resque/durable/background_heartbeat.rb', line 20

# Terminates the process via `Kernel.abort`.
#
# Exists as its own method purely so tests can stub it; `heartbeat!` calls it
# after logging an unexpected error.
def exit_now!
  Kernel.abort
end

Instance Method Details

#heartbeat!Object



32
33
34
35
36
37
38
39
40
# File 'lib/resque/durable/background_heartbeat.rb', line 32

# Performs a single heartbeat against the queue audit.
#
# The first call seeds the remembered timeout from `queue_audit.timeout_at`.
# Every call then passes the remembered timeout to
# `queue_audit.optimistic_heartbeat!` and stores whatever it returns as the new
# remembered timeout.
#
# Any StandardError is logged through the audit's logger and then
# `exit_now!` is invoked on the class. ActiveRecord's active connections are
# cleared afterwards regardless of the outcome.
#
# @return the new timeout on success, otherwise the result of `exit_now!`
def heartbeat!
  begin
    # Seed only once; later calls reuse the value returned by the previous beat.
    @last_timeout = @queue_audit.timeout_at unless @last_timeout
    @last_timeout = @queue_audit.optimistic_heartbeat!(@last_timeout)
  rescue StandardError => error
    @queue_audit.logger.error(
      "Exception in BackgroundHeartbeat thread: #{error.class.name}: #{error.message}"
    )
    self.class.exit_now!
  ensure
    # Runs on both paths so connections checked out here are always released.
    ActiveRecord::Base.clear_active_connections!
  end
end

#signal_stop!Object

Signal the heartbeat thread to stop looping immediately. Safe to call from any thread.



73
74
75
76
77
78
79
# File 'lib/resque/durable/background_heartbeat.rb', line 73

# Asks the heartbeat thread to stop looping and wakes it if it is sleeping.
#
# Does nothing when no thread has been started. The stop flag is set while
# holding the same mutex the heartbeat thread sleeps on.
#
# `Thread#wakeup` raises ThreadError when the target thread is already dead,
# so the thread is only woken while it is alive, and a thread that dies
# between the liveness check and the wakeup is tolerated. Signalling a
# finished thread is therefore a harmless no-op rather than a crash.
#
# @return [Thread, nil] the woken thread, or nil when there was nothing to wake
def signal_stop!
  # Snapshot the ivar so it cannot change between the guard and the wakeup.
  thread = @thread
  return unless thread

  @mutex.synchronize do
    @stop = true
    begin
      thread.wakeup if thread.alive?
    rescue ThreadError
      # The thread finished after the alive? check; there is nothing left to wake.
      nil
    end
  end
end

#start!Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/resque/durable/background_heartbeat.rb', line 42

# Starts the background heartbeat thread.
#
# One heartbeat is performed synchronously on the calling thread before the
# background thread is created; the thread then heartbeats and sleeps for
# `@interval` on the mutex until the stop flag is set.
#
# @raise [RuntimeError] if a thread has already been started
# @return [Thread] the newly created heartbeat thread
def start!
  raise "Thread is already running!" if @thread
  @stop = false

  # Perform immediately to reduce heartbeat race condition opportunities
  heartbeat!

  @thread = Thread.new do
    until @stop
      heartbeat!

      @mutex.synchronize do
        # Re-check the flag under the lock: signal_stop! sets it while holding
        # this mutex, so a stop requested during heartbeat! is seen here and
        # the thread exits instead of sleeping for a whole interval.
        @mutex.sleep(@interval) unless @stop
      end
    end
  end
end

#stop_and_wait!Object



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/resque/durable/background_heartbeat.rb', line 60

# Stops the heartbeat thread and blocks until it has finished.
#
# Does nothing when no thread was started. When invoked from the heartbeat
# thread itself it only signals the stop, because a thread cannot join itself.
# Otherwise the stop is signalled repeatedly, with a 10ms pause after each
# attempt, until the thread is no longer alive; the thread is then joined and
# the reference cleared.
#
# @return [nil] after waiting, or the result of `signal_stop!` when called from
#   the heartbeat thread
def stop_and_wait!
  return unless @thread

  # Prevent deadlock if called by the `heartbeat` thread, which can't wait for itself to die.
  if Thread.current == @thread
    return signal_stop!
  end

  loop do
    break unless @thread.alive?

    signal_stop!
    sleep(0.01)
  end

  @thread.join
  @thread = nil
end

#with_heartbeatObject



25
26
27
28
29
30
# File 'lib/resque/durable/background_heartbeat.rb', line 25

# Runs the given block with the background heartbeat active.
#
# Calls `start!` before yielding and always calls `stop_and_wait!` afterwards,
# whether the block (or `start!` itself) returns normally or raises.
#
# @yield the work to perform while heartbeating
# @return the value of the block
def with_heartbeat
  begin
    start!
    yield
  ensure
    stop_and_wait!
  end
end