Class: Phobos::Listener
- Inherits:
  Object
  - Object
    - Phobos::Listener
- Includes:
- Instrumentation, Log
- Defined in:
- lib/phobos/listener.rb
Overview
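No class-level description is available. The only text extracted from the source comment is a linter directive: `rubocop:disable Metrics/ParameterLists, Metrics/ClassLength`.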
Constant Summary collapse
- DEFAULT_MAX_BYTES_PER_PARTITION = 1_048_576 (1 MB)
- DELIVERY_OPTS =
%w[batch message inline_batch].freeze
Constants included from Instrumentation
Instance Attribute Summary collapse
- #consumer ⇒ Object readonly
  Returns the value of attribute consumer.
- #encoding ⇒ Object readonly
  Returns the value of attribute encoding.
- #group_id ⇒ String readonly
- #handler_class ⇒ Class readonly
- #id ⇒ Object readonly
  Returns the value of attribute id.
- #topic ⇒ String readonly
Instance Method Summary collapse
- #create_exponential_backoff ⇒ Object
- #initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener constructor
  No description is available; the source comment contains only the linter directive `rubocop:disable Metrics/MethodLength`.
- #send_heartbeat_if_necessary ⇒ Object
- #should_stop? ⇒ Boolean
- #start ⇒ void
- #stop ⇒ void
Methods included from Log
#log_debug, #log_error, #log_info, #log_warn
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Constructor Details
#initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener
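No description is available for the constructor. The only text extracted from the source comment is a linter directive: `rubocop:disable Metrics/MethodLength`.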
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/phobos/listener.rb', line 35
#
# Stores the listener configuration in instance variables and creates the
# Kafka client used by this listener.
#
# @param handler [Class] kept as @handler_class; its ancestors are inspected
#   to decide whether @producer_enabled is set
# @param group_id [String] kept as @group_id
# @param topic [String] kept as @topic
# @param min_bytes [Object, nil] placed in @message_processing_opts
# @param max_wait_time [Object, nil] placed in @message_processing_opts
# @param force_encoding [String, Symbol, nil] name of a constant under
#   Encoding; when nil, @encoding is left unassigned
# @param start_from_beginning [Boolean] placed in @subscribe_opts
# @param backoff [Object, nil] kept as @backoff
# @param delivery [#to_s] stored as a String in @delivery. DELIVERY_OPTS lists
#   batch, message and inline_batch, but no validation happens in this method.
# @param max_bytes_per_partition [Integer] placed in @subscribe_opts
# @param session_timeout [Object, nil] placed in @kafka_consumer_opts
# @param offset_commit_interval [Object, nil] placed in @kafka_consumer_opts
# @param heartbeat_interval [Object, nil] placed in @kafka_consumer_opts
# @param offset_commit_threshold [Object, nil] placed in @kafka_consumer_opts
# @param offset_retention_time [Object, nil] placed in @kafka_consumer_opts
# @raise [NameError] if force_encoding does not name a constant under Encoding
def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
               force_encoding: nil, start_from_beginning: true, backoff: nil,
               delivery: 'batch',
               max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
               session_timeout: nil, offset_commit_interval: nil,
               heartbeat_interval: nil, offset_commit_threshold: nil,
               offset_retention_time: nil)
  # Short identifier: the first six characters of a random hex string.
  @id = SecureRandom.hex[0...6]
  @handler_class = handler
  @group_id = group_id
  @topic = topic
  @backoff = backoff
  @delivery = delivery.to_s
  @subscribe_opts = {
    start_from_beginning: start_from_beginning,
    max_bytes_per_partition: max_bytes_per_partition
  }
  # NOTE(review): `compact` is defined outside this listing; presumably it
  # drops the options that were left as nil -- confirm against the source.
  @kafka_consumer_opts = compact(
    session_timeout: session_timeout,
    offset_retention_time: offset_retention_time,
    offset_commit_interval: offset_commit_interval,
    heartbeat_interval: heartbeat_interval,
    offset_commit_threshold: offset_commit_threshold
  )
  @encoding = Encoding.const_get(force_encoding.to_sym) if force_encoding
  @message_processing_opts = compact(min_bytes: min_bytes, max_wait_time: max_wait_time)
  @kafka_client = Phobos.create_kafka_client(:consumer)
  @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
end
Instance Attribute Details
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
19 20 21 |
# File 'lib/phobos/listener.rb', line 19
#
# Returns the value of attribute consumer.
#
# @return [Object] whatever is currently held in @consumer (nil when unset)
def consumer
  @consumer
end
#encoding ⇒ Object (readonly)
Returns the value of attribute encoding.
19 20 21 |
# File 'lib/phobos/listener.rb', line 19
#
# Returns the value of attribute encoding.
#
# @return [Object] whatever is currently held in @encoding (nil when unset)
def encoding
  @encoding
end
#group_id ⇒ String (readonly)
13 14 15 |
# File 'lib/phobos/listener.rb', line 13
#
# Returns the value of attribute group_id.
#
# @return [String] the value held in @group_id
def group_id
  @group_id
end
#handler_class ⇒ Class (readonly)
18 19 20 |
# File 'lib/phobos/listener.rb', line 18
#
# Returns the value of attribute handler_class.
#
# @return [Class] the value held in @handler_class
def handler_class
  @handler_class
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
16 17 18 |
# File 'lib/phobos/listener.rb', line 16
#
# Returns the value of attribute id.
#
# @return [Object] whatever is currently held in @id (nil when unset)
def id
  @id
end
#topic ⇒ String (readonly)
15 16 17 |
# File 'lib/phobos/listener.rb', line 15
#
# Returns the value of attribute topic.
#
# @return [String] the value held in @topic
def topic
  @topic
end
Instance Method Details
#create_exponential_backoff ⇒ Object
94 95 96 |
# File 'lib/phobos/listener.rb', line 94
#
# Delegates to Phobos.create_exponential_backoff, passing the value stored
# in @backoff.
#
# @return [Object] whatever Phobos.create_exponential_backoff returns
def create_exponential_backoff
  Phobos.create_exponential_backoff(@backoff)
end
#send_heartbeat_if_necessary ⇒ Object
102 103 104 105 106 |
# File 'lib/phobos/listener.rb', line 102
#
# Forwards the heartbeat request to the consumer, unless the listener has
# been told to stop.
#
# @return [Object, nil] the consumer's result, or nil when @consumer is nil
# @raise [Phobos::AbortError] when should_stop? is true
def send_heartbeat_if_necessary
  raise Phobos::AbortError if should_stop?

  # Safe navigation: @consumer may not be assigned yet.
  @consumer&.send_heartbeat_if_necessary
end
#should_stop? ⇒ Boolean
98 99 100 |
# File 'lib/phobos/listener.rb', line 98
#
# Reports whether the stop flag has been raised.
#
# @return [Boolean] true only when @signal_to_stop is exactly `true`;
#   false for nil, false, or any other value
def should_stop?
  # Strict comparison keeps the result a real Boolean even while the flag
  # is still unassigned (nil).
  @signal_to_stop == true
end
#start ⇒ void
This method returns an undefined value.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/phobos/listener.rb', line 63
#
# Clears the stop flag, starts the listener and runs the consumer loop.
# stop_listener is always called on the way out, whether the loop returns
# or raises.
#
# @return [void]
# @raise [Kafka::ProcessingError] re-raised after the abort has been
#   instrumented and logged; Phobos::AbortError is swallowed
def start
  @signal_to_stop = false
  start_listener

  begin
    start_consumer_loop
  rescue Kafka::ProcessingError, Phobos::AbortError => e
    # Abort is an exception to prevent the consumer from committing the offset.
    # Since "listener" had a message being retried while "stop" was called
    # it's wise to not commit the batch offset to avoid data loss. This will
    # cause some messages to be reprocessed
    #
    # NOTE(review): the listing shows a dangling comma after the first
    # argument of both calls below, so a second argument was probably lost
    # when the page was extracted -- confirm against lib/phobos/listener.rb.
    instrument('listener.retry_aborted') do
      log_info('Retry loop aborted, listener is shutting down')
    end

    raise e if e.is_a?(Kafka::ProcessingError)
  end
ensure
  stop_listener
end
#stop ⇒ void
This method returns an undefined value.
84 85 86 87 88 89 90 91 92 |
# File 'lib/phobos/listener.rb', line 84
#
# Stops the consumer (if one is assigned) and raises the stop flag. Calling
# it again once the flag is set does nothing.
#
# @return [void]
def stop
  return if should_stop?

  # NOTE(review): the listing shows a dangling comma after the first argument
  # of both calls below, so a second argument was probably lost when the page
  # was extracted -- confirm against lib/phobos/listener.rb.
  instrument('listener.stopping') do
    log_info('Listener stopping')
    @consumer&.stop
    @signal_to_stop = true
  end
end