Class: Sensu::Transport::RabbitMQ
- Defined in:
- lib/sensu/transport/rabbitmq.rb
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
-
#acknowledge(info) {|info| ... } ⇒ Object
(also: #ack)
Acknowledge the delivery of a message from RabbitMQ.
-
#close ⇒ Object
Close the RabbitMQ connection.
-
#connect(options = {}) ⇒ Object
RabbitMQ connection setup.
-
#connected? ⇒ TrueClass, FalseClass
Indicates if connected to RabbitMQ.
-
#publish(type, pipe, message, options = {}) {|info| ... } ⇒ Object
Publish a message to RabbitMQ.
-
#reconnect(force = false) ⇒ Object
Reconnect to RabbitMQ.
-
#stats(funnel, options = {}) {|info| ... } ⇒ Object
RabbitMQ queue stats, including message and consumer counts.
-
#subscribe(type, pipe, funnel = "", options = {}) {|info, message| ... } ⇒ Object
Subscribe to a RabbitMQ queue.
-
#unsubscribe {|info| ... } ⇒ Object
Unsubscribe from all RabbitMQ queues.
Methods inherited from Base
#after_reconnect, #before_reconnect, descendants, #initialize, #on_error
Constructor Details
This class inherits a constructor from Sensu::Transport::Base
Instance Method Details
#acknowledge(info) {|info| ... } ⇒ Object Also known as: ack
Acknowledge the delivery of a message from RabbitMQ.
117 118 119 120 121 122 |
# File 'lib/sensu/transport/rabbitmq.rb', line 117 def acknowledge(info) catch_errors do info.ack end super end |
#close ⇒ Object
Close the RabbitMQ connection.
43 44 45 46 |
# File 'lib/sensu/transport/rabbitmq.rb', line 43 def close callback = Proc.new { @connection.close } connected? ? callback.call : EM.next_tick(callback) end |
#connect(options = {}) ⇒ Object
RabbitMQ connection setup. The deferred status is set to `:succeeded` (via `succeed()`) once the connection has been established.
16 17 18 19 20 21 |
# File 'lib/sensu/transport/rabbitmq.rb', line 16 def connect(options={}) reset () create_connection_timeout end |
#connected? ⇒ TrueClass, FalseClass
Indicates if connected to RabbitMQ.
38 39 40 |
# File 'lib/sensu/transport/rabbitmq.rb', line 38 def connected? @connection.connected? end |
#publish(type, pipe, message, options = {}) {|info| ... } ⇒ Object
Publish a message to RabbitMQ.
59 60 61 62 63 64 65 66 |
# File 'lib/sensu/transport/rabbitmq.rb', line 59 def publish(type, pipe, message, options={}) catch_errors do @channel.method(type.to_sym).call(pipe, options).publish(message) do info = {} yield(info) if block_given? end end end |
#reconnect(force = false) ⇒ Object
Reconnect to RabbitMQ.
26 27 28 29 30 31 32 33 |
# File 'lib/sensu/transport/rabbitmq.rb', line 26 def reconnect(force=false) unless @reconnecting @reconnecting = true @before_reconnect.call reset periodically_reconnect end end |
#stats(funnel, options = {}) {|info| ... } ⇒ Object
RabbitMQ queue stats, including message and consumer counts.
133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/sensu/transport/rabbitmq.rb', line 133 def stats(funnel, options={}) catch_errors do options = options.merge(:auto_delete => true) @channel.queue(funnel, options).status do |messages, consumers| info = { :messages => messages, :consumers => consumers } yield(info) end end end |
#subscribe(type, pipe, funnel = "", options = {}) {|info, message| ... } ⇒ Object
Subscribe to a RabbitMQ queue.
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/sensu/transport/rabbitmq.rb', line 79 def subscribe(type, pipe, funnel="", options={}, &callback) catch_errors do previously_declared = @queues.has_key?(funnel) @queues[funnel] ||= @channel.queue!(funnel, :auto_delete => true) queue = @queues[funnel] queue.bind(@channel.method(type.to_sym).call(pipe)) unless previously_declared queue.subscribe(options, &callback) end end end |
#unsubscribe {|info| ... } ⇒ Object
Unsubscribe from all RabbitMQ queues.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/sensu/transport/rabbitmq.rb', line 95 def unsubscribe catch_errors do @queues.values.each do |queue| if connected? queue.unsubscribe else queue.before_recovery do queue.unsubscribe end end end @queues = {} @channel.recover if connected? end super end |