Class: Emque::Consuming::Adapters::RabbitMq::ErrorWorker

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/emque/consuming/adapters/rabbit_mq/error_worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ ErrorWorker

Returns a new instance of ErrorWorker.



10
11
12
13
# File 'lib/emque/consuming/adapters/rabbit_mq/error_worker.rb', line 10

def initialize(connection)
  self.connection = connection
  self.channel = connection.create_channel
end

Instance Method Details

#retry_errorsObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/emque/consuming/adapters/rabbit_mq/error_worker.rb', line 15

def retry_errors
  logger.info "#{log_prefix} starting"
  channel.open if channel.closed?
  [error_queue.message_count, 100].min.times do
    delivery_info, properties, payload = error_queue.pop(
      {:manual_ack => true}
    )
    if delivery_info && properties && payload
      retry_message(delivery_info, properties, payload)
    end
  end
  channel.close
  logger.info "#{log_prefix} done"
end