Class: Karafka::BaseConsumer
- Inherits: Object
- Extended by:
- Forwardable
- Includes:
- Core::Taggable
- Defined in:
- lib/karafka/base_consumer.rb
Overview
Base consumer from which all Karafka consumers should inherit
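In practice, an application subclasses this base consumer and implements a #consume method that works on the #messages batch. Below is a minimal sketch of that pattern; since running it requires the karafka gem and a configured app, a tiny stand-in base class replaces Karafka::BaseConsumer here so the example runs standalone, and EventsConsumer and the payload hashes are purely illustrative.

```ruby
# Stand-in for Karafka::BaseConsumer so the sketch runs without the gem.
# The real base class provides #messages (the current batch) to subclasses.
class BaseConsumerLike
  attr_accessor :messages
end

# A hypothetical consumer: Karafka invokes #consume with the batch
# already assigned to #messages.
class EventsConsumer < BaseConsumerLike
  def consume
    messages.map { |message| message[:payload].upcase }
  end
end

consumer = EventsConsumer.new
consumer.messages = [{ payload: 'hello' }, { payload: 'world' }]
puts consumer.consume.inspect # ["HELLO", "WORLD"]
```

In a real application the class would inherit from Karafka::BaseConsumer and be wired to a topic via the routing DSL.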
Direct Known Subclasses
Instance Attribute Summary
-
#client ⇒ Karafka::Connection::Client
Kafka connection client.
-
#coordinator ⇒ Karafka::Processing::Coordinator
Coordinator.
-
#id ⇒ String
readonly
Id of the current consumer.
-
#messages ⇒ Karafka::Messages::Messages
Current messages batch.
-
#producer ⇒ WaterDrop::Producer
Producer instance.
Instance Method Summary
-
#initialize ⇒ BaseConsumer
constructor
Creates a new consumer and assigns it an id.
- #on_after_consume ⇒ Object
-
#on_before_consume ⇒ Object
Can be used to run preparation code in the worker.
-
#on_before_enqueue ⇒ Object
Can be used to run preparation code prior to the job being enqueued.
-
#on_consume ⇒ Boolean
Executes the default consumer flow.
-
#on_idle ⇒ Object
Trigger method for running on idle runs without messages.
-
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
-
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
Constructor Details
#initialize ⇒ BaseConsumer
Creates a new consumer and assigns it an id.
  # File 'lib/karafka/base_consumer.rb', line 26

  def initialize
    @id = SecureRandom.hex(6)
    @used = false
  end
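The id assigned in the constructor comes from Ruby's standard library: SecureRandom.hex(6) draws 6 random bytes and renders them as 12 lowercase hexadecimal characters.

```ruby
require 'securerandom'

# 6 random bytes -> 12 hex characters, e.g. "3f9c0a7b12de"
id = SecureRandom.hex(6)
puts id.length # 12
```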
Instance Attribute Details
#client ⇒ Karafka::Connection::Client
Returns the Kafka connection client.
  # File 'lib/karafka/base_consumer.rb', line 19

  def client
    @client
  end
#coordinator ⇒ Karafka::Processing::Coordinator
Returns coordinator.
  # File 'lib/karafka/base_consumer.rb', line 21

  def coordinator
    @coordinator
  end
#id ⇒ String (readonly)
Returns id of the current consumer.
  # File 'lib/karafka/base_consumer.rb', line 15

  def id
    @id
  end
#messages ⇒ Karafka::Messages::Messages
Returns the current messages batch.
  # File 'lib/karafka/base_consumer.rb', line 17

  def messages
    @messages
  end
#producer ⇒ WaterDrop::Producer
Returns producer instance.
  # File 'lib/karafka/base_consumer.rb', line 23

  def producer
    @producer
  end
Instance Method Details
#on_after_consume ⇒ Object
This should not be used by end users, as it is part of the consumer lifecycle and not part of the public API.
  # File 'lib/karafka/base_consumer.rb', line 94

  def on_after_consume
    handle_after_consume
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.after_consume.error'
    )
  end
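All of the on_* trigger methods share the same shape: delegate to a handler and, if it raises StandardError, publish an 'error.occurred' event to a monitor instead of letting the error escape. A standalone sketch of that pattern, with a hypothetical Monitor class standing in for Karafka.monitor and a deliberately failing handler:

```ruby
# Hypothetical stand-in for Karafka.monitor: collects instrumented events.
class Monitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload)
    @events << [name, payload]
  end
end

MONITOR = Monitor.new

# Mirrors the trigger-method shape: rescue StandardError, report, don't re-raise.
def on_after_consume_like
  raise 'handler failed' # stands in for handle_after_consume raising
rescue StandardError => e
  MONITOR.instrument('error.occurred', error: e, type: 'consumer.after_consume.error')
end

on_after_consume_like
puts MONITOR.events.first.first # "error.occurred"
```

The payoff is that a crashing consumer callback never takes down the worker; the error surfaces through instrumentation instead.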
#on_before_consume ⇒ Object
This should not be used by end users, as it is part of the consumer lifecycle and not part of the public API. It can act as a hook when creating non-blocking consumers and doing other advanced work.
Can be used to run preparation code in the worker
  # File 'lib/karafka/base_consumer.rb', line 55

  def on_before_consume
    messages.metadata.processed_at = Time.now
    messages.metadata.freeze

    # We run this after the full metadata setup, so we can use all the messages information
    # if needed
    handle_before_consume
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.before_consume.error'
    )
  end
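The freeze step above makes the batch metadata immutable before any processing runs, so nothing downstream can alter it. A standalone sketch of the idea, using a hypothetical Metadata struct rather than Karafka's own metadata class:

```ruby
# Hypothetical metadata record; Karafka's real metadata class differs.
Metadata = Struct.new(:processed_at)

metadata = Metadata.new
metadata.processed_at = Time.now # stamp it...
metadata.freeze                  # ...then lock it

begin
  metadata.processed_at = Time.now # any later write...
rescue FrozenError => e
  puts "mutation rejected: #{e.class}" # ...raises FrozenError
end
```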
#on_before_enqueue ⇒ Object
This should not be used by end users, as it is part of the consumer lifecycle and not part of the public API. It should not perform any extensive operations, as it is blocking and runs in the listener thread.
Can be used to run preparation code prior to the job being enqueued
  # File 'lib/karafka/base_consumer.rb', line 37

  def on_before_enqueue
    @used = true
    handle_before_enqueue
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.before_enqueue.error'
    )
  end
#on_consume ⇒ Boolean
We keep the seek offset tracking and use it to compensate for async offset flushing that may not yet have kicked in when an error occurs. That way we always pause on the last processed message.
Executes the default consumer flow.
  # File 'lib/karafka/base_consumer.rb', line 79

  def on_consume
    handle_consume
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      seek_offset: coordinator.seek_offset,
      type: 'consumer.consume.error'
    )
  end
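The seek-offset tracking mentioned above can be sketched as follows: record the offset of the last successfully processed message so that, if an error occurs before asynchronous offset commits catch up, the consumer can pause and resume from the last processed position rather than an older committed one. All names here are illustrative, not Karafka's API.

```ruby
# Illustrative coordinator: tracks where to resume after a failure.
class CoordinatorLike
  attr_reader :seek_offset

  def initialize
    @seek_offset = 0
  end

  # Called after each successfully processed message; the next run
  # should resume from offset + 1.
  def mark_processed(offset)
    @seek_offset = offset + 1
  end
end

coordinator = CoordinatorLike.new
[10, 11, 12].each { |offset| coordinator.mark_processed(offset) }

# If the message at offset 13 fails, we pause/seek back to the tracked
# position even if the async commit only reached offset 11:
puts coordinator.seek_offset # 13
```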
#on_idle ⇒ Object
Trigger method for running on idle runs without messages
  # File 'lib/karafka/base_consumer.rb', line 108

  def on_idle
    handle_idle
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.idle.error'
    )
  end
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
  # File 'lib/karafka/base_consumer.rb', line 122

  def on_revoked
    handle_revoked
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.revoked.error'
    )
  end
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
  # File 'lib/karafka/base_consumer.rb', line 136

  def on_shutdown
    handle_shutdown
  rescue StandardError => e
    Karafka.monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.shutdown.error'
    )
  end