Class: Qwrapper::Queues::RabbitMQ
- Inherits:
- Base
- Inheritance chain: Object → Base → Qwrapper::Queues::RabbitMQ
- Defined in:
- lib/qwrapper/queues/rabbitmq.rb
Instance Attribute Summary
Attributes inherited from Base
#config, #requeue_exceptions
Instance Method Summary
collapse
Methods inherited from Base
#initialize
Methods included from Loggable
#logger, #logger=
Instance Method Details
#publish(queue_name, messages, options = {}) ⇒ Object
36
37
38
39
40
41
42
43
|
# File 'lib/qwrapper/queues/rabbitmq.rb', line 36
# Publishes one or more messages to the named queue, each flagged persistent.
#
# @param queue_name [String] name handed to +get_queue+ to obtain the queue
# @param messages [Object, Array<Object>] a single message or an Array of
#   messages; every message is converted with +to_s+ before publishing
# @param options [Hash] forwarded unchanged to +get_queue+
# @return [Array<Object>] the normalized list of messages that was iterated
# @raise whatever +get_queue+ or the queue's +publish+ raises; the connection
#   is still closed in that case
def publish(queue_name, messages, options={})
  # Wrap a lone message so the loop below always iterates an Array.
  messages = [messages] unless messages.is_a?(Array)
  # Pass the caller's options through (they used to be overwritten with {}).
  queue = get_queue(queue_name, options)
  # Publish each message on its own; the list must not be wrapped again or
  # the whole Array would go out as a single stringified message.
  messages.each do |message|
    queue.publish(message.to_s, :persistent => true)
  end
ensure
  # Close on both the success and the failure path so the connection
  # does not leak when publishing raises.
  connection.close if connection
end
|
#subscribe(queue_name, options = {}, &block) ⇒ Object
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/qwrapper/queues/rabbitmq.rb', line 10
# Subscribes to the named queue and hands every delivered payload to +block+.
#
# For each delivery the payload is run through +logger_wrapped_block_execution+
# when the logger responds to +wrap+, otherwise through +block_execution+.
# Exceptions listed in +requeue_exceptions+ are logged and the message is
# still acknowledged; no actual requeue is performed here. Any other
# exception raised by the handler leaves the delivery block before the ack.
#
# StandardError raised while setting up or consuming is logged and swallowed.
# Non-StandardError exceptions (Interrupt, SystemExit, ...) propagate so the
# process can be stopped. The connection is closed in every case.
#
# @param queue_name [String] name handed to +get_queue+ to obtain the queue
# @param options [Hash] forwarded unchanged to +get_queue+
# @param block [Proc] handler passed on to the execution helpers
# @return [Object] result of the queue's +subscribe+ call, or of the error
#   logging when a StandardError was rescued
def subscribe(queue_name, options={}, &block)
  logger.info "Subscribed to '#{queue_name}'"
  queue = get_queue(queue_name, options)
  # NOTE(review): ack/block look like manual-ack + blocking-consumer options
  # of the underlying client -- confirm against the client library in use.
  queue.subscribe(ack: true, block: true) do |delivery_info, _metadata, payload|
    begin
      if logger.respond_to?(:wrap)
        logger_wrapped_block_execution(payload, &block)
      else
        block_execution(payload, &block)
      end
    rescue *requeue_exceptions => requeue_error
      # Placeholder: the failure is only logged, the message is acked below.
      logger.error "Requeue logic"
      logger.error requeue_error
    end
    queue.channel.ack(delivery_info.delivery_tag)
  end
rescue StandardError => ex
  # Deliberately not Exception: signals and exit requests must not be eaten.
  logger.error "TODO: What logic goes here?"
  logger.error ex
ensure
  # Always release the connection, including on error paths. A failure while
  # closing is logged so it cannot mask the outcome of the body above.
  begin
    connection.close if connection
  rescue StandardError => close_error
    logger.error close_error
  end
end
|