Class: WhoCan::Heartbeater::EKG
- Inherits:
-
Object
- Object
- WhoCan::Heartbeater::EKG
- Includes:
- Deferred, Logging
- Defined in:
- lib/who_can/heartbeater/ekg.rb
Overview
Pings the broker at regular intervals. After N number of successive failures, will fire the on_heartbeat_failure callback (note: not the errback, the callback). Due to the nature of Deferreds, after a failure a Monitor instance must be discarded.
a successful beat resets failure_count
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#failure_count ⇒ Object
readonly
Returns the value of attribute failure_count.
-
#interval ⇒ Object
readonly
Returns the value of attribute interval.
-
#max_failures ⇒ Object
readonly
Returns the value of attribute max_failures.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#cancel_outstanding_beats! ⇒ Object
goes through the outstanding_beats and cancels them.
- #channel ⇒ Object
- #do_beat! ⇒ Object
- #handle_failure(beat, exception = nil) ⇒ Object
- #handle_message(header, payload) ⇒ Object
- #heartbeat_failure! ⇒ Object
-
#initialize(connection, opts = {}) ⇒ EKG
constructor
Create a new Monitor.
- #shutdown!(&blk) ⇒ Object
- #start!(&blk) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(connection, opts = {}) ⇒ EKG
Create a new Monitor.
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/who_can/heartbeater/ekg.rb', line 33 def initialize(connection, opts={}) opts = {:interval => 2.0, :max_failures => 3, :timeout => 5.0}.merge(opts) @connection = connection @channel = nil @timer = nil @interval = opts[:interval] @max_failures = opts[:max_failures] @beat_timeout_after = opts[:timeout] @queue = nil @failure_count = 0 @running = false @beat_num = 0 @outstanding_beats = {} end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
14 15 16 |
# File 'lib/who_can/heartbeater/ekg.rb', line 14 def connection @connection end |
#failure_count ⇒ Object (readonly)
Returns the value of attribute failure_count.
14 15 16 |
# File 'lib/who_can/heartbeater/ekg.rb', line 14 def failure_count @failure_count end |
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
14 15 16 |
# File 'lib/who_can/heartbeater/ekg.rb', line 14 def interval @interval end |
#max_failures ⇒ Object (readonly)
Returns the value of attribute max_failures.
14 15 16 |
# File 'lib/who_can/heartbeater/ekg.rb', line 14 def max_failures @max_failures end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
14 15 16 |
# File 'lib/who_can/heartbeater/ekg.rb', line 14 def queue @queue end |
Instance Method Details
#cancel_outstanding_beats! ⇒ Object
goes through the outstanding_beats and cancels them
163 164 165 166 |
# File 'lib/who_can/heartbeater/ekg.rb', line 163 def cancel_outstanding_beats! outstanding_beats, @outstanding_beats = @outstanding_beats, {} outstanding_beats.values.each { |b| b.cancel! } end |
#channel ⇒ Object
49 50 51 |
# File 'lib/who_can/heartbeater/ekg.rb', line 49 def channel @channel ||= AMQP::Channel.new(connection) end |
#do_beat! ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/who_can/heartbeater/ekg.rb', line 117 def do_beat! return unless @running @timer = nil beat_id = "beat_#{@beat_num}" @beat_num += 1 beat = Beat.new(channel, queue, beat_id, @beat_timeout_after) @outstanding_beats[beat_id] = beat beat.ensure_that { @outstanding_beats.delete(beat_id) } beat.callback do logger.debug { "beat #{beat_id.inspect} was successful" } @failure_count = 0 end beat.errback do |*a| handle_failure(beat, *a) end beat.start! # set this up to happen regularly if it hasn't been done already @timer = EM::Timer.new(interval) { do_beat! } end |
#handle_failure(beat, exception = nil) ⇒ Object
152 153 154 155 156 157 158 159 |
# File 'lib/who_can/heartbeater/ekg.rb', line 152 def handle_failure(beat, exception=nil) logger.warn { "beat errback called! #{beat.beat_id}" } @failure_count += 1 if @failure_count >= @max_failures logger.warn { "@failure_count #{@failure_count} >= @max_failures #{@max_failures}, firing heartbeat_failure" } heartbeat_failure! end end |
#handle_message(header, payload) ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/who_can/heartbeater/ekg.rb', line 109 def (header, payload) msg_id = header. if outstanding_beat = @outstanding_beats.delete(msg_id) outstanding_beat.ping_received! end end |
#heartbeat_failure! ⇒ Object
145 146 147 148 149 150 |
# File 'lib/who_can/heartbeater/ekg.rb', line 145 def heartbeat_failure! logger.warn { "heartbeat_failure!" } cancel_outstanding_beats! on_heartbeat_failure.succeed shutdown! end |
#shutdown!(&blk) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/who_can/heartbeater/ekg.rb', line 82 def shutdown!(&blk) on_shutdown(&blk) return on_shutdown unless @running @running = false logger.debug { "performing shutdown!" } @queue.unsubscribe if @queue @timer.cancel if @timer close_failsafe_timer = EM::Timer.new(1.0) do logger.warn { "channel.close never called its block, probably hung. calling on_shutdown.succeed" } on_shutdown.succeed end @channel.close { close_failsafe_timer.cancel logger.debug { "channel has closed!" } on_shutdown.succeed } on_shutdown rescue AMQ::Client::ConnectionClosedError on_shutdown.succeed end |
#start!(&blk) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/who_can/heartbeater/ekg.rb', line 53 def start!(&blk) on_start(&blk) return on_start if @running @running = true connection.on_tcp_connection_loss do logger.error { "on_tcp_connection_loss callback fired!!" } heartbeat_failure! end connection.on_open do channel.once_opened do channel.on_error { |*a| logger.error { "channel.on_error called with #{a.inspect}" } } channel.queue('', :exclusive => true, :auto_delete => true) do |q| @queue = q confirmed = lambda do |*| logger.debug { "confirmed queue subscription!" } on_start.succeed do_beat! end @queue.subscribe(:confirm => confirmed, &method(:handle_message)) end end end on_start end |