Class: Deimos::Utils::SeekListener

Inherits:
Phobos::Listener
  • Object
show all
Defined in:
lib/deimos/utils/inline_consumer.rb

Overview

Listener that can seek to get the last X messages in a topic.

Constant Summary collapse

MAX_SEEK_RETRIES = 3

Maximum number of attempts made to seek before the failure is logged.

Returns:

  • (Integer)

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#num_messages ⇒ Integer

Returns:

  • (Integer)


12
13
14
# File 'lib/deimos/utils/inline_consumer.rb', line 12

# Reader for the +@num_messages+ instance variable.
#
# @return [Integer, nil] the stored value; nil while nothing has been assigned
def num_messages; @num_messages; end

Instance Method Details

#start_listener ⇒ void

This method returns an undefined value.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/deimos/utils/inline_consumer.rb', line 15

# Creates and subscribes the Kafka consumer, tries to rewind partition 0 of the
# topic by +num_messages+ (defaulting to 10), then starts the handler.
#
# The seek is best-effort: it is attempted up to MAX_SEEK_RETRIES times with a
# linearly growing pause between attempts. If every attempt fails, the error is
# logged and the handler is started anyway.
#
# @return [void]
def start_listener
  @num_messages ||= 10
  @consumer = create_kafka_consumer
  # Splat the options so they reach #subscribe as keyword arguments; passing the
  # Hash positionally raises ArgumentError on Ruby 3 when #subscribe only accepts
  # keywords. NOTE(review): assumes @subscribe_opts is a Hash with symbol keys
  # set up outside this method - confirm.
  @consumer.subscribe(topic, **@subscribe_opts)

  attempt = 0
  begin
    attempt += 1
    # Only partition 0 is consulted and rewound.
    last_offset = @kafka_client.last_offset_for(topic, 0)
    offset = last_offset - num_messages
    # A zero or negative target means there is nothing to rewind to, so the
    # consumer is left wherever the subscription put it.
    if offset.positive?
      Deimos.config.logger.info("Seeking to #{offset}")
      @consumer.seek(topic, 0, offset)
    end
  rescue StandardError => e
    if attempt < MAX_SEEK_RETRIES
      # Linear backoff: 1s after the first failure, 2s after the second, ...
      sleep(attempt)
      retry
    end
    # NOTE(review): listener_metadata is presumably inherited from
    # Phobos::Listener - it is not defined in this snippet; confirm.
    log_error("Could not seek to offset: #{e.message} after #{MAX_SEEK_RETRIES} retries", listener_metadata)
  end

  instrument('listener.start_handler', listener_metadata) do
    @handler_class.start(@kafka_client)
  end
  log_info('Listener started', listener_metadata)
end