21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/emque/producing/publisher/rabbitmq.rb', line 21
def publish(topic, message_type, message, key = nil, raise_on_failure)
ch = get_channel(raise_on_failure)
ch.open if ch.closed?
begin
exchange = ch.fanout(topic, :durable => true, :auto_delete => false)
ch.confirm_select if raise_on_failure
sent = true
exchange.on_return do |return_info, properties, content|
Emque::Producing.logger.warn("App [#{properties[:app_id]}] message was returned from exchange [#{return_info[:exchange]}]")
sent = false
end
exchange.publish(
message,
:mandatory => true,
:persistent => true,
:type => message_type,
:app_id => Emque::Producing.configuration.app_name,
:content_type => "application/json"
)
if raise_on_failure
success = ch.wait_for_confirms
unless success
Emque::Producing.logger.warn("RabbitMQ Publisher: message was nacked")
ch.nacked_set.each do |n|
Emque::Producing.logger.warn("message id: #{n}")
end
end
end
return sent
ensure
if raise_on_failure
CONFIRM_CHANNEL_POOL << ch unless ch.nil?
else
CHANNEL_POOL << ch unless ch.nil?
end
end
end
|