Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/kafka/consumer.rb,
lib/kafka/consumer/message.rb,
lib/kafka/consumer/version.rb,
lib/kafka/consumer/partition_consumer.rb

Defined Under Namespace

Classes: Message, PartitionConsumer

Constant Summary collapse

BACKPRESSURE_MESSAGE_LIMIT =
1000
VERSION =
"0.0.1"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, subscription, zookeeper: [], chroot: '', max_wait_ms: 200, initial_offset: :latest_offset, logger: nil) ⇒ Consumer

Returns a new instance of Consumer.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/kafka/consumer.rb', line 21

def initialize(name, subscription, zookeeper: [], chroot: '', max_wait_ms: 200, initial_offset: :latest_offset, logger: nil)
  @name, @subscription = name, subscription
  @max_wait_ms, @initial_offset = max_wait_ms, initial_offset
  @logger = logger || Logger.new($stdout)

  @cluster = Kazoo::Cluster.new(zookeeper, chroot: chroot)
  @group = Kazoo::Consumergroup.new(@cluster, name)
  @group.create unless @group.exists?

  @instance = @group.instantiate
  @instance.register(topics)
end

Instance Attribute Details

#clusterObject (readonly)

Returns the value of attribute cluster.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def cluster
  @cluster
end

#groupObject (readonly)

Returns the value of attribute group.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def group
  @group
end

#initial_offsetObject (readonly)

Returns the value of attribute initial_offset.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def initial_offset
  @initial_offset
end

#instanceObject (readonly)

Returns the value of attribute instance.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def instance
  @instance
end

#loggerObject (readonly)

Returns the value of attribute logger.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def logger
  @logger
end

#max_wait_msObject (readonly)

Returns the value of attribute max_wait_ms.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def max_wait_ms
  @max_wait_ms
end

#subscriptionObject (readonly)

Returns the value of attribute subscription.



16
17
18
# File 'lib/kafka/consumer.rb', line 16

def subscription
  @subscription
end

Class Method Details

.distribute_partitions(instances, partitions) ⇒ Object



99
100
101
102
103
104
105
106
107
# File 'lib/kafka/consumer.rb', line 99

def self.distribute_partitions(instances, partitions)
  return {} if instances.empty?
  partitions_per_instance = partitions.length.to_f / instances.length.to_f

  partitions.group_by.with_index do |partition, index|
    instance_index = index.fdiv(partitions_per_instance).floor
    instances[instance_index]
  end
end

Instance Method Details

#dead?Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/kafka/consumer.rb', line 78

def dead?
  @consumer_manager.status == false
end

#each(&block) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/kafka/consumer.rb', line 82

def each(&block)
  mutex = Mutex.new

  handler = lambda do |message|
    mutex.synchronize do
      block.call(message)
    end
  end

  @consumer_manager = Thread.new do
    Thread.current.abort_on_exception = true
    manage_partition_consumers(handler)
  end

  wait
end

#idObject



38
39
40
# File 'lib/kafka/consumer.rb', line 38

def id
  instance.id
end

#interruptObject



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/kafka/consumer.rb', line 53

def interrupt
  Thread.new do
    Thread.current.abort_on_exception = true

    logger.info "Stopping partition consumers..."
    @consumer_manager[:interrupted] = true

    # Make sure to wake up the manager thread, so it can shut down
    continue
  end
end

#interrupted?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/kafka/consumer.rb', line 65

def interrupted?
  @consumer_manager[:interrupted]
end

#nameObject



34
35
36
# File 'lib/kafka/consumer.rb', line 34

def name
  group.name
end

#partitionsObject



49
50
51
# File 'lib/kafka/consumer.rb', line 49

def partitions
  topics.flat_map(&:partitions).sort_by { |partition| [partition.leader.id, partition.topic.name, partition.id] }
end

#stopObject



69
70
71
72
# File 'lib/kafka/consumer.rb', line 69

def stop
  interrupt
  wait
end

#topicsObject



42
43
44
45
46
47
# File 'lib/kafka/consumer.rb', line 42

def topics
  @topics ||= begin
    topic_names = Array(subscription)
    topic_names.map { |topic_name| cluster.topics.fetch(topic_name) }
  end
end

#waitObject



74
75
76
# File 'lib/kafka/consumer.rb', line 74

def wait
  @consumer_manager.join if @consumer_manager.alive?
end