Class: Rdkafka::Consumer

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/headers.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach, in which the Kafka brokers automatically assign partitions and load balance them across all consumers that have the same :"group.id" set in their configuration.

To create a consumer, set up a Config and call consumer on it. It is mandatory to set :"group.id" in the configuration.
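A minimal configuration sketch. The broker address, group name, and topic are assumed placeholders; the commented lines show how the consumer is obtained once librdkafka and the rdkafka gem are installed.

```ruby
# Sketch of the settings a consumer needs. :"group.id" is mandatory;
# "localhost:9092" and "example-topic" are assumed placeholders.
settings = {
  :"bootstrap.servers" => "localhost:9092",
  :"group.id"          => "example-group"
}

# With the rdkafka gem loaded this becomes:
#   consumer = Rdkafka::Config.new(settings).consumer
#   consumer.subscribe("example-topic")
settings.key?(:"group.id")  # => true
```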

Defined Under Namespace

Classes: Headers, Message, Partition, TopicPartitionList

Instance Method Details

#assign(list) ⇒ Object

Atomic assignment of partitions to consume

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to assign

Raises:

  • (RdkafkaError)

    When assigning fails


# File 'lib/rdkafka/consumer.rb', line 124

def assign(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_assign(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error assigning '#{list.to_h}'")
  end
end

#assignment ⇒ TopicPartitionList

Returns the current partition assignment.

Returns:

  • (TopicPartitionList)

    The current partition assignment

Raises:

  • (RdkafkaError)

    When getting the assignment fails


# File 'lib/rdkafka/consumer.rb', line 140

def assignment
  tpl = FFI::MemoryPointer.new(:pointer)
  response = Rdkafka::Bindings.rd_kafka_assignment(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end

  tpl = tpl.read(:pointer).tap { |it| it.autorelease = false  }

  begin
    Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy tpl
  end
end

#close ⇒ nil

Close this consumer

Returns:

  • (nil)


# File 'lib/rdkafka/consumer.rb', line 19

def close
  @closing = true
  Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
end

#cluster_id ⇒ String?

Returns the ClusterId as reported in broker metadata.

Returns:

  • (String, nil)


# File 'lib/rdkafka/consumer.rb', line 241

def cluster_id
  Rdkafka::Bindings.rd_kafka_clusterid(@native_kafka)
end

#commit(list = nil, async = false) ⇒ nil

Commit the current offsets of this consumer

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to commit

  • async (Boolean) (defaults to: false)

    Whether to commit async or wait for the commit to finish

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When committing fails

# File 'lib/rdkafka/consumer.rb', line 324

def commit(list=nil, async=false)
  if !list.nil? && !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end
  tpl = if list
          list.to_native_tpl
        else
          nil
        end
  response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, tpl, async)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end
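For at-least-once delivery, the usual pattern is to process a message first and only then commit synchronously. A sketch with a stand-in consumer object (no broker involved; `FakeConsumer` is hypothetical):

```ruby
# Stand-in for a live consumer: poll returns queued payloads, commit counts calls.
class FakeConsumer
  attr_reader :commits

  def initialize(messages)
    @messages = messages
    @commits = 0
  end

  def poll(_timeout_ms)
    @messages.shift
  end

  def commit(_list = nil, _async = false)
    @commits += 1
  end
end

consumer = FakeConsumer.new(["payload-1", "payload-2"])
processed = []
while (message = consumer.poll(250))
  processed << message        # process first...
  consumer.commit(nil, false) # ...then commit synchronously (async = false)
end
processed  # => ["payload-1", "payload-2"]
```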

#committed(list = nil, timeout_ms = 1200) ⇒ TopicPartitionList

Return the current committed offset per partition for this consumer group. The offset field of each requested partition will either be set to the stored offset, or to -1001 if there was no stored offset for that partition.

Parameters:

  • list (TopicPartitionList, nil) (defaults to: nil)

    The topic with partitions to get the offsets for or nil to use the current subscription.

  • timeout_ms (Integer) (defaults to: 1200)

    The timeout for fetching this information.

Returns:

  • (TopicPartitionList)

    The committed offsets
Raises:

  • (RdkafkaError)

    When getting the committed positions fails.



# File 'lib/rdkafka/consumer.rb', line 165

def committed(list=nil, timeout_ms=1200)
  if list.nil?
    list = assignment
  elsif !list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be nil or a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  TopicPartitionList.from_native_tpl(tpl)
end
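Since partitions without a stored offset come back as -1001 (librdkafka's RD_KAFKA_OFFSET_INVALID), callers usually filter those out. A plain-Ruby sketch with hypothetical offsets:

```ruby
# Hypothetical committed offsets per partition; -1001 means "no stored offset".
committed_offsets = { 0 => 42, 1 => -1001, 2 => 7 }

stored = committed_offsets.reject { |_partition, offset| offset == -1001 }
stored  # => {0=>42, 2=>7}
```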

#each {|message| ... } ⇒ nil

Poll for new messages and yield for each received one. Iteration will end when the consumer is closed.

If enable.partition.eof is turned on in the config this will raise an error when an eof is reached, so you probably want to disable that when using this method of iteration.

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling fails

# File 'lib/rdkafka/consumer.rb', line 379

def each
  loop do
    message = poll(250)
    if message
      yield(message)
    else
      if @closing
        break
      else
        next
      end
    end
  end
end
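As the implementation shows, iteration only stops once close has set the closing flag and a poll then comes back empty. This shutdown behavior can be sketched with a stand-in class that mimics the same loop (`StubConsumer` is hypothetical):

```ruby
# Mimics Consumer#each: keep polling; on an empty poll, stop only when closed.
class StubConsumer
  def initialize(messages)
    @messages = messages
    @closing = false
  end

  def close
    @closing = true
  end

  def poll(_timeout_ms)
    @messages.shift
  end

  def each
    loop do
      message = poll(250)
      if message
        yield(message)
      else
        break if @closing
        next
      end
    end
  end
end

consumer = StubConsumer.new([1, 2, 3])
seen = []
consumer.each do |message|
  seen << message
  consumer.close if message == 3 # without a close, the loop would poll forever
end
seen  # => [1, 2, 3]
```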

#lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or position (TODO). You can also create one yourself; in that case it must already contain all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 100)

    The timeout for each query watermark call.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash containing all topics with the lag per partition

Raises:

  • (RdkafkaError)

    When querying the watermark offsets fails

# File 'lib/rdkafka/consumer.rb', line 218

def lag(topic_partition_list, watermark_timeout_ms=100)
  out = {}
  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      next if p.offset.nil?
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
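The calculation itself is just the high watermark minus the consumer's offset, per partition. A plain-Ruby illustration with hypothetical numbers:

```ruby
# Hypothetical per-partition state for one topic.
high_watermarks  = { 0 => 120, 1 => 95 } # newest offsets on the broker
consumer_offsets = { 0 => 100, 1 => 95 } # e.g. from #committed

lag = high_watermarks.to_h { |partition, high| [partition, high - consumer_offsets[partition]] }
lag  # => {0=>20, 1=>0}
```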

#member_id ⇒ String?

Returns this client's broker-assigned group member id

This currently requires the high-level KafkaConsumer

Returns:

  • (String, nil)


# File 'lib/rdkafka/consumer.rb', line 250

def member_id
  Rdkafka::Bindings.rd_kafka_memberid(@native_kafka)
end

#pause(list) ⇒ nil

Pause consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The list of partitions to pause
Returns:

  • (nil)

Raises:

  • (RdkafkaTopicPartitionListError)

    When pausing fails


# File 'lib/rdkafka/consumer.rb', line 68

def pause(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_pause_partitions(@native_kafka, tpl)

  if response != 0
    list = TopicPartitionList.from_native_tpl(tpl)
    raise Rdkafka::RdkafkaTopicPartitionListError.new(response, list, "Error pausing '#{list.to_h}'")
  end
end

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

  • (RdkafkaError)

    When polling fails

# File 'lib/rdkafka/consumer.rb', line 346

def poll(timeout_ms)
  message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if !message_ptr.nil? && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end

#query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Array(Integer, Integer)

Query broker for low (oldest/beginning) and high (newest/end) offsets for a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for querying the broker

Returns:

  • (Array(Integer, Integer))

    The low and high watermark offsets

Raises:

  • (RdkafkaError)

    When querying the watermark offsets fails

# File 'lib/rdkafka/consumer.rb', line 188

def query_watermark_offsets(topic, partition, timeout_ms=200)
  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
    @native_kafka,
    topic,
    partition,
    low,
    high,
    timeout_ms
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  return low.read_int64, high.read_int64
end

#resume(list) ⇒ nil

Resume consumption for the provided list of partitions

Parameters:

  • list (TopicPartitionList)

    The list of partitions to resume
Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When resuming fails

# File 'lib/rdkafka/consumer.rb', line 88

def resume(list)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end
  tpl = list.to_native_tpl
  response = Rdkafka::Bindings.rd_kafka_resume_partitions(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error resume '#{list.to_h}'")
  end
end

#seek(message) ⇒ nil

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:

  • message (Message)

    The message to which to seek
Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When seeking fails

# File 'lib/rdkafka/consumer.rb', line 293

def seek(message)
  # rd_kafka_seek is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
    @native_kafka,
    message.topic,
    nil
  )
  response = Rdkafka::Bindings.rd_kafka_seek(
    native_topic,
    message.partition,
    message.offset,
    0 # timeout
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end

#store_offset(message) ⇒ nil

Store the offset of a message to be used in the next commit of this consumer

When using this, enable.auto.offset.store should be set to false in the config.

Parameters:

  • message (Message)

    The message whose offset will be stored
Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When storing the offset fails

# File 'lib/rdkafka/consumer.rb', line 263

def store_offset(message)
  # rd_kafka_offset_store is one of the few calls that does not support
  # a string as the topic, so create a native topic for it.
  native_topic = Rdkafka::Bindings.rd_kafka_topic_new(
    @native_kafka,
    message.topic,
    nil
  )
  response = Rdkafka::Bindings.rd_kafka_offset_store(
    native_topic,
    message.partition,
    message.offset
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
ensure
  if native_topic && !native_topic.null?
    Rdkafka::Bindings.rd_kafka_topic_destroy(native_topic)
  end
end
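Manual offset storage pairs with disabling automatic storage, so that only offsets explicitly passed to #store_offset get committed. A configuration sketch (the group name is an assumed placeholder):

```ruby
# Only offsets stored via #store_offset will be committed.
settings = {
  :"group.id"                 => "example-group",
  :"enable.auto.offset.store" => false, # required for manual #store_offset use
  :"enable.auto.commit"       => true   # stored offsets are still auto-committed
}

# After successfully processing a message:
#   consumer.store_offset(message)
settings[:"enable.auto.offset.store"]  # => false
```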

#subscribe(*topics) ⇒ nil

Subscribe to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When subscribing fails

# File 'lib/rdkafka/consumer.rb', line 31

def subscribe(*topics)
  # Create topic partition list with topics and no partition set
  tpl = TopicPartitionList.new_native_tpl(topics.length)

  topics.each do |topic|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
      tpl,
      topic,
      -1
    )
  end
  # Subscribe to topic partition list and check this was successful
  response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
  end
end

#subscription ⇒ TopicPartitionList

Return the current subscription to topics and partitions

Returns:

  • (TopicPartitionList)

    The current subscription

Raises:

  • (RdkafkaError)

    When getting the subscription fails


# File 'lib/rdkafka/consumer.rb', line 104

def subscription
  tpl = FFI::MemoryPointer.new(:pointer)
  response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  tpl = tpl.read(:pointer).tap { |it| it.autorelease = false }

  begin
    Rdkafka::Consumer::TopicPartitionList.from_native_tpl(tpl)
  ensure
    Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
  end
end

#unsubscribe ⇒ nil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When unsubscribing fails

# File 'lib/rdkafka/consumer.rb', line 54

def unsubscribe
  response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end