Class: Emque::Consuming::Adapters::RabbitMq::Worker

Inherits:
Object
  • Object
show all
Includes:
Emque::Consuming::Actor, RetryableErrors
Defined in:
lib/emque/consuming/adapters/rabbit_mq/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RetryableErrors

#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors

Methods included from Emque::Consuming::Actor

included

Constructor Details

#initialize(connection, topic) ⇒ Worker

Returns a new instance of Worker.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 20

def initialize(connection, topic)
  self.topic = topic
  self.name = "#{self.topic} worker"
  self.shutdown = false

  # @note: channels are not thread safe, so is better to use
  #        a new channel in each worker.
  # https://github.com/jhbabon/amqp-celluloid/blob/master/lib/consumer.rb
  self.channel = connection.create_channel

  if config.adapter.options[:prefetch]
    channel.prefetch(config.adapter.options[:prefetch])
  end

  self.queue = channel
    .queue(
      "emque.#{config.app_name}.#{topic}",
      :durable => config.adapter.options[:durable],
      :auto_delete => config.adapter.options[:auto_delete],
      :arguments => {
        "x-dead-letter-exchange" => "#{config.app_name}.error"
      }
    )
    .bind(
      channel.fanout(
        topic.to_s, 
        :durable => true, 
        :auto_delete => false,
       )
    )
end

Instance Attribute Details

#topicObject

Returns the value of attribute topic.



12
13
14
# File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 12

def topic
  @topic
end

Instance Method Details

#actor_died(actor, reason) ⇒ Object



14
15
16
17
18
# File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 14

def actor_died(actor, reason)
  unless shutdown
    logger.error "#{log_prefix} actor_died - died: #{reason}"
  end
end

#startObject



52
53
54
55
56
# File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 52

def start
  logger.info "#{log_prefix} starting..."
  queue.subscribe(:manual_ack => true, &method(:process_message))
  logger.debug "#{log_prefix} started"
end

#stopObject



58
59
60
61
62
63
64
65
# File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 58

def stop
  logger.debug "#{log_prefix} stopping..."
  super do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end
  logger.debug "#{log_prefix} stopped"
end