Class: Phobos::Listener

Inherits:
Object
  • Object
show all
Includes:
Instrumentation, Log
Defined in:
lib/phobos/listener.rb

Overview

rubocop:disable Metrics/ParameterLists, Metrics/ClassLength

Constant Summary collapse

DEFAULT_MAX_BYTES_PER_PARTITION =

1 MB

1_048_576
DELIVERY_OPTS =
%w[batch message inline_batch].freeze

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Log

#log_debug, #log_error, #log_info

Methods included from Instrumentation

#instrument, subscribe, unsubscribe

Constructor Details

#initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener

rubocop:disable Metrics/MethodLength



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/phobos/listener.rb', line 16

def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
               force_encoding: nil, start_from_beginning: true, backoff: nil,
               delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
               session_timeout: nil, offset_commit_interval: nil,
               heartbeat_interval: nil, offset_commit_threshold: nil,
               offset_retention_time: nil)
  @id = SecureRandom.hex[0...6]
  @handler_class = handler
  @group_id = group_id
  @topic = topic
  @backoff = backoff
  @delivery = delivery.to_s
  @subscribe_opts = {
    start_from_beginning: start_from_beginning, max_bytes_per_partition: max_bytes_per_partition
  }
  @kafka_consumer_opts = compact(
    session_timeout: session_timeout, offset_retention_time: offset_retention_time,
    offset_commit_interval: offset_commit_interval, heartbeat_interval: heartbeat_interval,
    offset_commit_threshold: offset_commit_threshold
  )
  @encoding = Encoding.const_get(force_encoding.to_sym) if force_encoding
  @message_processing_opts = compact(min_bytes: min_bytes, max_wait_time: max_wait_time)
  @kafka_client = Phobos.create_kafka_client
  @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
end

Instance Attribute Details

#encodingObject (readonly)

Returns the value of attribute encoding.



13
14
15
# File 'lib/phobos/listener.rb', line 13

def encoding
  @encoding
end

#group_idObject (readonly)

Returns the value of attribute group_id.



12
13
14
# File 'lib/phobos/listener.rb', line 12

def group_id
  @group_id
end

#handler_classObject (readonly)

Returns the value of attribute handler_class.



13
14
15
# File 'lib/phobos/listener.rb', line 13

def handler_class
  @handler_class
end

#idObject (readonly)

Returns the value of attribute id.



12
13
14
# File 'lib/phobos/listener.rb', line 12

def id
  @id
end

#topicObject (readonly)

Returns the value of attribute topic.



12
13
14
# File 'lib/phobos/listener.rb', line 12

def topic
  @topic
end

Instance Method Details

#create_exponential_backoffObject



73
74
75
# File 'lib/phobos/listener.rb', line 73

def create_exponential_backoff
  Phobos.create_exponential_backoff(@backoff)
end

#send_heartbeat_if_necessaryObject

Raises:



81
82
83
84
85
# File 'lib/phobos/listener.rb', line 81

def send_heartbeat_if_necessary
  raise Phobos::AbortError if should_stop?

  @consumer&.send_heartbeat_if_necessary
end

#should_stop?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/phobos/listener.rb', line 77

def should_stop?
  @signal_to_stop == true
end

#startObject

rubocop:enable Metrics/MethodLength



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/phobos/listener.rb', line 43

def start
  @signal_to_stop = false

  start_listener

  begin
    start_consumer_loop
  rescue Kafka::ProcessingError, Phobos::AbortError
    # Abort is an exception to prevent the consumer from committing the offset.
    # Since "listener" had a message being retried while "stop" was called
    # it's wise to not commit the batch offset to avoid data loss. This will
    # cause some messages to be reprocessed
    instrument('listener.retry_aborted', ) do
      log_info('Retry loop aborted, listener is shutting down', )
    end
  end
ensure
  stop_listener
end

#stopObject



63
64
65
66
67
68
69
70
71
# File 'lib/phobos/listener.rb', line 63

def stop
  return if should_stop?

  instrument('listener.stopping', ) do
    log_info('Listener stopping', )
    @consumer&.stop
    @signal_to_stop = true
  end
end