Class: Poseidon::ConsumerGroup

Inherits:
Object
Defined in:
lib/poseidon/consumer_group.rb

Overview

A ConsumerGroup operates on all partitions of a single topic. The goal is to ensure each topic message is consumed only once, regardless of the number of consumer instances within a cluster, as described in: kafka.apache.org/documentation.html#distributionimpl.

The ConsumerGroup internally creates multiple PartitionConsumer instances. It uses ZooKeeper and follows a simple consumer rebalancing algorithm that allows all the consumers in a group to reach consensus on which consumer is consuming which partitions. Each ConsumerGroup can ‘claim’ 0-n partitions and will consume their messages until another ConsumerGroup instance joins or leaves the cluster.

Please note: ConsumerGroups themselves don’t implement any threading or concurrency. When consuming messages, they simply round-robin across the claimed partitions. If you wish to parallelize consumption, simply create multiple ConsumerGroup instances. The built-in consensus algorithm will automatically rebalance the available partitions between them, and you can then decide for yourself whether to run them in multiple threads or processes, ideally on multiple boxes.

Contrary to the Kafka documentation, consumer rebalancing is triggered only when consumers are added to or removed from the same group; adding broker nodes and/or partitions *does not currently trigger* a rebalancing cycle.

Defined Under Namespace

Classes: Consumer

Constant Summary

DEFAULT_CLAIM_TIMEOUT = 30
DEFAULT_LOOP_DELAY = 1

Constructor Details

#initialize(name, brokers, zookeepers, topic, options = {}) ⇒ ConsumerGroup

Create a new consumer group, which processes all partitions of the specified topic.

Parameters:

  • name (String)

    Group name

  • brokers (Array<String>)

    A list of known brokers, e.g. [“localhost:9092”]

  • zookeepers (Array<String>)

    A list of known zookeepers, e.g. [“localhost:2181”]

  • topic (String)

    Topic to operate on

  • options (Hash) (defaults to: {})

    Consumer options

Options Hash (options):

  • :max_bytes (Integer)

    Maximum number of bytes to fetch. Default: 1048576 (1MB)

  • :max_wait_ms (Integer)

    How long to block until the server sends us data. Default: 100 (100ms)

  • :min_bytes (Integer)

    Smallest amount of data the server should send us. Default: 0 (Send us data as soon as it is ready)

  • :claim_timeout (Integer)

    Maximum number of seconds to wait for a partition claim. Default: 30 (DEFAULT_CLAIM_TIMEOUT)

  • :loop_delay (Integer)

    Number of seconds to delay the next fetch (in #fetch_loop) if nothing was returned. Default: 1

  • :socket_timeout_ms (Integer)

    Broker connection wait timeout in ms. Default: 10000

  • :register (Boolean)

    Automatically register instance and start consuming. Default: true

  • :trail (Boolean)

    Starts reading messages from the latest partition offsets, skipping ‘old’ messages. Default: false



# File 'lib/poseidon/consumer_group.rb', line 97

def initialize(name, brokers, zookeepers, topic, options = {})
  @name       = name
  @topic      = topic
  @zk         = ::ZK.new(zookeepers.join(","))
  # Poseidon::BrokerPool doesn't provide default value for this option
  # Configuring default value like this isn't beautiful, though.. by kssminus
  options[:socket_timeout_ms] ||= 10000
  @options    = options
  @consumers  = []
  @pool       = ::Poseidon::BrokerPool.new(id, brokers, options[:socket_timeout_ms])
  @mutex      = Mutex.new
  @registered = false

  register! unless options.delete(:register) == false
end
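The constructor works directly on the options hash it receives: `||=` fills in `:socket_timeout_ms`, and `Hash#delete` removes `:register` before the hash is stored as `@options`. The sketch below isolates those two lines in a hypothetical `prepare_options` helper (not part of poseidon_cluster) to show the side effect on the caller's hash; passing `opts.dup` avoids it.

```ruby
# Hypothetical helper that reproduces only the option handling in
# ConsumerGroup#initialize; it returns true when register! would run.
def prepare_options(options)
  options[:socket_timeout_ms] ||= 10000   # default is written into the hash
  options.delete(:register) != false      # :register is removed from the hash
end

opts = { register: false, max_wait_ms: 500 }

prepare_options(opts)    # => false: registration is skipped
opts.key?(:register)     # => false: the caller's hash was modified
opts[:socket_timeout_ms] # => 10000

# Reusing the same hash for a second group silently re-enables registration.
prepare_options(opts)    # => true
```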

Instance Attribute Details

#name ⇒ Object (readonly)



# File 'lib/poseidon/consumer_group.rb', line 66

def name
  @name
end

#options ⇒ Object (readonly)



# File 'lib/poseidon/consumer_group.rb', line 78

def options
  @options
end

#pool ⇒ Object (readonly)



# File 'lib/poseidon/consumer_group.rb', line 72

def pool
  @pool
end

#topic ⇒ Object (readonly)



# File 'lib/poseidon/consumer_group.rb', line 69

def topic
  @topic
end

#zk ⇒ Object (readonly)



# File 'lib/poseidon/consumer_group.rb', line 75

def zk
  @zk
end

Class Method Details

.pick(pnum, cids, id) ⇒ Range, NilClass

Returns selectable range, if any.

Parameters:

  • pnum (Integer)

    number of partitions

  • cids (Array<String>)

    consumer IDs

  • id (String)

    consumer ID

Returns:

  • (Range, NilClass)

    selectable range, if any



# File 'lib/poseidon/consumer_group.rb', line 51

def self.pick(pnum, cids, id)
  cids = cids.sort
  pos  = cids.index(id)
  return unless pos && pos < cids.size

  step = pnum.fdiv(cids.size).ceil
  frst = pos*step
  last = (pos+1)*step-1
  last = pnum-1 if last > pnum-1
  return if last < 0 || last < frst

  (frst..last)
end
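A few calls make the partitioning rule concrete: consumer IDs are sorted, each consumer gets a contiguous block of `ceil(pnum / cids.size)` partitions, and consumers that fall past the last partition get `nil`. The method body is copied unchanged into a hypothetical `PickDemo` module so the snippet runs without the gem; the consumer IDs are made up.

```ruby
# Stand-in for Poseidon::ConsumerGroup; the body is copied from .pick above.
module PickDemo
  def self.pick(pnum, cids, id)
    cids = cids.sort
    pos  = cids.index(id)
    return unless pos && pos < cids.size

    step = pnum.fdiv(cids.size).ceil
    frst = pos*step
    last = (pos+1)*step-1
    last = pnum-1 if last > pnum-1
    return if last < 0 || last < frst

    (frst..last)
  end
end

ids = %w[grp-b grp-a grp-c]   # input order does not matter; IDs are sorted
ids.each { |id| puts "#{id}: #{PickDemo.pick(5, ids, id).inspect}" }
# grp-b: 2..3
# grp-a: 0..1
# grp-c: 4..4

PickDemo.pick(2, %w[a b c], "c")    # => nil: more consumers than partitions
PickDemo.pick(2, %w[a b], "zzz")    # => nil: ID is not registered
PickDemo.pick(5, %w[a b c d], "d")  # => nil: step is ceil(5/4) = 2, so a..c cover 0..4
```

Because the block size is rounded up, a group can leave some consumers idle even when there are at least as many partitions as consumers, as the last call shows.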

Instance Method Details

#checkout(opts = {}) {|consumer| ... } ⇒ Boolean

Checks out a single partition consumer. Round-robins between claimed partitions.

Examples:


ok = group.checkout do |consumer|
  puts "Checked out consumer for partition #{consumer.partition}"
end
ok # => true if the block was run, false otherwise

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :commit (Boolean)

    Automatically commit consumer offset (default: true)

Yields:

  • (consumer)

    The processing block

Yield Parameters:

  • consumer (Consumer)

    The consumer instance

Yield Returns:

  • (Boolean)

    return false to stop auto-commit

Returns:

  • (Boolean)

    true if a consumer was checked out, false if none could be claimed



# File 'lib/poseidon/consumer_group.rb', line 229

def checkout(opts = {})
  consumer = nil
  commit   = @mutex.synchronize do
    consumer = @consumers.shift
    return false unless consumer

    @consumers.push consumer
    yield consumer
  end

  unless opts[:commit] == false || commit == false
    commit consumer.partition, consumer.offset
  end
  true
end
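The control flow of `checkout` can be traced without Kafka: the head consumer is shifted off the list and pushed back onto the end (round-robin), the block runs while the mutex is held, and the offset is committed afterwards unless `commit: false` was passed or the block returned `false`. `FakeConsumer` and `MiniGroup` below are hypothetical stand-ins that record commits in an array instead of writing to ZooKeeper.

```ruby
# Hypothetical stand-ins; commits are recorded instead of written to ZooKeeper.
FakeConsumer = Struct.new(:partition, :offset)

class MiniGroup
  attr_reader :commits

  def initialize(consumers)
    @consumers = consumers
    @mutex     = Mutex.new
    @commits   = []
  end

  # Same structure as ConsumerGroup#checkout.
  def checkout(opts = {})
    consumer = nil
    result = @mutex.synchronize do
      consumer = @consumers.shift   # take the head of the queue...
      return false unless consumer  # nothing claimed: block is never run
      @consumers.push consumer      # ...and rotate it to the back
      yield consumer
    end

    unless opts[:commit] == false || result == false
      @commits << [consumer.partition, consumer.offset]
    end
    true
  end
end

group = MiniGroup.new([FakeConsumer.new(0, 10), FakeConsumer.new(1, 20)])
seen  = []
3.times { group.checkout { |c| seen << c.partition } }
seen                 # => [0, 1, 0]: round-robin across claimed partitions
group.checkout { false }
group.commits.size   # => 3: the block returned false, so nothing was committed
MiniGroup.new([]).checkout { raise "never called" } # => false
```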

#claimed ⇒ Array<Integer>

Partitions currently claimed and consumed by this group instance

Returns:

  • (Array<Integer>)

    partition IDs



# File 'lib/poseidon/consumer_group.rb', line 207

def claimed
  @consumers.map(&:partition).sort
end

#close ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the consumer group gracefully, only really useful in tests



# File 'lib/poseidon/consumer_group.rb', line 167

def close
  @mutex.synchronize { release_all! }
  zk.close
end

#commit(partition, offset) ⇒ Object

Commits the latest offset for a partition

Parameters:

  • partition (Integer)
  • offset (Integer)


# File 'lib/poseidon/consumer_group.rb', line 188

def commit(partition, offset)
  zk.set offset_path(partition), offset.to_s
rescue ZK::Exceptions::NoNode
  zk.create offset_path(partition), offset.to_s, ignore: :node_exists
end
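`commit` tries a plain `zk.set` first and only creates the node when it does not exist yet; creating with `ignore: :node_exists` also tolerates another process creating the node between the two calls. The sketch models this with a hypothetical in-memory `MemoryStore` and a local `NoNode` error standing in for `ZK::Exceptions::NoNode`; neither is part of the zk gem, and the path is illustrative. Reading back with `to_i` mirrors `#offset`, where a missing node yields `nil.to_i`, i.e. 0.

```ruby
# Hypothetical stand-ins for the two ZK calls used by #commit.
class NoNode < StandardError; end

class MemoryStore
  def initialize
    @nodes = {}
  end

  # Like zk.set: fails when the node does not exist yet.
  def set(path, data)
    raise NoNode, path unless @nodes.key?(path)
    @nodes[path] = data
  end

  # Like zk.create(..., ignore: :node_exists): keeps an existing node.
  def create(path, data)
    @nodes[path] ||= data
  end

  def get(path)
    @nodes[path]
  end
end

# Set-or-create, with the same structure as ConsumerGroup#commit.
def commit_offset(store, path, offset)
  store.set(path, offset.to_s)
rescue NoNode
  store.create(path, offset.to_s)
end

store = MemoryStore.new
path  = "/consumers/my-group/offsets/my-topic/0"  # illustrative path
commit_offset(store, path, 42)  # first commit: set fails, node is created
commit_offset(store, path, 43)  # later commits: plain set
store.get(path)                 # => "43"
store.get("/missing").to_i      # => 0, like #offset for an uncommitted partition
```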

#fetch(opts = {}) {|partition, messages| ... } ⇒ Boolean

Convenience method to fetch messages from the broker. Round-robins between claimed partitions.

Examples:


ok = group.fetch do |n, messages|
  puts "Fetched #{messages.size} messages for partition #{n}"
end
ok # => true if the block was run, false otherwise

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :commit (Boolean)

    Automatically commit consumed offset (default: true)

Yields:

  • (partition, messages)

    The processing block

Yield Parameters:

  • partition (Integer)

    The source partition

  • messages (Array<Message>)

    The fetched messages

Yield Returns:

  • (Boolean)

    return false to prevent auto-commit

Returns:

  • (Boolean)

    true if messages were fetched, false if none could be claimed



# File 'lib/poseidon/consumer_group.rb', line 265

def fetch(opts = {})
  checkout(opts) do |consumer|
    yield consumer.partition, consumer.fetch
  end
end

#fetch_loop(opts = {}) {|partition, messages| ... } ⇒ Object

Initializes an infinite fetch loop. This method blocks!

Will wait for `loop_delay` seconds after each failed fetch. This may happen when there is no new data or when the consumer hasn’t claimed any partitions.

SPECIAL ATTENTION: When ‘breaking out’ of the loop, you must do so before processing the messages. Breaking out skips the auto-commit for the current batch, so if you break after processing, the offset of the messages you just processed will not be committed. Please see the examples below.

Examples:


group.fetch_loop do |n, messages|
  puts "Fetched #{messages.size} messages for partition #{n}"
end
puts "Done" # => this code is never reached

Stopping the loop (wrong)


counts = Hash.new(0)
group.fetch_loop do |n, messages|
  counts[n] += messages.size
  puts "Status: #{counts.inspect}"
  break if counts[0] > 100
end
puts "Result: #{counts.inspect}"
puts "Offset: #{group.offset(0)}"

# Output:
# Status: {0=>30}
# Status: {0=>60}
# Status: {0=>90}
# Status: {0=>120}
# Result: {0=>120}
# Offset: 90      # => Last offset was not committed!

Stopping the loop (correct)


counts = Hash.new(0)
group.fetch_loop do |n, messages|
  break if counts[0] > 100
  counts[n] += messages.size
  puts "Status: #{counts.inspect}"
end
puts "Result: #{counts.inspect}"
puts "Offset: #{group.offset(0)}"

# Output:
# Status: {0=>30}
# Status: {0=>60}
# Status: {0=>90}
# Status: {0=>120}
# Result: {0=>120}
# Offset: 120

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :commit (Boolean)

    Automatically commit consumed offset (default: true)

  • :loop_delay (Integer)

    Delay override in seconds after unsuccessful fetch.

Yields:

  • (partition, messages)

    The processing block

Yield Parameters:

  • partition (Integer)

    The source partition, may be -1 if no partitions are claimed

  • messages (Array<Message>)

    The fetched messages

Yield Returns:

  • (Boolean)

    return false to prevent auto-commit



# File 'lib/poseidon/consumer_group.rb', line 335

def fetch_loop(opts = {})
  delay = opts[:loop_delay] || options[:loop_delay] || DEFAULT_LOOP_DELAY

  loop do
    mp = false
    ok = fetch(opts) do |n, messages|
      mp = !messages.empty?
      yield n, messages
    end

    # Yield over an empty array if nothing claimed,
    # to allow user to e.g. break out of the loop
    unless ok
      yield -1, []
    end

    # Sleep if either nothing was claimed or nothing was returned
    unless ok && mp
      sleep delay
    end
  end
end
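The reason for the SPECIAL ATTENTION note is that the commit happens in `checkout` after the block returns, so a `break` inside the block skips it for the batch being processed. The hypothetical `SimGroup` below reproduces only that timing: every fetch yields 30 fake messages and advances the offset (assuming, as the examples above imply, that fetching advances the consumer offset), and the offset is "committed" only after the block returns something other than `false`. Sleeping and partition claiming are not modelled.

```ruby
# Hypothetical simulation of the commit timing in fetch_loop/checkout.
class SimGroup
  attr_reader :committed

  def initialize
    @offset    = 0
    @committed = 0
  end

  # One fetch: yield 30 fake messages, then commit the new offset unless the
  # block returned false. A `break` in the block never reaches the commit.
  def fetch
    messages = Array.new(30) { |i| @offset + i }
    @offset += messages.size
    result = yield 0, messages
    @committed = @offset unless result == false
    true
  end

  def fetch_loop
    loop { fetch { |n, msgs| yield n, msgs } }
  end
end

# Wrong: break after processing; the 4th batch is processed but not committed.
wrong, count = SimGroup.new, 0
wrong.fetch_loop do |_, msgs|
  count += msgs.size
  break if count > 100
end
puts "wrong: processed=#{count} committed=#{wrong.committed}" # processed=120 committed=90

# Correct: break before processing; everything processed was committed.
right, count = SimGroup.new, 0
right.fetch_loop do |_, msgs|
  break if count > 100
  count += msgs.size
end
puts "right: processed=#{count} committed=#{right.committed}" # processed=120 committed=120
```

In the correct variant the batch fetched on the final iteration is neither processed nor committed, so it would be delivered again on the next run.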

#id ⇒ String

Returns a globally unique identifier.

Returns:

  • (String)

    a globally unique identifier



# File 'lib/poseidon/consumer_group.rb', line 114

def id
  @id ||= [name, Poseidon::Cluster.guid].join("-")
end

#leader(partition) ⇒ Poseidon::Protocol::Broker

Returns the leader for the given partition.

Parameters:

  • partition (Integer)

Returns:

  • (Poseidon::Protocol::Broker)

    the leader for the given partition



# File 'lib/poseidon/consumer_group.rb', line 174

def leader(partition)
  metadata.lead_broker_for_partition(topic, partition)
end

#metadata ⇒ Poseidon::ClusterMetadata

Returns cluster metadata.

Returns:

  • (Poseidon::ClusterMetadata)

    cluster metadata



# File 'lib/poseidon/consumer_group.rb', line 128

def metadata
  @metadata ||= Poseidon::ClusterMetadata.new.tap {|m| m.update pool.fetch_metadata([topic]) }
end

#offset(partition) ⇒ Integer

Returns the latest stored offset for the given partition.

Parameters:

  • partition (Integer)

Returns:

  • (Integer)

    the latest stored offset for the given partition



# File 'lib/poseidon/consumer_group.rb', line 180

def offset(partition)
  data, _ = zk.get offset_path(partition), ignore: :no_node
  data.to_i
end

#partitions ⇒ Array<Poseidon::Protocol::PartitionMetadata>

Returns the partitions sorted by leader broker address, so that partitions on the same broker are clustered together.

Returns:

  • (Array<Poseidon::Protocol::PartitionMetadata>)

    sorted partitions



# File 'lib/poseidon/consumer_group.rb', line 196

def partitions
  return [] unless topic_metadata

  topic_metadata.available_partitions.sort_by do |part|
    broker = metadata.brokers[part.leader]
    [broker.host, broker.port].join(":")
  end
end
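The sort key is the leader broker's `host:port` string, so partitions led by the same broker end up adjacent. The sketch below uses hypothetical `Broker` and `Partition` Structs with just the fields the sort reads (in the real method these come from Poseidon's cluster and topic metadata). Ruby's `sort_by` is not guaranteed to be stable, so only the grouping is reliable, not the order of partitions within one broker's group.

```ruby
# Hypothetical stand-ins for broker and partition metadata.
Broker    = Struct.new(:host, :port)
Partition = Struct.new(:id, :leader)   # leader is a broker ID

brokers = {
  1 => Broker.new("kafka-b.local", 9092),
  2 => Broker.new("kafka-a.local", 9092),
}
# Even partitions are led by broker 1, odd ones by broker 2.
parts = [0, 1, 2, 3].map { |id| Partition.new(id, id.even? ? 1 : 2) }

sorted = parts.sort_by do |part|
  broker = brokers[part.leader]
  [broker.host, broker.port].join(":")  # e.g. "kafka-a.local:9092"
end

sorted.map(&:leader) # => [2, 2, 1, 1]: kafka-a's partitions first, then kafka-b's
```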

#register! ⇒ Boolean

Returns true if registration was successful, false if already registered.

Returns:

  • (Boolean)

    true if registration was successful, false if already registered



# File 'lib/poseidon/consumer_group.rb', line 143

def register!
  return false if registered?

  # Register instance
  registries.each do |_, path|
    zk.mkdir_p(path)
  end
  zk.create(consumer_path, "{}", ephemeral: true)
  zk.register(registries[:consumer]) {|_| rebalance! }

  # Rebalance
  rebalance!
  @registered = true
end

#registered? ⇒ Boolean

Returns true if registered.

Returns:

  • (Boolean)

    true if registered



# File 'lib/poseidon/consumer_group.rb', line 138

def registered?
  @registered
end

#registries ⇒ Hash<Symbol,String>

Returns registry paths.

Returns:

  • (Hash<Symbol,String>)

    registry paths



# File 'lib/poseidon/consumer_group.rb', line 119

def registries
  @registries ||= {
    consumer: "/consumers/#{name}/ids",
    owner:    "/consumers/#{name}/owners/#{topic}",
    offset:   "/consumers/#{name}/offsets/#{topic}",
  }
end
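For a concrete group and topic, the three registry paths expand as below. The per-consumer and per-partition child nodes used elsewhere in the class (through private helpers not listed on this page) are assumed here to follow the ZooKeeper layout in the Kafka documentation linked in the overview, i.e. `.../ids/<consumer id>` and `.../offsets/<topic>/<partition>`; treat those two formats, and the sample names, as illustrative.

```ruby
name, topic = "my-group", "events"   # sample values

# Same hash literal as #registries
registries = {
  consumer: "/consumers/#{name}/ids",
  owner:    "/consumers/#{name}/owners/#{topic}",
  offset:   "/consumers/#{name}/offsets/#{topic}",
}

consumer_id   = "#{name}-0123abcd"                         # hypothetical #id value
consumer_path = "#{registries[:consumer]}/#{consumer_id}"  # assumed child format
offset_path   = "#{registries[:offset]}/3"                 # assumed format, partition 3

puts consumer_path  # /consumers/my-group/ids/my-group-0123abcd
puts offset_path    # /consumers/my-group/offsets/events/3
```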

#reload ⇒ Object

Reloads metadata/broker/partition information



# File 'lib/poseidon/consumer_group.rb', line 159

def reload
  @metadata = @topic_metadata = nil
  metadata
  self
end
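`#metadata` and `#topic_metadata` are memoized with `||=`, so the broker round trip happens only once; `#reload` clears both instance variables and then calls `metadata` again to refetch eagerly. The hypothetical `CachedMetadata` class below mirrors that pattern, with a counter in place of a real metadata request.

```ruby
# Hypothetical class mirroring the memoize-and-reset pattern of
# #metadata, #topic_metadata and #reload.
class CachedMetadata
  attr_reader :fetch_count

  def initialize
    @fetch_count = 0
  end

  def metadata
    @metadata ||= begin
      @fetch_count += 1               # stands in for a broker request
      { fetched_at: @fetch_count }
    end
  end

  def topic_metadata
    @topic_metadata ||= metadata[:fetched_at]
  end

  def reload
    @metadata = @topic_metadata = nil
    metadata                          # eagerly refetch
    self
  end
end

meta = CachedMetadata.new
meta.metadata
meta.metadata
meta.fetch_count    # => 1: second call was served from the cache
meta.reload
meta.fetch_count    # => 2: reload cleared the cache and refetched
meta.topic_metadata # => 2
```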

#topic_metadata ⇒ Poseidon::TopicMetadata

Returns topic metadata.

Returns:

  • (Poseidon::TopicMetadata)

    topic metadata



# File 'lib/poseidon/consumer_group.rb', line 133

def topic_metadata
  @topic_metadata ||= metadata.metadata_for_topics([topic])[topic]
end