Class: Messaging::Kafka::MessageReceiver
- Inherits:
- Messages::MessageReceiver
    - Object
    - Messages::MessageReceiver
    - Messaging::Kafka::MessageReceiver
- Includes:
- Phobos::Handler, Phobos::Producer
- Defined in:
- lib/messaging/kafka/message_receiver.rb
Constant Summary
- ERROR_THRESHOLD = 5
Class Method Details
.logger ⇒ Object
    # File 'lib/messaging/kafka/message_receiver.rb', line 37
    def self.logger
      @logger ||= Logger.new(STDOUT)
    end
Instance Method Details
#consume(payload, metadata) ⇒ Object
    # File 'lib/messaging/kafka/message_receiver.rb', line 12
    def consume(payload, metadata)
      begin
        # metadata comes through as a hash, no need to parse
        payload_hash = JSON.parse(payload).with_indifferent_access
        Messaging::Kafka::MessageLogger.(payload_hash, metadata)
        process(payload_hash, metadata)
        Messaging::Kafka::MessageLogger.(payload_hash, metadata)
      rescue JSON::ParserError => e
        # A json parse error cannot be recovered from. Log the error and the body.
        self.class.logger.error("MessageReceiver JSON::ParserError. payload: #{payload}, error: #{e}")
      rescue => e
        if metadata[:retry_count] > ERROR_THRESHOLD
          # move the message to the poison topic if the threshold is reached.
          message = Messaging::Messages::Events::MessagePoisoned.new(payload: payload_hash, listener: metadata[:handler], error_message: e.message)
          Messaging::Kafka::MessageSender.async_publish(message)
        else
          raise
        end
      end
    end
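The control flow of #consume can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: `SketchReceiver`, the in-memory `poisoned` array (standing in for the poison topic), and the simulated failure in `process` are all assumptions made for illustration, and the logging and publishing calls of the real class are left out.

```ruby
require "json"
require "logger"

# Hypothetical stand-in for the receiver; none of these names come from the library.
class SketchReceiver
  ERROR_THRESHOLD = 5

  attr_reader :poisoned

  def initialize
    @poisoned = []            # stands in for the poison topic
    @logger = Logger.new(nil) # a nil log device discards all output
  end

  def consume(payload, metadata)
    payload_hash = JSON.parse(payload)
    process(payload_hash, metadata)
  rescue JSON::ParserError => e
    # Unparseable input can never succeed on retry, so log it and drop it.
    @logger.error("unparseable payload: #{payload}, error: #{e}")
  rescue StandardError => e
    # Re-raise so the consumer retries, until the retry count exceeds the threshold.
    raise unless metadata[:retry_count] > ERROR_THRESHOLD

    @poisoned << { payload: payload_hash, listener: metadata[:handler], error_message: e.message }
  end

  # Always fails, to exercise the error path (an assumption for this sketch).
  def process(_payload, _metadata)
    raise ArgumentError, "simulated failure"
  end
end

receiver = SketchReceiver.new
receiver.consume("not json", { retry_count: 0 })                       # logged and dropped
receiver.consume("{}", { retry_count: 6, handler: "SketchReceiver" })  # recorded as poisoned
```

Because the comparison is strict, a message whose retry count equals ERROR_THRESHOLD is still re-raised; it is only diverted once the count is greater than 5.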
#process(payload, metadata) ⇒ Object
    # File 'lib/messaging/kafka/message_receiver.rb', line 33
    def process(payload, metadata)
      raise NotImplementedError
    end
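Since #process raises NotImplementedError, subclasses are presumably expected to override it. The following is a sketch under that assumption: `BaseReceiver` is a simplified stand-in for the documented class, and `OrderCreatedReceiver`, the `"order_id"` field, and the `:topic` metadata key are hypothetical names chosen for the example.

```ruby
# Hypothetical stand-in for the base class; only `process` mirrors the source.
class BaseReceiver
  def process(payload, metadata)
    raise NotImplementedError
  end
end

# Hypothetical subclass: the class name and the "order_id" field are invented.
class OrderCreatedReceiver < BaseReceiver
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Records what it was given instead of doing real work.
  def process(payload, metadata)
    @seen << [payload["order_id"], metadata[:topic]]
  end
end

receiver = OrderCreatedReceiver.new
receiver.process({ "order_id" => 42 }, { topic: "orders" })

# `rescue => e` is shorthand for `rescue StandardError => e`.
# NotImplementedError descends from ScriptError, so that clause does not catch it.
caught_by_standard_rescue =
  begin
    BaseReceiver.new.process({}, {})
    false
  rescue StandardError
    true
  rescue NotImplementedError
    false
  end
```

One consequence worth noting: if a subclass forgets to override #process, the resulting NotImplementedError is not handled by the `rescue => e` clause in #consume, so it propagates regardless of the retry count.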