Class: Deimos::Utils::SeekListener
- Inherits: Phobos::Listener
  - Object
  - Phobos::Listener
  - Deimos::Utils::SeekListener
- Defined in: lib/deimos/utils/inline_consumer.rb
Overview
Listener that can seek to get the last X messages in a topic.
Constant Summary
- MAX_SEEK_RETRIES = 3
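The constant caps how many times `start_listener` attempts the seek before giving up. The following is a minimal sketch of the same begin/rescue/retry pattern with linearly growing waits. The `SeekRetrySketch` module, `with_seek_retries`, and its `sleeper` parameter are hypothetical names introduced for illustration, and plain seconds stand in for the `1.seconds` duration used in the source.

```ruby
module SeekRetrySketch
  MAX_SEEK_RETRIES = 3

  # Hypothetical helper (not part of Deimos): runs the block and retries on
  # StandardError, waiting `attempt` seconds between tries. Returns the
  # block's value, or nil once MAX_SEEK_RETRIES attempts have failed.
  def self.with_seek_retries(sleeper: ->(secs) { sleep(secs) })
    attempt = 0
    begin
      attempt += 1
      yield attempt
    rescue StandardError => e
      if attempt < MAX_SEEK_RETRIES
        sleeper.call(attempt) # 1s after the first failure, 2s after the second
        retry
      end
      # Out of attempts: report the failure and carry on, as the source does.
      warn("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries")
      nil
    end
  end
end

# Usage: fail twice, succeed on the third attempt (no real sleeping here).
result = SeekRetrySketch.with_seek_retries(sleeper: ->(_secs) {}) do |attempt|
  raise 'broker not ready' if attempt < 3

  :seeked
end
```

Passing the sleep call in as a parameter is a choice made for this sketch so the pattern can be exercised without waiting; the source calls `sleep` directly.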
Instance Attribute Summary
- #num_messages ⇒ Integer
Instance Method Summary
- #start_listener ⇒ void
Instance Attribute Details
#num_messages ⇒ Integer
    # File 'lib/deimos/utils/inline_consumer.rb', line 12

    def num_messages
      @num_messages
    end
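The listener falls back to 10 messages when no count has been set (`@num_messages ||= 10` in `start_listener`). The sketch below illustrates that defaulting idiom in isolation. `RecentReader` is a hypothetical class, not part of Deimos, and it assumes the attribute can be assigned before the listener starts, which this page does not confirm.

```ruby
# Hypothetical class, used only to illustrate the default; not part of Deimos.
class RecentReader
  attr_accessor :num_messages

  def start
    # Keep a caller-supplied value; otherwise fall back to 10.
    @num_messages ||= 10
    @num_messages
  end
end

default_reader = RecentReader.new
default_count = default_reader.start # falls back to the default

custom_reader = RecentReader.new
custom_reader.num_messages = 3
custom_count = custom_reader.start # keeps the assigned value
```

Because `||=` only assigns when the current value is `nil` or `false`, an explicitly assigned count is never overwritten.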
Instance Method Details
#start_listener ⇒ void
This method returns an undefined value.
    # File 'lib/deimos/utils/inline_consumer.rb', line 15

    def start_listener
      @num_messages ||= 10
      @consumer = create_kafka_consumer
      @consumer.subscribe(topic, @subscribe_opts)
      attempt = 0

      begin
        attempt += 1
        last_offset = @kafka_client.last_offset_for(topic, 0)
        offset = last_offset - num_messages
        if offset.positive?
          Deimos.config.logger.info("Seeking to #{offset}")
          @consumer.seek(topic, 0, offset)
        end
      rescue StandardError => e
        if attempt < MAX_SEEK_RETRIES
          sleep(1.seconds * attempt)
          retry
        end
        log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata)
      end

      instrument('listener.start_handler', listener_metadata) do
        @handler_class.start(@kafka_client)
      end
      log_info('Listener started', listener_metadata)
    end
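The seek step of `start_listener` can be tried out without a Kafka broker. The sketch below is a simplified illustration under stated assumptions, not the Deimos implementation: `FakeClient`, `FakeConsumer`, and `seek_to_recent` are hypothetical stand-ins that expose only the two calls used in the source, `last_offset_for(topic, partition)` and `seek(topic, partition, offset)`.

```ruby
# Stand-in for the Kafka client; answers only the one call the sketch needs.
FakeClient = Struct.new(:last_offset) do
  def last_offset_for(_topic, _partition)
    last_offset
  end
end

# Stand-in for the consumer; records seek requests instead of talking to Kafka.
class FakeConsumer
  attr_reader :seeks

  def initialize
    @seeks = []
  end

  def seek(topic, partition, offset)
    @seeks << [topic, partition, offset]
  end
end

# Mirrors the seek step: look up the last offset on partition 0, step back
# by num_messages, and seek only when the result is positive.
def seek_to_recent(client, consumer, topic, num_messages = 10)
  offset = client.last_offset_for(topic, 0) - num_messages
  consumer.seek(topic, 0, offset) if offset.positive?
  offset
end

# A topic whose last offset is 100: the consumer is moved back to offset 90.
busy = FakeConsumer.new
seek_to_recent(FakeClient.new(100), busy, 'my-topic')
busy.seeks # => [["my-topic", 0, 90]]

# A topic holding fewer messages than requested: no seek is issued.
quiet = FakeConsumer.new
seek_to_recent(FakeClient.new(4), quiet, 'my-topic')
quiet.seeks # => []
```

As in the source, the sketch looks only at partition 0, so on a topic with several partitions only that partition is repositioned.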