Class: Larva::MessageReplayer

Inherits:
Object
  • Object
show all
Defined in:
lib/larva/message_replayer.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic_name) ⇒ MessageReplayer

Returns a new instance of MessageReplayer.



8
9
10
# File 'lib/larva/message_replayer.rb', line 8

# Builds a replayer bound to a single topic.
#
# @param topic_name [Object] topic identifier; stored unchanged in
#   +@topic_name+ for later use by the instance methods
def initialize(topic_name)
  @topic_name = topic_name
end

Class Method Details

.reprocess_failed(topic_name, count = 1) ⇒ Object



4
5
6
# File 'lib/larva/message_replayer.rb', line 4

# Convenience entry point: builds a replayer for +topic_name+ and delegates
# to the instance-level #reprocess_failed.
#
# @param topic_name [Object] passed unchanged to the constructor
# @param count [Object] forwarded to the instance method; defaults to 1
# @return [Object] whatever the instance-level #reprocess_failed returns
def self.reprocess_failed(topic_name, count = 1)
  replayer = new(topic_name)
  replayer.reprocess_failed(count)
end

Instance Method Details

#reprocess_failed(count) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/larva/message_replayer.rb', line 12

# Moves messages from the topic's failed queue back onto its original queue.
#
# Receives up to +count+ messages from the failed queue, re-sends each one to
# the original queue (serialised through +to_json_with_exception+ with a
# placeholder StandardError), and then deletes it from the failed queue.
#
# @param count [#to_i] maximum number of messages to request; converted with
#   +to_i+ before being sent as 'MaxNumberOfMessages'
# @return [Object] the collection of raw messages that were received
#   (the result of iterating it with +each+)
# @raise [StandardError] with message "Message empty" when the receive call
#   returns no messages
def reprocess_failed(count)
  Filum.logger.info "Reprocessing #{count} message(s) for topic: #{@topic_name}"

  queue_subscription = Propono::QueueSubscription.create(@topic_name)
  target_url = queue_subscription.queue.url
  source_url = queue_subscription.failed_queue.url

  client = Fog::AWS::SQS.new(Propono.aws_options)
  receive_options = { 'MaxNumberOfMessages' => count.to_i }
  raw_messages = client.receive_message(source_url, receive_options).body['Message']

  # Nothing to replay is treated as an error rather than a silent no-op.
  raise StandardError, "Message empty" if raw_messages.empty?

  raw_messages.each do |raw|
    wrapped = Propono::SqsMessage.new(raw)
    Filum.logger.info "Message : #{wrapped}"

    # Re-send first, delete second: the message is only removed from the
    # failed queue after the send call has returned.
    payload = wrapped.to_json_with_exception(StandardError.new "Fake Exception")
    client.send_message(target_url, payload)
    client.delete_message(source_url, raw['ReceiptHandle'])
  end
end