Class: Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker

Inherits:
Object
  • Object
show all
Includes:
Emque::Consuming::Actor, RetryableErrors
Defined in:
lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb

Instance Method Summary collapse

Methods included from RetryableErrors

#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors

Methods included from Emque::Consuming::Actor

included

Constructor Details

#initialize(connection) ⇒ DelayedMessageWorker



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 18

def initialize(connection)
  self.channel = connection.create_channel

  if config.adapter.options[:prefetch]
    channel.prefetch(config.adapter.options[:prefetch])
  end

  self.delayed_message_exchange = channel.exchange(
    "emque.#{config.app_name}.delayed_message",
    {
      :type => "x-delayed-message",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-delayed-type" => "direct",
      }
    }
  )

  self.queue = channel.queue(
    "emque.#{config.app_name}.delayed_message",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  ).bind(delayed_message_exchange)
end

Instance Method Details

#actor_died(actor, reason) ⇒ Object



12
13
14
15
16
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 12

def actor_died(actor, reason)
  unless shutdown
    logger.error "#{log_prefix} actor_died - died: #{reason}"
  end
end

#startObject



47
48
49
50
51
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 47

def start
  logger.info "#{log_prefix} starting..."
  queue.subscribe(:manual_ack => true, &method(:process_message))
  logger.debug "#{log_prefix} started"
end

#stopObject



53
54
55
56
57
58
59
60
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 53

def stop
  logger.debug "#{log_prefix} stopping..."
  super do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end
  logger.debug "#{log_prefix} stopped"
end