Class: Rdkafka::Consumer

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/rdkafka/consumer.rb,
lib/rdkafka/consumer/message.rb,
lib/rdkafka/consumer/partition.rb,
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka brokers automatically assign partitions and load balance partitions over consumers that have the same :"group.id" set in their configuration.

To create a consumer, set up a Config and call consumer on it. It is mandatory to set :"group.id" in the configuration.
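
The setup described above could be sketched as follows. The helper name consumer_settings, the broker address, and the group name are assumptions made for illustration. :"bootstrap.servers" is the usual librdkafka setting for the broker list and is not mentioned on this page. The call into the gem is left as a comment so the snippet runs without a broker.

```ruby
# Hypothetical helper (not part of rdkafka): builds the settings hash and
# refuses to continue when no group id is given.
def consumer_settings(bootstrap_servers:, group_id:)
  if group_id.nil? || group_id.to_s.empty?
    raise ArgumentError, ':"group.id" is mandatory for a consumer'
  end

  {
    :"bootstrap.servers" => bootstrap_servers,
    :"group.id" => group_id
  }
end

settings = consumer_settings(
  bootstrap_servers: "localhost:9092", # assumed broker address
  group_id: "example-group"            # assumed group name
)

# With the rdkafka gem loaded and a broker reachable, the hash would then be
# handed to a Config, as described above:
#   consumer = Rdkafka::Config.new(settings).consumer
```

Consumers created with the same :"group.id" share the partitions of the topics they subscribe to.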

Defined Under Namespace

Classes: Message, Partition, TopicPartitionList

Instance Method Details

#close ⇒ nil

Close this consumer

Returns:

  • (nil)


# File 'lib/rdkafka/consumer.rb', line 18

def close
  Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
end

#commit(async = false) ⇒ nil

Commit the current offsets of this consumer

Parameters:

  • async (Boolean) (defaults to: false)

    Whether to commit asynchronously or wait for the commit to finish

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When committing fails.



# File 'lib/rdkafka/consumer.rb', line 163

def commit(async=false)
  response = Rdkafka::Bindings.rd_kafka_commit(@native_kafka, nil, async)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end

#committed(list, timeout_ms = 200) ⇒ TopicPartitionList

Return the current committed offset per partition for this consumer group. The offset field of each requested partition is set either to the stored offset or to -1001 if no offset is stored for that partition.

TODO: This should use the subscription or assignment by default.

Parameters:

  • list (TopicPartitionList)

    The topic with partitions to get the offsets for.

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for fetching this information.

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the committed positions fails.



# File 'lib/rdkafka/consumer.rb', line 86

def committed(list, timeout_ms=200)
  unless list.is_a?(TopicPartitionList)
    raise TypeError.new("list has to be a TopicPartitionList")
  end
  tpl = list.copy_tpl
  response = Rdkafka::Bindings.rd_kafka_committed(@native_kafka, tpl, timeout_ms)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  Rdkafka::Consumer::TopicPartitionList.new(tpl)
end
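
As a small illustration of the -1001 value described for #committed, the sketch below picks out the partitions that have no stored offset. It works on a plain hash of the shape that to_h appears to produce in the #lag source (topic name mapped to entries responding to partition and offset). PartitionEntry, NO_STORED_OFFSET, and partitions_without_offset are hypothetical names, not part of the gem.

```ruby
# Value described above for "no stored offset for that partition".
NO_STORED_OFFSET = -1001

# Stand-in for the entries of a topic partition list (assumption: each entry
# responds to #partition and #offset).
PartitionEntry = Struct.new(:partition, :offset)

# Returns { topic => [partition numbers without a stored offset] },
# leaving out topics where every partition has one.
def partitions_without_offset(committed_hash)
  committed_hash.each_with_object({}) do |(topic, partitions), out|
    missing = partitions.select { |p| p.offset == NO_STORED_OFFSET }.map(&:partition)
    out[topic] = missing unless missing.empty?
  end
end

committed = {
  "events" => [PartitionEntry.new(0, 42), PartitionEntry.new(1, NO_STORED_OFFSET)],
  "audit"  => [PartitionEntry.new(0, 7)]
}

p partitions_without_offset(committed)
```

With a real consumer, the input would presumably come from committed(list).to_h.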

#each {|message| ... } ⇒ nil

Poll for new messages and yield each one that is received. The loop keeps polling until the block breaks out of it or an error is raised.

Yield Parameters:

  • message (Message)

    Received message

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When polling fails.



# File 'lib/rdkafka/consumer.rb', line 205

def each(&block)
  loop do
    message = poll(250)
    if message
      block.call(message)
    else
      next
    end
  end
end
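
Because the loop in #each has no exit of its own, callers typically leave it with break or with an Enumerable method that stops early. The sketch below uses FakeConsumer, a hypothetical stand-in that copies the polling loop shown above but reads from an in-memory array, so it runs without a broker. take_messages is likewise an illustrative helper.

```ruby
# Hypothetical stand-in that mimics the polling loop of #each.
class FakeConsumer
  include Enumerable

  def initialize(messages)
    @messages = messages.dup
  end

  # Returns the next queued message, or nil when nothing is available.
  def poll(_timeout_ms)
    @messages.shift
  end

  def each
    loop do
      message = poll(250)
      yield message if message
    end
  end
end

# Collects messages until `limit` have been received, then leaves the
# otherwise endless loop with `break`.
def take_messages(consumer, limit)
  received = []
  consumer.each do |message|
    received << message
    break if received.size >= limit
  end
  received
end

p take_messages(FakeConsumer.new(%w[a b c d]), 2)
p FakeConsumer.new(%w[a b c d]).first(3) # Enumerable#first also stops early
```

Asking for more messages than will ever arrive keeps the loop polling indefinitely, which is the same behavior a real consumer shows on an idle topic.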

#lag(topic_partition_list, watermark_timeout_ms = 100) ⇒ Hash<String, Hash<Integer, Integer>>

Calculate the consumer lag per partition for the provided topic partition list. You can get a suitable list by calling #committed or position (TODO). You can also create one yourself; in that case the list must already contain all the partitions you need the lag for.

Parameters:

  • topic_partition_list (TopicPartitionList)

    The list to calculate lag for.

  • watermark_timeout_ms (Integer) (defaults to: 100)

    The timeout for each query watermark call.

Returns:

  • (Hash<String, Hash<Integer, Integer>>)

    A hash containing all topics with the lag per partition

Raises:

  • (RdkafkaError)

    When querying the broker fails.



# File 'lib/rdkafka/consumer.rb', line 137

def lag(topic_partition_list, watermark_timeout_ms=100)
  out = {}
  topic_partition_list.to_h.each do |topic, partitions|
    # Query high watermarks for this topic's partitions
    # and compare to the offset in the list.
    topic_out = {}
    partitions.each do |p|
      low, high = query_watermark_offsets(
        topic,
        p.partition,
        watermark_timeout_ms
      )
      topic_out[p.partition] = high - p.offset
    end
    out[topic] = topic_out
  end
  out
end
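
The arithmetic in #lag can be tried without a broker. The sketch below repeats the same subtraction (high watermark minus the offset in the list) but takes the high watermarks from a hash instead of querying them. PartitionOffset, lag_from_watermarks, and the shape of high_watermarks are assumptions made for this example.

```ruby
# Stand-in for a partition entry with the two fields #lag reads.
PartitionOffset = Struct.new(:partition, :offset)

# Mirrors the arithmetic of #lag: high watermark minus the listed offset.
# `high_watermarks` is a hypothetical lookup ({ [topic, partition] => high })
# that replaces the per-partition broker query.
def lag_from_watermarks(offsets_by_topic, high_watermarks)
  offsets_by_topic.each_with_object({}) do |(topic, partitions), out|
    out[topic] = partitions.each_with_object({}) do |p, topic_out|
      topic_out[p.partition] = high_watermarks.fetch([topic, p.partition]) - p.offset
    end
  end
end

offsets = {
  "events" => [PartitionOffset.new(0, 90), PartitionOffset.new(1, 100)]
}
highs = { ["events", 0] => 100, ["events", 1] => 100 }

p lag_from_watermarks(offsets, highs)
```

With this arithmetic, an entry that still carries the -1001 "no stored offset" value would come out as the high watermark plus 1001, so such entries may need to be filtered or handled separately before the result is read as lag.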

#poll(timeout_ms) ⇒ Message?

Poll for the next message on one of the subscribed topics

Parameters:

  • timeout_ms (Integer)

    Timeout of this poll

Returns:

  • (Message, nil)

    A message or nil if there was no new message within the timeout

Raises:

  • (RdkafkaError)

    When polling fails.



# File 'lib/rdkafka/consumer.rb', line 177

def poll(timeout_ms)
  message_ptr = Rdkafka::Bindings.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
  if message_ptr.null?
    nil
  else
    # Create struct wrapper
    native_message = Rdkafka::Bindings::Message.new(message_ptr)
    # Raise error if needed
    if native_message[:err] != 0
      raise Rdkafka::RdkafkaError.new(native_message[:err])
    end
    # Create a message to pass out
    Rdkafka::Consumer::Message.new(native_message)
  end
ensure
  # Clean up rdkafka message if there is one
  if !message_ptr.nil? && !message_ptr.null?
    Rdkafka::Bindings.rd_kafka_message_destroy(message_ptr)
  end
end
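
Since #poll returns nil when no message arrives within the timeout, callers need to handle that case. One possible pattern, sketched under the assumption that a bounded number of attempts is wanted, is shown below. poll_with_retries and ScriptedPoller are hypothetical names, and ScriptedPoller only replays a fixed list of results in place of a real consumer.

```ruby
# Hypothetical stand-in whose #poll replays a scripted list of results.
class ScriptedPoller
  def initialize(results)
    @results = results.dup
  end

  def poll(_timeout_ms)
    @results.shift
  end
end

# Polls until a message arrives or `max_attempts` polls have returned nil.
# Returns the message, or nil if every attempt timed out.
def poll_with_retries(consumer, timeout_ms:, max_attempts:)
  max_attempts.times do
    message = consumer.poll(timeout_ms)
    return message if message
  end
  nil
end

poller = ScriptedPoller.new([nil, nil, "payload"])
p poll_with_retries(poller, timeout_ms: 100, max_attempts: 3)
```

An error reported by the polled message is not caught here, so with a real consumer it would surface to the caller as an RdkafkaError.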

#query_watermark_offsets(topic, partition, timeout_ms = 200) ⇒ Array<Integer>

Query the broker for the low (oldest/beginning) and high (newest/end) offsets of a partition.

Parameters:

  • topic (String)

    The topic to query

  • partition (Integer)

    The partition to query

  • timeout_ms (Integer) (defaults to: 200)

    The timeout for querying the broker

Returns:

  • (Array<Integer>)

    The low and high watermark offsets, in that order

Raises:

  • (RdkafkaError)

    When querying the broker fails.



# File 'lib/rdkafka/consumer.rb', line 107

def query_watermark_offsets(topic, partition, timeout_ms=200)
  low = FFI::MemoryPointer.new(:int64, 1)
  high = FFI::MemoryPointer.new(:int64, 1)

  response = Rdkafka::Bindings.rd_kafka_query_watermark_offsets(
    @native_kafka,
    topic,
    partition,
    low,
    high,
    timeout_ms
  )
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error querying watermark offsets for partition #{partition} of #{topic}")
  end

  return low.read_int64, high.read_int64
end
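
The method above returns two values, so the result is usually destructured. The sketch below does that and computes the distance between the two watermarks. FixedWatermarks is a hypothetical stand-in that returns preset values instead of asking a broker, and offset_span is an illustrative helper.

```ruby
# Hypothetical stand-in that answers with preset watermarks.
class FixedWatermarks
  def initialize(low, high)
    @low = low
    @high = high
  end

  def query_watermark_offsets(_topic, _partition, _timeout_ms = 200)
    return @low, @high
  end
end

# Returns the distance between the low and high watermark of a partition.
def offset_span(consumer, topic, partition)
  low, high = consumer.query_watermark_offsets(topic, partition)
  high - low
end

p offset_span(FixedWatermarks.new(5, 25), "events", 0)
```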

#subscribe(*topics) ⇒ nil

Subscribe to one or more topics letting Kafka handle partition assignments.

Parameters:

  • topics (Array<String>)

    One or more topic names

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When subscribing fails.



# File 'lib/rdkafka/consumer.rb', line 29

def subscribe(*topics)
  # Create topic partition list with topics and no partition set
  tpl = Rdkafka::Bindings.rd_kafka_topic_partition_list_new(topics.length)
  topics.each do |topic|
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
      tpl,
      topic,
      -1
    )
  end
  # Subscribe to topic partition list and check this was successful
  response = Rdkafka::Bindings.rd_kafka_subscribe(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response, "Error subscribing to '#{topics.join(', ')}'")
  end
ensure
  # Clean up the topic partition list
  Rdkafka::Bindings.rd_kafka_topic_partition_list_destroy(tpl)
end
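
One way to pair #subscribe with #unsubscribe is a block helper that always unsubscribes on the way out. This is only a sketch: with_subscription is a hypothetical helper and RecordingConsumer is a stand-in that records the calls it receives, so the example runs without a broker.

```ruby
# Hypothetical stand-in that records which calls it receives.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(*topics)
    @calls << [:subscribe, topics]
    nil
  end

  def unsubscribe
    @calls << [:unsubscribe]
    nil
  end
end

# Subscribes, yields the consumer, and unsubscribes even if the block raises.
# Returns the value of the block.
def with_subscription(consumer, *topics)
  consumer.subscribe(*topics)
  begin
    yield consumer
  ensure
    consumer.unsubscribe
  end
end

consumer = RecordingConsumer.new
with_subscription(consumer, "events", "audit") { |c| c.calls.size }
p consumer.calls
```

If subscribe itself raises, the helper does not call unsubscribe, since no subscription was made.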

#subscription ⇒ TopicPartitionList

Return the current subscription to topics and partitions

Returns:

  • (TopicPartitionList)

Raises:

  • (RdkafkaError)

    When getting the subscription fails.



# File 'lib/rdkafka/consumer.rb', line 66

def subscription
  tpl = FFI::MemoryPointer.new(:pointer)
  response = Rdkafka::Bindings.rd_kafka_subscription(@native_kafka, tpl)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
  Rdkafka::Consumer::TopicPartitionList.new(tpl.get_pointer(0))
end

#unsubscribe ⇒ nil

Unsubscribe from all subscribed topics.

Returns:

  • (nil)

Raises:

  • (RdkafkaError)

    When unsubscribing fails.



# File 'lib/rdkafka/consumer.rb', line 54

def unsubscribe
  response = Rdkafka::Bindings.rd_kafka_unsubscribe(@native_kafka)
  if response != 0
    raise Rdkafka::RdkafkaError.new(response)
  end
end