Class: Phobos::Listener

Inherits:
Object
  • Object
show all
Includes:
Instrumentation, Log
Defined in:
lib/phobos/listener.rb

Overview

rubocop:disable Metrics/ParameterLists, Metrics/ClassLength

Constant Summary collapse

DEFAULT_MAX_BYTES_PER_PARTITION =

1 MB

1_048_576
DELIVERY_OPTS =
%w[batch message inline_batch].freeze

Constants included from Instrumentation

Instrumentation::NAMESPACE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Log

#log_debug, #log_error, #log_info, #log_warn

Methods included from Instrumentation

#instrument, subscribe, unsubscribe

Constructor Details

#initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener

rubocop:disable Metrics/MethodLength

Parameters:

  • handler (Class)
  • group_id (String)
  • topic (String)
  • min_bytes (Integer) (defaults to: nil)
  • max_wait_time (Integer) (defaults to: nil)
  • start_from_beginning (Boolean) (defaults to: true)
  • delivery (String) (defaults to: 'batch')
  • max_bytes_per_partition (Integer) (defaults to: DEFAULT_MAX_BYTES_PER_PARTITION)
  • session_timeout (Integer) (defaults to: nil)
  • offset_commit_interval (Integer) (defaults to: nil)
  • heartbeat_interval (Integer) (defaults to: nil)
  • offset_commit_threshold (Integer) (defaults to: nil)
  • offset_retention_time (Integer) (defaults to: nil)


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/phobos/listener.rb', line 35

def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
               force_encoding: nil, start_from_beginning: true, backoff: nil,
               delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
               session_timeout: nil, offset_commit_interval: nil,
               heartbeat_interval: nil, offset_commit_threshold: nil,
               offset_retention_time: nil)
  @id = SecureRandom.hex[0...6]
  @handler_class = handler
  @group_id = group_id
  @topic = topic
  @backoff = backoff
  @delivery = delivery.to_s
  @subscribe_opts = {
    start_from_beginning: start_from_beginning, max_bytes_per_partition: max_bytes_per_partition
  }
  @kafka_consumer_opts = compact(
    session_timeout: session_timeout, offset_retention_time: offset_retention_time,
    offset_commit_interval: offset_commit_interval, heartbeat_interval: heartbeat_interval,
    offset_commit_threshold: offset_commit_threshold
  )
  @encoding = Encoding.const_get(force_encoding.to_sym) if force_encoding
  @message_processing_opts = compact(min_bytes: min_bytes, max_wait_time: max_wait_time)
  @kafka_client = Phobos.create_kafka_client(:consumer)
  @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
end

Instance Attribute Details

#consumerObject (readonly)

Returns the value of attribute consumer.



19
20
21
# File 'lib/phobos/listener.rb', line 19

def consumer
  @consumer
end

#encodingObject (readonly)

Returns the value of attribute encoding.



19
20
21
# File 'lib/phobos/listener.rb', line 19

def encoding
  @encoding
end

#group_idString (readonly)

Returns:

  • (String)


13
14
15
# File 'lib/phobos/listener.rb', line 13

def group_id
  @group_id
end

#handler_classClass (readonly)

Returns:

  • (Class)


18
19
20
# File 'lib/phobos/listener.rb', line 18

def handler_class
  @handler_class
end

#idObject (readonly)

Returns the value of attribute id.



16
17
18
# File 'lib/phobos/listener.rb', line 16

def id
  @id
end

#topicString (readonly)

Returns:

  • (String)


15
16
17
# File 'lib/phobos/listener.rb', line 15

def topic
  @topic
end

Instance Method Details

#create_exponential_backoffObject



94
95
96
# File 'lib/phobos/listener.rb', line 94

def create_exponential_backoff
  Phobos.create_exponential_backoff(@backoff)
end

#send_heartbeat_if_necessaryObject

Raises:



102
103
104
105
106
# File 'lib/phobos/listener.rb', line 102

def send_heartbeat_if_necessary
  raise Phobos::AbortError if should_stop?

  @consumer&.send_heartbeat_if_necessary
end

#should_stop?Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/phobos/listener.rb', line 98

def should_stop?
  @signal_to_stop == true
end

#startvoid

This method returns an undefined value.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/phobos/listener.rb', line 63

def start
  @signal_to_stop = false
  start_listener

  begin
    start_consumer_loop
  rescue Kafka::ProcessingError, Phobos::AbortError => e
    # Abort is an exception to prevent the consumer from committing the offset.
    # Since "listener" had a message being retried while "stop" was called
    # it's wise to not commit the batch offset to avoid data loss. This will
    # cause some messages to be reprocessed
    instrument('listener.retry_aborted', ) do
      log_info('Retry loop aborted, listener is shutting down', )
    end
    raise e if e.is_a?(Kafka::ProcessingError)
  end
ensure
  stop_listener
end

#stopvoid

This method returns an undefined value.



84
85
86
87
88
89
90
91
92
# File 'lib/phobos/listener.rb', line 84

def stop
  return if should_stop?

  instrument('listener.stopping', ) do
    log_info('Listener stopping', )
    @consumer&.stop
    @signal_to_stop = true
  end
end