Class: Karafka::BaseConsumer

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Taggable
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit.

Direct Known Subclasses

ActiveJob::Consumer


Constructor Details

#initialize ⇒ BaseConsumer

Creates a new consumer and assigns it an id.



# File 'lib/karafka/base_consumer.rb', line 26

def initialize
  @id = SecureRandom.hex(6)
  @used = false
end
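
As a small illustration of the id format, `SecureRandom.hex(6)` draws 6 random bytes and encodes them as 12 lowercase hexadecimal characters. The snippet below is a standalone sketch; the `consumer_id` variable is illustrative and not part of Karafka.

```ruby
require 'securerandom'

# SecureRandom.hex(n) returns n random bytes encoded as 2 * n hex characters.
consumer_id = SecureRandom.hex(6)

puts consumer_id.length                      # 12
puts consumer_id.match?(/\A[0-9a-f]{12}\z/)  # true
```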

Instance Attribute Details

#client ⇒ Karafka::Connection::Client

Returns the Kafka connection client.

Returns:

  • (Karafka::Connection::Client)

    Kafka connection client



# File 'lib/karafka/base_consumer.rb', line 19

def client
  @client
end

#coordinator ⇒ Karafka::Processing::Coordinator

Returns the coordinator.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator



# File 'lib/karafka/base_consumer.rb', line 21

def coordinator
  @coordinator
end

#id ⇒ String (readonly)

Returns the id of the current consumer.

Returns:

  • (String)

    id of the current consumer



# File 'lib/karafka/base_consumer.rb', line 15

def id
  @id
end

#messages ⇒ Karafka::Messages::Messages

Returns the current batch of messages for this consumer.

Returns:

  • (Karafka::Messages::Messages)

    current batch of messages for this consumer



# File 'lib/karafka/base_consumer.rb', line 17

def messages
  @messages
end

#producer ⇒ WaterDrop::Producer

Returns the producer instance.

Returns:

  • (WaterDrop::Producer)

    producer instance



# File 'lib/karafka/base_consumer.rb', line 23

def producer
  @producer
end

Instance Method Details

#on_after_consume ⇒ Object

Note:

This should not be used by end users: it is part of the consumer lifecycle, not part of the public API.



# File 'lib/karafka/base_consumer.rb', line 94

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.after_consume.error'
  )
end
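
Every `on_*` method on this page follows the same shape: delegate to a `handle_*` method, rescue any `StandardError`, and report it as an `error.occurred` event instead of re-raising. The sketch below reproduces that shape outside Karafka. `SketchMonitor` and `SketchConsumer` are hypothetical stand-ins, and the failing `handle_after_consume` exists only to trigger the rescue path.

```ruby
# Hypothetical stand-in for Karafka.monitor: it records events instead of publishing them.
class SketchMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, **payload)
    @events << [name, payload]
  end
end

# Hypothetical stand-in consumer mirroring the on_* / handle_* split.
class SketchConsumer
  def initialize(monitor)
    @monitor = monitor
  end

  def on_after_consume
    handle_after_consume
  rescue StandardError => e
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.after_consume.error'
    )
  end

  private

  # Always fails, so the rescue branch above is exercised.
  def handle_after_consume
    raise ArgumentError, 'boom'
  end
end

monitor = SketchMonitor.new
SketchConsumer.new(monitor).on_after_consume # the error is reported, not raised

name, payload = monitor.events.first
puts name                  # error.occurred
puts payload[:type]        # consumer.after_consume.error
puts payload[:error].class # ArgumentError
```

Because the error is swallowed at this level, anything interested in failures has to observe the instrumented event rather than rescue around the call.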

#on_before_consume ⇒ Object

Note:

This should not be used by end users: it is part of the consumer lifecycle, not part of the public API. It can act as a hook when creating non-blocking consumers or implementing other advanced flows.

Can be used to run preparation code in the worker.



# File 'lib/karafka/base_consumer.rb', line 55

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_consume.error'
  )
end
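
The method stamps a processing time and then freezes the object, so later code can read the value but not change it. The sketch below shows that effect using plain Ruby. `SketchMetadata` is a hypothetical stand-in that models only `processed_at`; it is not the Karafka class.

```ruby
# Hypothetical stand-in for batch metadata; only processed_at is modeled.
SketchMetadata = Struct.new(:processed_at)

metadata = SketchMetadata.new
metadata.processed_at = Time.now
metadata.freeze

# Any later write to a frozen struct raises FrozenError.
write_rejected =
  begin
    metadata.processed_at = Time.now
    false
  rescue FrozenError
    true
  end

puts metadata.frozen? # true
puts write_rejected   # true
```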

#on_before_enqueue ⇒ Object

Note:

This should not be used by end users: it is part of the consumer lifecycle, not part of the public API. It should not perform any expensive operations, as it blocks and runs in the listener thread.

Can be used to run preparation code prior to the job being enqueued.



# File 'lib/karafka/base_consumer.rb', line 37

def on_before_enqueue
  @used = true
  handle_before_enqueue
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_enqueue.error'
  )
end

#on_consume ⇒ Boolean

Note:

We keep tracking the seek offset and use it to compensate for asynchronous offset flushing that may not yet have kicked in when an error occurs. That way we always pause on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



# File 'lib/karafka/base_consumer.rb', line 79

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end

#on_idle ⇒ Object

Trigger method invoked on idle runs, when there are no messages to process.



# File 'lib/karafka/base_consumer.rb', line 108

def on_idle
  handle_idle
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.idle.error'
  )
end

#on_revoked ⇒ Object

Trigger method for running on partition revocation.



# File 'lib/karafka/base_consumer.rb', line 122

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdown ⇒ Object

Trigger method for running on shutdown.



# File 'lib/karafka/base_consumer.rb', line 136

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end