Class: WhoCan::Heartbeater::EKG

Inherits:
Object
  • Object
show all
Includes:
Deferred, Logging
Defined in:
lib/who_can/heartbeater/ekg.rb

Overview

Pings the broker at regular intervals. After N successive failures (N is the :max_failures option), it fires the on_heartbeat_failure callback (note: the callback, not the errback). It fires the same callback if the TCP connection to the broker is lost. Due to the nature of Deferreds, an EKG instance must be discarded after a failure.

A successful beat resets failure_count to 0, so only consecutive failures count toward :max_failures.

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

included, #logger

Constructor Details

#initialize(connection, opts = {}) ⇒ EKG

Create a new EKG heartbeat monitor for the given AMQP connection. No beats are sent until #start! is called.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :max_failures (Integer) — default: 3

    fire the on_heartbeat_failure callback after this many failures

  • :interval (Numeric) — default: 2.0

    interval between beats

  • :timeout (Numeric) — default: 5.0

    time to live for a single beat



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/who_can/heartbeater/ekg.rb', line 33

# Build a heartbeat monitor for +connection+. Nothing is sent until #start!
# is called.
#
# @param connection the AMQP connection to monitor (handed to AMQP::Channel.new)
# @param opts [Hash] :interval (default 2.0), :max_failures (default 3),
#   :timeout (default 5.0); explicit keys override the defaults
def initialize(connection, opts={})
  settings = {
    :interval     => 2.0,
    :max_failures => 3,
    :timeout      => 5.0
  }.merge(opts)

  @connection = connection
  @interval, @max_failures, @beat_timeout_after =
    settings.values_at(:interval, :max_failures, :timeout)

  # filled in later by #channel, #start! and #do_beat!
  @channel = @timer = @queue = nil
  @running = false
  @failure_count = @beat_num = 0
  @outstanding_beats = {}
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/who_can/heartbeater/ekg.rb', line 14

# The AMQP connection this monitor was constructed with (read-only).
def connection; @connection; end

#failure_count ⇒ Object (readonly)

Returns the value of attribute failure_count.



14
15
16
# File 'lib/who_can/heartbeater/ekg.rb', line 14

# Consecutive failed beats since the last successful one (read-only).
def failure_count; @failure_count; end

#interval ⇒ Object (readonly)

Returns the value of attribute interval.



14
15
16
# File 'lib/who_can/heartbeater/ekg.rb', line 14

# Delay between beats, handed to EM::Timer when scheduling the next beat
# (the :interval option, default 2.0).
def interval; @interval; end

#max_failures ⇒ Object (readonly)

Returns the value of attribute max_failures.



14
15
16
# File 'lib/who_can/heartbeater/ekg.rb', line 14

# Consecutive-failure count at which on_heartbeat_failure fires
# (the :max_failures option, default 3).
def max_failures; @max_failures; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



14
15
16
# File 'lib/who_can/heartbeater/ekg.rb', line 14

# The exclusive, auto-delete queue declared by #start! for beat replies;
# nil until that declaration completes.
def queue; @queue; end

Instance Method Details

#cancel_outstanding_beats! ⇒ Object

Cancels every beat still awaiting a reply and resets the outstanding-beat table to empty.



163
164
165
166
# File 'lib/who_can/heartbeater/ekg.rb', line 163

# Cancel every beat still awaiting a reply and start over with an empty table.
#
# The table is swapped out before cancelling. Presumably this is because
# cancelling a beat can run its ensure_that hook (see #do_beat!), which
# deletes from @outstanding_beats; any such deletes now hit the fresh table.
#
# @return [Array] the beats that were cancelled
def cancel_outstanding_beats!
  pending = @outstanding_beats
  @outstanding_beats = {}
  pending.values.each(&:cancel!)
end

#channel ⇒ Object



49
50
51
# File 'lib/who_can/heartbeater/ekg.rb', line 49

# The AMQP channel used for heartbeats. It is created on #connection the
# first time it is asked for and memoized afterwards.
def channel
  return @channel if @channel
  @channel = AMQP::Channel.new(connection)
end

#do_beat! ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/who_can/heartbeater/ekg.rb', line 117

# Fire one heartbeat and schedule the next one +interval+ later.
#
# Each beat is tracked in @outstanding_beats under a unique "beat_N" id
# until its ensure_that hook removes it. A successful beat resets
# @failure_count; a failed one is routed to #handle_failure. Does nothing
# once the monitor has stopped running.
#
# @return [EM::Timer, nil] the timer for the next beat, or nil if not running
def do_beat!
  return unless @running
  @timer = nil # drop the reference to the previous, already-fired timer

  this_id = "beat_#{@beat_num}"
  @beat_num += 1

  Beat.new(channel, queue, this_id, @beat_timeout_after).tap do |this_beat|
    @outstanding_beats[this_id] = this_beat
    this_beat.ensure_that { @outstanding_beats.delete(this_id) }
    this_beat.callback do
      logger.debug { "beat #{this_id.inspect} was successful" }
      @failure_count = 0
    end
    this_beat.errback { |*args| handle_failure(this_beat, *args) }
    this_beat.start!
  end

  # queue up the next beat; do_beat! re-checks @running when it fires
  @timer = EM::Timer.new(interval) { do_beat! }
end

#handle_failure(beat, exception = nil) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/who_can/heartbeater/ekg.rb', line 152

# Record a failed beat, and fire #heartbeat_failure! once @failure_count
# reaches @max_failures.
#
# Failures reported after the monitor has stopped running are ignored.
# #shutdown! unsubscribes the reply queue without cancelling in-flight beats,
# so those beats can presumably still time out later. Without this guard
# they would be counted and could fire on_heartbeat_failure on a monitor
# that was shut down on purpose, or fire it a second time after a real
# failure.
#
# @param beat the Beat that failed (only its beat_id is read here)
# @param exception optional error passed through from the beat's errback
def handle_failure(beat, exception=nil)
  unless @running
    logger.debug { "ignoring failure of #{beat.beat_id}, monitor is not running" }
    return
  end

  logger.warn { "beat errback called! #{beat.beat_id}" }
  @failure_count += 1
  if @failure_count >= @max_failures
    logger.warn { "@failure_count #{@failure_count} >= @max_failures #{@max_failures}, firing heartbeat_failure" }
    heartbeat_failure!
  end
end

#handle_message(header, payload) ⇒ Object



109
110
111
112
113
114
115
# File 'lib/who_can/heartbeater/ekg.rb', line 109

# Subscriber for the reply queue. It matches an incoming message to its
# outstanding beat by message_id, removes that beat from the table and tells
# it the ping arrived. Messages for unknown or already-resolved beats are
# ignored. The payload is not inspected.
#
# @return the result of ping_received!, or nil if no beat matched
def handle_message(header, payload)
  beat = @outstanding_beats.delete(header.message_id)
  beat.ping_received! if beat
end

#heartbeat_failure! ⇒ Object



145
146
147
148
149
150
# File 'lib/who_can/heartbeater/ekg.rb', line 145

# Declare the heartbeat dead. It cancels every in-flight beat, resolves the
# on_heartbeat_failure deferred (succeed, not fail), then shuts the monitor
# down.
#
# Called from #handle_failure once @max_failures is reached, and from the
# connection's on_tcp_connection_loss hook registered in #start!.
def heartbeat_failure!
  logger.warn { "heartbeat_failure!" }
  # cancel before signalling/shutting down (presumably so in-flight beats
  # stop waiting on replies that will never arrive)
  cancel_outstanding_beats!
  on_heartbeat_failure.succeed
  shutdown!
end

#shutdown!(&blk) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/who_can/heartbeater/ekg.rb', line 82

# Stop heartbeating and close the channel.
#
# +blk+ (if given) is registered on the on_shutdown deferred, which is always
# returned. Calling this when not running only registers the block. Shutdown:
# unsubscribe the reply queue, cancel the pending beat timer, then close the
# channel. on_shutdown succeeds when close completes, after a 1s failsafe if
# close never calls back, or at once if there is no channel or the
# connection is already closed.
#
# @return the on_shutdown deferred
def shutdown!(&blk)
  on_shutdown(&blk)
  return on_shutdown unless @running
  @running = false

  logger.debug { "performing shutdown!" }

  @queue.unsubscribe if @queue

  @timer.cancel if @timer

  # #channel creates the channel lazily, and start! only touches it once the
  # connection has opened; if we get here before then there's nothing to close.
  unless @channel
    on_shutdown.succeed
    return on_shutdown
  end

  close_failsafe_timer = EM::Timer.new(1.0) do
    logger.warn { "channel.close never called its block, probably hung. calling on_shutdown.succeed" }
    on_shutdown.succeed
  end

  @channel.close {
    close_failsafe_timer.cancel
    logger.debug { "channel has closed!" }
    on_shutdown.succeed
  }

  on_shutdown
rescue AMQ::Client::ConnectionClosedError
  # disarm the failsafe so it doesn't later log a bogus "probably hung" warning
  close_failsafe_timer.cancel if close_failsafe_timer
  on_shutdown.succeed
  on_shutdown
end

#start!(&blk) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/who_can/heartbeater/ekg.rb', line 53

# Begin monitoring. +blk+ (if given) is registered on the on_start deferred,
# which is returned. Calling start! again while already running only
# registers the block.
#
# Once the connection opens, this opens the channel and declares an
# exclusive, auto-delete queue with an empty name for beat replies. It then
# subscribes #handle_message to that queue. When the broker confirms the
# subscription, on_start succeeds and the first beat is sent via #do_beat!.
#
# A lost TCP connection is treated as an immediate heartbeat failure.
#
# NOTE(review): the on_open block does not check @running, so if shutdown!
# runs before the connection opens, the channel/queue are still set up here
# afterwards and on_start still succeeds — confirm whether that's intended.
# NOTE(review): each start! after a shutdown! registers another
# on_tcp_connection_loss / on_open handler on the same connection, and the
# tcp-loss handler calls heartbeat_failure! even if the monitor is stopped.
def start!(&blk)
  on_start(&blk)
  return on_start if @running
  @running = true

  connection.on_tcp_connection_loss do 
    logger.error { "on_tcp_connection_loss callback fired!!" }
    heartbeat_failure!
  end

  connection.on_open do
    channel.once_opened do
      # channel errors are only logged, not treated as beat failures
      channel.on_error { |*a| logger.error { "channel.on_error called with #{a.inspect}" } }
      channel.queue('', :exclusive => true, :auto_delete => true) do |q|
        @queue = q

        # runs when the broker confirms the subscription; only then is it
        # safe to start beating, since replies will have somewhere to land
        confirmed = lambda do |*|
          logger.debug { "confirmed queue subscription!" }
          on_start.succeed
          do_beat!
        end

        @queue.subscribe(:confirm => confirmed, &method(:handle_message))
      end
    end
  end
  on_start
end