Class: Emque::Consuming::Adapters::RabbitMq::Worker
- Inherits: Object
  - Object
    - Emque::Consuming::Adapters::RabbitMq::Worker
- Includes: Emque::Consuming::Actor, RetryableErrors
- Defined in: lib/emque/consuming/adapters/rabbit_mq/worker.rb
Instance Attribute Summary
- #topic ⇒ Object: Returns the value of attribute topic.
Instance Method Summary
- #actor_died(actor, reason) ⇒ Object
- #initialize(connection, topic) ⇒ Worker (constructor): A new instance of Worker.
- #start ⇒ Object
- #stop ⇒ Object
Methods included from RetryableErrors
#delay_ms_time, #retry_error, #retryable_error_limit, #retryable_errors
Methods included from Emque::Consuming::Actor
Constructor Details
#initialize(connection, topic) ⇒ Worker
Returns a new instance of Worker.
    # File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 20

    def initialize(connection, topic)
      self.topic = topic
      self.name = "#{self.topic} worker"
      self.shutdown = false

      # @note: channels are not thread safe, so is better to use
      #   a new channel in each worker.
      # https://github.com/jhbabon/amqp-celluloid/blob/master/lib/consumer.rb
      self.channel = connection.create_channel

      if config.adapter.options[:prefetch]
        channel.prefetch(config.adapter.options[:prefetch])
      end

      self.queue =
        channel
          .queue(
            "emque.#{config.app_name}.#{topic}",
            :durable => config.adapter.options[:durable],
            :auto_delete => config.adapter.options[:auto_delete],
            :arguments => {
              "x-dead-letter-exchange" => "#{config.app_name}.error"
            }
          )
          .bind(
            channel.fanout(
              topic.to_s,
              :durable => true,
              :auto_delete => false,
            )
          )
    end
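
To make the queue declaration above easier to follow, the sketch below rebuilds the same queue name and options as plain data, with no broker involved. `queue_declaration` is a hypothetical helper written for this illustration; it is not part of emque-consuming, and the app name and topic are made-up values.

```ruby
# Hypothetical helper (not part of emque-consuming): mirrors the arguments
# that the constructor passes to channel.queue for a given app and topic.
def queue_declaration(app_name, topic, adapter_options)
  name = "emque.#{app_name}.#{topic}"
  options = {
    :durable => adapter_options[:durable],
    :auto_delete => adapter_options[:auto_delete],
    # Rejected messages are routed to the "<app_name>.error" exchange.
    :arguments => { "x-dead-letter-exchange" => "#{app_name}.error" }
  }
  [name, options]
end

# Illustrative values only.
name, options = queue_declaration(
  "billing", :invoices, { :durable => true, :auto_delete => false }
)
puts name                                          # emque.billing.invoices
puts options[:arguments]["x-dead-letter-exchange"] # billing.error
```

Each worker therefore consumes from a queue named after both the application and the topic, while the fanout exchange it binds to is named after the topic alone.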
Instance Attribute Details
#topic ⇒ Object
Returns the value of attribute topic.
    # File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 12

    def topic
      @topic
    end
Instance Method Details
#actor_died(actor, reason) ⇒ Object
    # File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 14

    def actor_died(actor, reason)
      unless shutdown
        logger.error "#{log_prefix} actor_died - died: #{reason}"
      end
    end
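
The guard in `actor_died` can be exercised in isolation. The following is a minimal sketch, assuming a standard library `Logger` writing to a `StringIO`; `DeathReportingSketch` is a hypothetical stand-in for the worker and omits `log_prefix` and everything supplied by `Emque::Consuming::Actor`.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for the worker: only the shutdown guard is modelled.
class DeathReportingSketch
  attr_accessor :shutdown

  def initialize(io)
    @logger = Logger.new(io)
    @shutdown = false
  end

  # Log a linked actor's death only when it was not an intentional shutdown.
  def actor_died(actor, reason)
    unless shutdown
      @logger.error "actor_died - died: #{reason}"
    end
  end
end

io = StringIO.new
worker = DeathReportingSketch.new(io)
worker.actor_died(nil, "connection reset") # logged: worker is still running
worker.shutdown = true
worker.actor_died(nil, "normal exit")      # suppressed: shutdown is in progress
```

The effect is that deaths during a deliberate shutdown do not show up in the log as errors.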
#start ⇒ Object
    # File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 52

    def start
      logger.info "#{log_prefix} starting..."
      queue.subscribe(:manual_ack => true, &method(:process_message))
      logger.debug "#{log_prefix} started"
    end
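
`start` passes `&method(:process_message)` to `subscribe`, which turns a method of the worker into the block that receives each delivery. The sketch below shows that mechanism under stated assumptions: `FakeQueue` and `SubscribeSketch` are hypothetical, the fake queue only imitates a block called with `(delivery_info, metadata, payload)`, and the body of `process_message` is invented because the real one is not shown on this page.

```ruby
# Hypothetical stand-in for the queue: yields each payload to the
# subscriber block in the (delivery_info, metadata, payload) shape.
class FakeQueue
  def initialize(payloads)
    @payloads = payloads
  end

  def subscribe(options = {})
    @payloads.each_with_index do |payload, index|
      yield({ :delivery_tag => index + 1 }, {}, payload)
    end
  end
end

class SubscribeSketch
  attr_reader :acked

  def initialize(queue)
    @queue = queue
    @acked = []
  end

  def start
    # &method(...) converts the private method below into a block.
    @queue.subscribe(:manual_ack => true, &method(:process_message))
  end

  private

  # Assumption: with manual acknowledgement, the handler is responsible
  # for acknowledging each delivery; here we only record the tag.
  def process_message(delivery_info, metadata, payload)
    @acked << delivery_info[:delivery_tag]
  end
end

worker = SubscribeSketch.new(FakeQueue.new(["a", "b"]))
worker.start
p worker.acked # [1, 2]
```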
#stop ⇒ Object
    # File 'lib/emque/consuming/adapters/rabbit_mq/worker.rb', line 58

    def stop
      logger.debug "#{log_prefix} stopping..."
      super do
        logger.debug "#{log_prefix} closing channel"
        channel.close
      end
      logger.debug "#{log_prefix} stopped"
    end
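
`stop` calls `super` with a block, so the included module's `stop` decides when the channel-closing code runs. The following sketch illustrates that calling pattern only. `SketchActor` is a hypothetical module, and its behavior (run some steps, yield, run more steps) is an assumption; the actual `Emque::Consuming::Actor#stop` is not shown on this page.

```ruby
# Hypothetical module standing in for the included actor behavior.
module SketchActor
  def stop
    events << :actor_stop_begin
    yield if block_given? # run the cleanup supplied by the including class
    events << :actor_stop_end
  end
end

class StopSketchWorker
  include SketchActor

  attr_reader :events

  def initialize
    @events = []
  end

  def stop
    events << :stopping
    # super with a block hands the cleanup step to SketchActor#stop.
    super do
      events << :channel_closed
    end
    events << :stopped
  end
end

worker = StopSketchWorker.new
worker.stop
p worker.events
# [:stopping, :actor_stop_begin, :channel_closed, :actor_stop_end, :stopped]
```

Passing the cleanup as a block lets the shared module control ordering while each worker supplies its own resource teardown.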