Class: Phobos::Listener
- Inherits:
- Object
- Inheritance hierarchy: Object → Phobos::Listener
- Includes:
- Instrumentation, Log
- Defined in:
- lib/phobos/listener.rb
Overview
rubocop:disable Metrics/ParameterLists, Metrics/ClassLength
Constant Summary collapse
- DEFAULT_MAX_BYTES_PER_PARTITION = 1_048_576 (1 MB)
- DELIVERY_OPTS =
%w[batch message inline_batch].freeze
Constants included from Instrumentation
Instance Attribute Summary collapse
-
#encoding ⇒ Object
readonly
Returns the value of attribute encoding.
-
#group_id ⇒ Object
readonly
Returns the value of attribute group_id.
-
#handler_class ⇒ Object
readonly
Returns the value of attribute handler_class.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
- #create_exponential_backoff ⇒ Object
-
#initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener
constructor
rubocop:disable Metrics/MethodLength.
- #send_heartbeat_if_necessary ⇒ Object
- #should_stop? ⇒ Boolean
-
#start ⇒ Object
rubocop:enable Metrics/MethodLength.
- #stop ⇒ Object
Methods included from Log
#log_debug, #log_error, #log_info
Methods included from Instrumentation
#instrument, subscribe, unsubscribe
Constructor Details
#initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) ⇒ Listener
rubocop:disable Metrics/MethodLength
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/phobos/listener.rb', line 16

# Stores the handler, group and topic, and prepares the option hashes that
# the rest of the listener reads.
#
# @param handler [Class] handler class; exposed through +handler_class+
# @param group_id [Object] stored unchanged and exposed through +group_id+
# @param topic [Object] stored unchanged and exposed through +topic+
# @param min_bytes [Object, nil] goes into the message processing options
# @param max_wait_time [Object, nil] goes into the message processing options
# @param force_encoding [#to_sym, nil] name of a constant under +Encoding+;
#   when nil, +@encoding+ is never assigned
# @param start_from_beginning [Boolean] goes into the subscribe options
# @param backoff [Object, nil] later passed to Phobos.create_exponential_backoff
# @param delivery [#to_s] delivery mode, stored as a String
# @param max_bytes_per_partition [Integer] goes into the subscribe options
# @param session_timeout [Object, nil] consumer option
# @param offset_commit_interval [Object, nil] consumer option
# @param heartbeat_interval [Object, nil] consumer option
# @param offset_commit_threshold [Object, nil] consumer option
# @param offset_retention_time [Object, nil] consumer option
def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
               force_encoding: nil, start_from_beginning: true, backoff: nil,
               delivery: 'batch',
               max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
               session_timeout: nil, offset_commit_interval: nil,
               heartbeat_interval: nil, offset_commit_threshold: nil,
               offset_retention_time: nil)
  # Short identifier: the first six characters of a random hex string.
  @id = SecureRandom.hex[0, 6]

  @handler_class = handler
  @group_id = group_id
  @topic = topic
  @backoff = backoff
  @delivery = delivery.to_s

  @subscribe_opts = {
    start_from_beginning: start_from_beginning,
    max_bytes_per_partition: max_bytes_per_partition
  }

  # NOTE(review): `compact` is defined outside this view; presumably it drops
  # nil-valued entries so unset options are not forwarded -- confirm.
  @kafka_consumer_opts = compact(
    session_timeout: session_timeout,
    offset_retention_time: offset_retention_time,
    offset_commit_interval: offset_commit_interval,
    heartbeat_interval: heartbeat_interval,
    offset_commit_threshold: offset_commit_threshold
  )

  if force_encoding
    @encoding = Encoding.const_get(force_encoding.to_sym)
  end

  @message_processing_opts = compact(
    min_bytes: min_bytes,
    max_wait_time: max_wait_time
  )

  @kafka_client = Phobos.create_kafka_client
  @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
end
Instance Attribute Details
#encoding ⇒ Object (readonly)
Returns the value of attribute encoding.
13 14 15 |
# File 'lib/phobos/listener.rb', line 13

# Reader for the +encoding+ attribute.
#
# @return [Object, nil] the constant the constructor looked up under
#   +Encoding+, or nil when no +force_encoding+ was given
def encoding
  @encoding
end
#group_id ⇒ Object (readonly)
Returns the value of attribute group_id.
12 13 14 |
# File 'lib/phobos/listener.rb', line 12

# Reader for the +group_id+ attribute.
#
# @return [Object] the +group_id:+ value given to the constructor
def group_id
  @group_id
end
#handler_class ⇒ Object (readonly)
Returns the value of attribute handler_class.
13 14 15 |
# File 'lib/phobos/listener.rb', line 13

# Reader for the +handler_class+ attribute.
#
# @return [Object] the +handler:+ value given to the constructor
def handler_class
  @handler_class
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
12 13 14 |
# File 'lib/phobos/listener.rb', line 12

# Reader for the +id+ attribute.
#
# @return [String] the six-character random hex identifier generated by the
#   constructor
def id
  @id
end
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
12 13 14 |
# File 'lib/phobos/listener.rb', line 12

# Reader for the +topic+ attribute.
#
# @return [Object] the +topic:+ value given to the constructor
def topic
  @topic
end
Instance Method Details
#create_exponential_backoff ⇒ Object
73 74 75 |
# File 'lib/phobos/listener.rb', line 73

# Delegates to Phobos.create_exponential_backoff, passing the +backoff:+
# value this listener was constructed with.
#
# @return [Object] whatever Phobos.create_exponential_backoff returns
def create_exponential_backoff
  backoff_settings = @backoff
  Phobos.create_exponential_backoff(backoff_settings)
end
#send_heartbeat_if_necessary ⇒ Object
81 82 83 84 85 |
# File 'lib/phobos/listener.rb', line 81

# Forwards a heartbeat request to the current consumer, unless the listener
# has been told to stop.
#
# @raise [Phobos::AbortError] when +should_stop?+ is true
# @return [Object, nil] the consumer's result, or nil when there is no consumer
def send_heartbeat_if_necessary
  raise Phobos::AbortError if should_stop?

  active_consumer = @consumer
  return if active_consumer.nil?

  active_consumer.send_heartbeat_if_necessary
end
#should_stop? ⇒ Boolean
77 78 79 |
# File 'lib/phobos/listener.rb', line 77

# Tells whether the stop flag has been raised.
#
# @return [Boolean] true only when +@signal_to_stop+ is exactly +true+;
#   false for false, nil or any other value
def should_stop?
  true.equal?(@signal_to_stop)
end
#start ⇒ Object
rubocop:enable Metrics/MethodLength
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/phobos/listener.rb', line 43

# Clears the stop flag, starts the listener and runs the consumer loop.
# +stop_listener+ always runs on the way out, whether the method returns or
# raises.
#
# NOTE(review): in this rendering the +instrument+ and +log_info+ calls end
# with a dangling comma, so a second argument (likely listener metadata)
# seems to have been lost in extraction -- confirm against
# lib/phobos/listener.rb.
def start
  @signal_to_stop = false
  start_listener

  begin
    start_consumer_loop
  rescue Phobos::AbortError, Kafka::ProcessingError
    # Abort is an exception to prevent the consumer from committing the offset.
    # Since "listener" had a message being retried while "stop" was called
    # it's wise to not commit the batch offset to avoid data loss. This will
    # cause some messages to be reprocessed.
    instrument('listener.retry_aborted') do
      log_info('Retry loop aborted, listener is shutting down')
    end
  end
ensure
  stop_listener
end
#stop ⇒ Object
63 64 65 66 67 68 69 70 71 |
# File 'lib/phobos/listener.rb', line 63

# Asks the listener to stop. Does nothing when the stop flag is already set.
# Otherwise, inside the 'listener.stopping' instrumentation block, it logs,
# stops the consumer if there is one, and then raises the stop flag.
#
# NOTE(review): in this rendering the +instrument+ and +log_info+ calls end
# with a dangling comma, so a second argument (likely listener metadata)
# seems to have been lost in extraction -- confirm against
# lib/phobos/listener.rb.
#
# @return [Object, nil] nil when already stopping, otherwise whatever
#   +instrument+ returns
def stop
  return if should_stop?

  instrument('listener.stopping') do
    log_info('Listener stopping')

    active_consumer = @consumer
    active_consumer.stop unless active_consumer.nil?

    # The flag is raised only after the consumer has been told to stop.
    @signal_to_stop = true
  end
end