Class: Sensu::Transport::RabbitMQ
- Inherits:
-
Base
- Object
- Base
- Sensu::Transport::RabbitMQ
show all
- Defined in:
- lib/sensu/transport/rabbitmq.rb
Instance Attribute Summary
Attributes inherited from Base
#logger
Instance Method Summary
collapse
-
#acknowledge(info, &callback) ⇒ Object
-
#close ⇒ Object
-
#connect(options = {}) ⇒ Object
-
#connected? ⇒ Boolean
-
#initialize ⇒ RabbitMQ
constructor
A new instance of RabbitMQ.
-
#publish(exchange_type, exchange_name, message, options = {}, &callback) ⇒ Object
-
#stats(queue_name, options = {}, &callback) ⇒ Object
-
#subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback) ⇒ Object
-
#unsubscribe(&callback) ⇒ Object
Methods inherited from Base
#ack, #after_reconnect, #before_reconnect, connect, descendants, #on_error
Constructor Details
Returns a new instance of RabbitMQ.
10
11
12
13
|
# File 'lib/sensu/transport/rabbitmq.rb', line 10
def initialize
super
@queues = {}
end
|
Instance Method Details
#acknowledge(info, &callback) ⇒ Object
82
83
84
85
|
# File 'lib/sensu/transport/rabbitmq.rb', line 82
def acknowledge(info, &callback)
info.ack
callback.call(info) if callback
end
|
#close ⇒ Object
41
42
43
|
# File 'lib/sensu/transport/rabbitmq.rb', line 41
def close
@connection.close
end
|
#connect(options = {}) ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/sensu/transport/rabbitmq.rb', line 15
def connect(options={})
timeout = create_connection_timeout
on_failure = on_connection_failure
@connection = AMQP.connect(options, {
:on_tcp_connection_failure => on_failure,
:on_possible_authentication_failure => on_failure
})
@connection.logger = @logger
@connection.on_open do
timeout.cancel
end
reconnect = Proc.new do
unless @connection.reconnecting?
@before_reconnect.call
@connection.periodically_reconnect(5)
end
end
@connection.on_tcp_connection_loss(&reconnect)
@connection.on_skipped_heartbeats(&reconnect)
setup_channel(options)
end
|
#connected? ⇒ Boolean
37
38
39
|
# File 'lib/sensu/transport/rabbitmq.rb', line 37
def connected?
@connection.connected?
end
|
#publish(exchange_type, exchange_name, message, options = {}, &callback) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/sensu/transport/rabbitmq.rb', line 45
def publish(exchange_type, exchange_name, message, options={}, &callback)
begin
@channel.method(exchange_type.to_sym).call(exchange_name, options).publish(message) do
info = {}
callback.call(info) if callback
end
rescue => error
info = {:error => error}
callback.call(info) if callback
end
end
|
#stats(queue_name, options = {}, &callback) ⇒ Object
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/sensu/transport/rabbitmq.rb', line 87
def stats(queue_name, options={}, &callback)
options = options.merge(:auto_delete => true)
@channel.queue(queue_name, options).status do |messages, consumers|
info = {
:messages => messages,
:consumers => consumers
}
callback.call(info)
end
end
|
#subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback) ⇒ Object
57
58
59
60
61
62
63
64
65
|
# File 'lib/sensu/transport/rabbitmq.rb', line 57
def subscribe(exchange_type, exchange_name, queue_name="", options={}, &callback)
previously_declared = @queues.has_key?(queue_name)
@queues[queue_name] ||= @channel.queue!(queue_name, :auto_delete => true)
queue = @queues[queue_name]
queue.bind(@channel.method(exchange_type.to_sym).call(exchange_name))
unless previously_declared
queue.subscribe(options, &callback)
end
end
|
#unsubscribe(&callback) ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/sensu/transport/rabbitmq.rb', line 67
def unsubscribe(&callback)
@queues.values.each do |queue|
if connected?
queue.unsubscribe
else
queue.before_recovery do
queue.unsubscribe
end
end
end
@queues = {}
@channel.recover if connected?
super
end
|