Class: Rdkafka::Consumer

Inherits: Object
Includes:
Enumerable
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach, where the Kafka brokers automatically assign and load-balance partitions over all consumers that have the same :"group.id" set in their configuration.

To create a consumer, set up a Config and call consumer on it. Setting :"group.id" in the configuration is mandatory.
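
A minimal setup might look like the following sketch (the broker address, group name and topic names used in the examples on this page are placeholders):

config = Rdkafka::Config.new(
  :"bootstrap.servers" => "localhost:9092",  # placeholder broker address
  :"group.id" => "example-group"             # mandatory consumer group id
)
consumer = config.consumer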

Defined Under Namespace

Classes: Message, Partition, TopicPartitionList

Instance Method Summary

Instance Method Details

#assign(list) ⇒ Object

Atomic assignment of partitions to consume

Parameters:

  • list (TopicPartitionList)

    The topic partitions to assign to this consumer

Raises:

  • (RdkafkaError)

    When assigning the partitions fails


# File 'lib/rdkafka/consumer.rb', line 83

def assign(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end
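
As an illustrative sketch (topic name and partition numbers are assumptions), a manual assignment is built from a TopicPartitionList:

# Assign partitions 0, 1 and 2 of a topic directly, bypassing the group's
# automatic partition assignment.
list = Rdkafka::Consumer::TopicPartitionList.new
list.add_topic("example_topic", [0, 1, 2])
consumer.assign(list)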

#assignment ⇒ TopicPartitionList

Returns the current partition assignment.

Returns:

  • (TopicPartitionList)

    The current partition assignment

Raises:

  • (RdkafkaError)

    When getting the assignment fails


# File 'lib/rdkafka/consumer.rb', line 101

def assignment
  tpl = FFI::MemoryPointer.new(:pointer)
  tpl.autorelease = false
  response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl.get_pointer(0))
end

#close ⇒ nil

Close this consumer

Returns:

  • (nil)

# File 'lib/rdkafka/consumer.rb', line 19

def close
  @closing = true
  Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
end

#commit(list = nil, async = false) ⇒ nil

Commit the current offsets of this consumer

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic partitions with offsets to commit, or nil to commit the offsets of the current partition assignment

  • async (Boolean) (defaults to: false)

    Whether to commit async or wait for the commit to finish

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When committing fails


# File 'lib/rdkafka/consumer.rb', line 231

def commit(list=nil, async=false)
  if !list.nil? && !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end
  tpl = if list
          list.to_native_tpl
        else
          nil
        end
  response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl) if tpl
end
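
A sketch of committing explicit offsets; the topic, partitions and offsets shown are assumptions:

# Commit offset 42 for partition 0 and offset 17 for partition 1.
list = Rdkafka::Consumer::TopicPartitionList.new
list.add_topic_and_partitions_with_offsets("example_topic", 0 => 42, 1 => 17)
consumer.commit(list)

# Or commit the consumer's current offsets without waiting for the result.
consumer.commit(nil, true)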

#committed(list = nil, timeout_ms = 200) ⇒ TopicPartitionList

Return the current committed offsets per partition for this consumer group. The offset field of each requested partition will either be set to the stored offset or to -1001 if there is no stored offset for that partition.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for or nil to use the current subscription.

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for fetching this information.

Returns:

  • (TopicPartitionList)

    The committed offsets per topic and partition

Raises:

  • (RdkafkaError)

    When getting the committed positions fails.


# File 'lib/rdkafka/consumer.rb', line 120

def committed(list=nil, timeout_ms=200)
  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  TopicPartitionList.from_native_tpl(tpl)
end
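
For example, the committed offsets can be inspected per topic and partition (the output format here is illustrative):

consumer.committed.to_h.each do |topic, partitions|
  partitions.each do |p|
    puts "#{topic}/#{p.partition} committed at offset #{p.offset}"
  end
end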

#each {|message| ... } ⇒ nil

Poll for new messages and yield for each received one. Iteration will end when the consumer is closed.

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling for a message fails


# File 'lib/rdkafka/consumer.rb', line 284

def each(&block)
  loop do
    message = poll(250)
    if message
      block.call(message)
    else
      if @closing
        break
      else
        next
      end
    end
  end
end
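
Typical usage is to subscribe first and then iterate; the topic name and message handling below are placeholders:

consumer.subscribe("example_topic")
consumer.each do |message|
  puts "Received: #{message.payload}"
end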

#lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or position (TODO). It is also possible to create one yourself; in that case you have to provide a list that already contains all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 100)

    The timeout for each query watermark call.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash containing all topics with the lag per partition

Raises:

  • (RdkafkaError)

    When querying the watermark offsets fails


# File 'lib/rdkafka/consumer.rb', line 173

def lag(topic_partition_list, watermark_timeout_ms=100)
  out = {}
  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
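
As a sketch, the lag is usually calculated from the committed offsets (the topic and resulting numbers are illustrative):

committed = consumer.committed
lag = consumer.lag(committed)
# => { "example_topic" => { 0 => 3, 1 => 0 } }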

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

  • (RdkafkaError)

    When polling returns an error


# File 'lib/rdkafka/consumer.rb', line 255

def poll(timeout_ms)
  message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if !message_ptr.nil? && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end
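
A minimal manual polling loop could look like this sketch (timeout and topic are assumptions):

consumer.subscribe("example_topic")
loop do
  message = consumer.poll(250)
  next unless message
  puts "#{message.topic}/#{message.partition} @ #{message.offset}: #{message.payload}"
end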

#query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Array(Integer, Integer)

Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for querying the broker

Returns:

  • (Array(Integer, Integer))

    The low and high watermark offsets

Raises:

  • (RdkafkaError)

    When querying the watermark offsets fails


# File 'lib/rdkafka/consumer.rb', line 143

def query_watermark_offsets(topic, partition, timeout_ms=200)
  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
    @native_kafka,
    topic,
    partition,
    low,
    high,
    timeout_ms
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  return low.read_int64, high.read_int64
end
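
For example (topic and partition are placeholders):

low, high = consumer.query_watermark_offsets("example_topic", 0)
puts "Partition 0 spans offsets #{low} to #{high}"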

#store_offset(message) ⇒ nil

Store the offset of a message to be used in the next commit of this consumer.

When using this, enable.auto.offset.store should be set to false in the config.

Parameters:

  • message (Message)

    The message whose offset should be stored

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When storing the offset fails


# File 'lib/rdkafka/consumer.rb', line 202

def store_offset(message)
  # rd_kafka_offset_store is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
    @native_kafka,
    message.topic,
    nil
  )
  response = Rdkafka::Bindings.rd_kafka_offset_store(
    native_topic,
    message.partition,
    message.offset
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end
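
A sketch of the intended flow, assuming :"enable.auto.offset.store" => false was set in the Config and process is an application-defined method:

consumer.each do |message|
  process(message)               # hypothetical application-specific handling
  consumer.store_offset(message) # picked up by the next commit
end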

#subscribe(*topics) ⇒ nil

Subscribe to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When subscribing fails


# File 'lib/rdkafka/consumer.rb', line 31

def subscribe(*topics)
  # Create topic partition list with topics and no partition set
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)
  topics.each do |topic|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
      tpl,
      topic,
      -1
    )
  end
  # Subscribe to topic partition list and check this was successful
  response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
  end
ensure
  # Clean up the topic partition list
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
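
For example (topic names are placeholders):

consumer.subscribe("example_topic_a", "example_topic_b")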

#subscription ⇒ TopicPartitionList

Return the current subscription to topics and partitions

Returns:

  • (TopicPartitionList)

    The currently subscribed topics and partitions

Raises:

  • (RdkafkaError)

    When getting the subscription fails


# File 'lib/rdkafka/consumer.rb', line 68

def subscription
  tpl = FFI::MemoryPointer.new(:pointer)
  tpl.autorelease = false
  response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl.get_pointer(0))
end

#unsubscribe ⇒ nil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When unsubscribing fails


# File 'lib/rdkafka/consumer.rb', line 56

def unsubscribe
  response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end