Class: Messaging::Kafka::MessageReceiver

Inherits:
Messages::MessageReceiver show all
Includes:
Phobos::Handler, Phobos::Producer
Defined in:
lib/messaging/kafka/message_receiver.rb

Constant Summary collapse

ERROR_THRESHOLD =
5

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.loggerObject



37
38
39
# File 'lib/messaging/kafka/message_receiver.rb', line 37

def self.logger
  @logger ||= Logger.new(STDOUT)
end

Instance Method Details

#consume(payload, metadata) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/messaging/kafka/message_receiver.rb', line 12

def consume(payload, )
  begin
    # metadata comes through as a hash, no need to parse
    payload_hash =  JSON.parse(payload).with_indifferent_access
    Messaging::Kafka::MessageLogger.message_received(payload_hash, )
    process(payload_hash, )
    Messaging::Kafka::MessageLogger.message_processed(payload_hash, )
  rescue JSON::ParserError => e
    # A json parse error cannot be recovered from. Log the error and the body.
    self.class.logger.error("MessageReceiver JSON::ParserError. payload: #{payload}, error: #{e}")
  rescue => e
    if [:retry_count] > ERROR_THRESHOLD
      # move the message to the poison topic if the threshold is reached.
      message = Messaging::Messages::Events::MessagePoisoned.new(payload: payload_hash, listener: [:handler], error_message: e.message)
      Messaging::Kafka::MessageSender.async_publish(message)
    else
      raise
    end
  end
end

#process(payload, metadata) ⇒ Object

Raises:

  • (NotImplementedError)


33
34
35
# File 'lib/messaging/kafka/message_receiver.rb', line 33

def process(payload, )
  raise NotImplementedError
end