Class: Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker
Instance Method Summary
collapse
#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors
included
Constructor Details
Returns a new instance of DelayedMessageWorker.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 18
# Prepares the channel, exchange and queue used for delayed messages.
#
# Opens a channel on +connection+, applies the adapter's +:prefetch+
# option when one is set, declares the durable "x-delayed-message"
# exchange and binds the delayed-message queue to that exchange.
#
# @param connection [#create_channel] object the channel is created from
def initialize(connection)
  self.channel = connection.create_channel

  # Only touch the channel's prefetch when the option is present.
  prefetch = config.adapter.options[:prefetch]
  channel.prefetch(prefetch) if prefetch

  exchange_name = "emque.#{config.app_name}.delayed_message"
  exchange_options = {
    type: "x-delayed-message",
    durable: true,
    auto_delete: false,
    arguments: { "x-delayed-type" => "direct" }
  }
  self.delayed_message_exchange = channel.exchange(exchange_name, exchange_options)

  # The queue shares its name with the exchange; durability and
  # auto-delete come from the adapter options, and the queue's
  # "x-dead-letter-exchange" argument points at "<app_name>.error".
  queue_name = "emque.#{config.app_name}.delayed_message"
  declared_queue = channel.queue(
    queue_name,
    durable: config.adapter.options[:durable],
    auto_delete: config.adapter.options[:auto_delete],
    arguments: { "x-dead-letter-exchange" => "#{config.app_name}.error" }
  )
  self.queue = declared_queue.bind(delayed_message_exchange)
end
|
Instance Method Details
#actor_died(actor, reason) ⇒ Object
12
13
14
15
16
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 12
# Reports that an actor died, unless +shutdown+ is truthy.
#
# @param actor [Object] the actor that died (not used by this method)
# @param reason [Object] interpolated into the logged error message
# @return [Object, nil] the logger's return value, or nil when +shutdown+
#   is truthy and nothing is logged
def actor_died(actor, reason)
  return if shutdown

  logger.error("#{log_prefix} actor_died - died: #{reason}")
end
|
#start ⇒ Object
47
48
49
50
51
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 47
# Subscribes to the queue with manual acknowledgement enabled, passing
# #process_message as the subscription block, and logs before and after.
#
# @return [Object] whatever the final logger.debug call returns
def start
  logger.info("#{log_prefix} starting...")
  handler = method(:process_message)
  queue.subscribe(manual_ack: true, &handler)
  logger.debug("#{log_prefix} started")
end
|
#stop ⇒ Object
53
54
55
56
57
58
59
60
|
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 53
# Stops the worker: delegates to the inherited #stop, handing it a block
# that closes the channel, and logs each stage at debug level.
#
# @return [Object] whatever the final logger.debug call returns
def stop
  logger.debug("#{log_prefix} stopping...")
  # The channel is closed inside the block given to super, so the parent
  # implementation decides when (and whether) that block runs.
  super do
    logger.debug("#{log_prefix} closing channel")
    channel.close
  end
  logger.debug("#{log_prefix} stopped")
end
|