Class: Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker

Inherits:
Object
  • Object
show all
Includes:
Emque::Consuming::Actor, RetryableErrors
Defined in:
lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb

Instance Method Summary collapse

Methods included from RetryableErrors

#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors

Methods included from Emque::Consuming::Actor

included

Constructor Details

#initialize(connection) ⇒ DelayedMessageWorker

Returns a new instance of DelayedMessageWorker.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 18

def initialize(connection)
  self.channel = connection.create_channel

  if config.adapter.options[:prefetch]
    channel.prefetch(config.adapter.options[:prefetch])
  end

  self.delayed_message_exchange = channel.exchange(
    "emque.#{config.app_name}.delayed_message",
    {
      :type => "x-delayed-message",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-delayed-type" => "direct",
      }
    }
  )

  self.queue = channel.queue(
    "emque.#{config.app_name}.delayed_message",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  ).bind(delayed_message_exchange)
end

Instance Method Details

#actor_died(actor, reason) ⇒ Object



12
13
14
15
16
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 12

def actor_died(actor, reason)
  unless shutdown
    logger.error "#{log_prefix} actor_died - died: #{reason}"
  end
end

#startObject



47
48
49
50
51
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 47

def start
  logger.info "#{log_prefix} starting..."
  queue.subscribe(:manual_ack => true, &method(:process_message))
  logger.debug "#{log_prefix} started"
end

#stopObject



53
54
55
56
57
58
59
60
# File 'lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb', line 53

def stop
  logger.debug "#{log_prefix} stopping..."
  super do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end
  logger.debug "#{log_prefix} stopped"
end