Class: Sensu::Transport::RabbitMQ
- Inherits:
- Base
- Inheritance chain: Sensu::Transport::RabbitMQ < Base < Object
- Defined in:
- lib/sensu/transport/rabbitmq.rb
Instance Attribute Summary
Attributes inherited from Base
#logger
Instance Method Summary
collapse
-
#acknowledge(info, &callback) ⇒ Object
-
#close ⇒ Object
-
#connect(options = {}) ⇒ Object
-
#connected? ⇒ Boolean
-
#publish(exchange_type, exchange_name, message, options = {}, &callback) ⇒ Object
-
#reconnect ⇒ Object
-
#stats(queue_name, options = {}, &callback) ⇒ Object
-
#subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback) ⇒ Object
-
#unsubscribe(&callback) ⇒ Object
Methods inherited from Base
#ack, #after_reconnect, #before_reconnect, descendants, #initialize, #on_error
Instance Method Details
#acknowledge(info, &callback) ⇒ Object
81
82
83
84
|
# File 'lib/sensu/transport/rabbitmq.rb', line 81
# Acknowledge a message, then hand it back to the caller.
#
# @param info [#ack] object that is sent +ack+ (presumably the delivery
#   metadata yielded to a subscriber; confirm against the caller).
# @yield [info] called after the acknowledgement when a block is given.
# @return [Object, nil] the block's result, or nil when no block is given.
def acknowledge(info, &callback)
  info.ack
  return unless callback

  callback.call(info)
end
|
#close ⇒ Object
39
40
41
42
|
# File 'lib/sensu/transport/rabbitmq.rb', line 39
# Close the underlying connection.
#
# When +connected?+ is true the connection is closed immediately;
# otherwise the close is handed to +EM.next_tick+ to run later.
#
# @return [Object] the result of +@connection.close+ or of +EM.next_tick+.
def close
  closer = proc { @connection.close }
  if connected?
    closer.call
  else
    EM.next_tick(closer)
  end
end
|
#connect(options = {}) ⇒ Object
10
11
12
13
14
15
|
# File 'lib/sensu/transport/rabbitmq.rb', line 10
# Open a connection using the given options.
#
# Runs four helpers in a fixed order: +reset+, +set_connection_options+,
# +create_connection_timeout+ and +connect_with_eligible_options+. The
# helpers are defined outside this listing, so their exact effects are
# not described here.
#
# @param options [Hash] forwarded unchanged to +set_connection_options+.
# @return [Object] whatever +connect_with_eligible_options+ returns.
def connect(options = {})
  reset
  set_connection_options(options)
  create_connection_timeout
  connect_with_eligible_options
end
|
#connected? ⇒ Boolean
35
36
37
|
# File 'lib/sensu/transport/rabbitmq.rb', line 35
# Report whether the transport is connected.
#
# Pure delegation: the answer is whatever +@connection.connected?+ returns.
#
# @return [Boolean]
def connected?
  @connection.connected?
end
|
#publish(exchange_type, exchange_name, message, options = {}, &callback) ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/sensu/transport/rabbitmq.rb', line 44
# Publish a message to an exchange.
#
# The exchange is obtained by calling the channel method named after
# +exchange_type+ with +exchange_name+ and +options+. Any StandardError
# raised while doing so, or while publishing, is reported to the callback
# rather than propagated.
#
# @param exchange_type [#to_sym] name of the channel method that returns
#   the exchange.
# @param exchange_name [String] passed to that channel method.
# @param message [Object] payload handed to the exchange's +publish+.
# @param options [Hash] passed to the channel method with the name.
# @yield [info] an empty Hash when the exchange's publish block runs, or
#   +{:error => error}+ when an error was rescued.
# @return [Object, nil] the result of the exchange's +publish+, or the
#   callback's result (nil without a callback) after a rescued error.
def publish(exchange_type, exchange_name, message, options = {}, &callback)
  exchange_factory = @channel.method(exchange_type.to_sym)
  exchange = exchange_factory.call(exchange_name, options)
  exchange.publish(message) do
    callback.call({}) if callback
  end
rescue => error
  callback.call({:error => error}) if callback
end
|
#reconnect ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/sensu/transport/rabbitmq.rb', line 17
# Start a reconnect cycle unless one is already in progress.
#
# Sets +@reconnecting+, calls +@before_reconnect+, runs +reset+, then
# creates an +EM::PeriodicTimer+ with an interval argument of 5. Each
# time the timer block runs it either cancels the timer (when
# +connected?+ is true) or calls +connect_with_eligible_options+ with a
# block that clears +@reconnecting+ and calls +@after_reconnect+. When
# that block is invoked is decided by +connect_with_eligible_options+,
# which is not visible here.
#
# @return [EM::PeriodicTimer, nil] the new timer, or nil when a
#   reconnect was already under way.
def reconnect
  return if @reconnecting

  @reconnecting = true
  @before_reconnect.call
  reset
  timer = EM::PeriodicTimer.new(5) do
    if connected?
      timer.cancel
    else
      connect_with_eligible_options do
        @reconnecting = false
        @after_reconnect.call
      end
    end
  end
end
|
#stats(queue_name, options = {}, &callback) ⇒ Object
86
87
88
89
90
91
92
93
94
95
|
# File 'lib/sensu/transport/rabbitmq.rb', line 86
# Request message and consumer counts for a queue.
#
# The queue is obtained from the channel with +:auto_delete => true+
# merged over the caller's options; the caller's Hash is not mutated.
# The callback is optional: calling +stats+ without a block no longer
# raises inside the status handler, which matches how +acknowledge+ and
# +publish+ treat their callbacks.
#
# @param queue_name [String] name passed to the channel's +queue+.
# @param options [Hash] extra queue options; +:auto_delete+ is forced
#   to true.
# @yield [info] a Hash with +:messages+ and +:consumers+, built from the
#   two values the queue's +status+ block receives.
# @return [Object] whatever the queue's +status+ call returns.
def stats(queue_name, options = {}, &callback)
  queue_options = options.merge(:auto_delete => true)
  @channel.queue(queue_name, queue_options).status do |messages, consumers|
    info = {
      :messages => messages,
      :consumers => consumers
    }
    callback.call(info) if callback
  end
end
|
#subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback) ⇒ Object
56
57
58
59
60
61
62
63
64
|
# File 'lib/sensu/transport/rabbitmq.rb', line 56
# Bind a queue to an exchange and consume from it.
#
# The queue is looked up in +@queues+ by name and declared through
# +@channel.queue!+ (with +:auto_delete => true+) when absent. It is
# bound to the exchange on every call, but a consumer is only attached
# the first time a given queue name is seen.
#
# @param exchange_type [#to_sym] name of the channel method that returns
#   the exchange.
# @param exchange_name [String] passed to that channel method.
# @param queue_name [String] key into +@queues+ and name given to
#   +queue!+.
# @param options [Hash] forwarded to the queue's +subscribe+.
# @yield forwarded as the block of the queue's +subscribe+.
# @return [Object, nil] the result of the queue's +subscribe+, or nil
#   when the queue name was already registered.
def subscribe(exchange_type, exchange_name, queue_name = "", options = {}, &callback)
  already_registered = @queues.key?(queue_name)
  queue = (@queues[queue_name] ||= @channel.queue!(queue_name, :auto_delete => true))
  exchange = @channel.method(exchange_type.to_sym).call(exchange_name)
  queue.bind(exchange)
  queue.subscribe(options, &callback) unless already_registered
end
|
#unsubscribe(&callback) ⇒ Object
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/sensu/transport/rabbitmq.rb', line 66
# Stop consuming from every registered queue.
#
# For each queue in +@queues+: when +connected?+ is true the queue is
# unsubscribed immediately, otherwise the unsubscribe is registered via
# the queue's +before_recovery+ hook. +@queues+ is then replaced with an
# empty Hash, the channel is sent +recover+ when connected, and the
# call is passed on to the parent implementation with the same block.
#
# @yield forwarded to the superclass +unsubscribe+.
# @return [Object] whatever the superclass +unsubscribe+ returns.
def unsubscribe(&callback)
  @queues.each_value do |queue|
    connected? ? queue.unsubscribe : queue.before_recovery { queue.unsubscribe }
  end
  @queues = {}
  @channel.recover if connected?
  super
end
|