Class: Rdkafka::Consumer::TopicPartitionList

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/consumer/topic_partition_list.rb

Overview

A list of topics with their partition information

Instance Method Summary collapse

Constructor Details

#initialize(pointer = nil) ⇒ TopicPartitionList

Create a new topic partition list.

Parameters:

  • pointer (FFI::Pointer, nil) (defaults to: nil)

    Optional pointer to an existing native list



10
11
12
13
14
15
16
17
18
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 10

def initialize(pointer=nil)
  @tpl =
    Rdkafka::Bindings::TopicPartitionList.new(
      FFI::AutoPointer.new(
        pointer || Rdkafka::Bindings.rd_kafka_topic_partition_list_new(5),
        Rdkafka::Bindings.method(:rd_kafka_topic_partition_list_destroy)
      )
  )
end

Instance Method Details

#==(other) ⇒ Object



94
95
96
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 94

def ==(other)
  self.to_h == other.to_h
end

#add_topic(topic, partitions = nil) ⇒ nil

Add a topic with optionally partitions to the list.

Examples:

Add a topic with unassigned partitions

tpl.add_topic("topic")

Add a topic with assigned partitions

tpl.add_topic("topic", (0..8))

Add a topic with all topics up to a count

tpl.add_topic("topic", 9)

Parameters:

  • topic (String)

    The topic's name

  • partition (Array<Integer>, Range<Integer>, Integer)

    The topic's partitions or partition count

Returns:

  • (nil)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 47

def add_topic(topic, partitions=nil)
  if partitions.is_a? Integer
    partitions = (0..partitions - 1)
  end
  if partitions.nil?
    Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
      @tpl,
      topic,
      -1
    )
  else
    partitions.each do |partition|
      Rdkafka::Bindings.rd_kafka_topic_partition_list_add(
        @tpl,
        topic,
        partition
      )
    end
  end
end

#countInteger

Number of items in the list

Returns:

  • (Integer)


22
23
24
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 22

def count
  @tpl[:cnt]
end

#empty?Boolean

Whether this list is empty

Returns:

  • (Boolean)


28
29
30
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 28

def empty?
  count == 0
end

#to_hHash<String, [Array<Partition>, nil]>

Return a Hash with the topics as keys and and an array of partition information as the value if present.

Returns:



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 71

def to_h
  {}.tap do |out|
    count.times do |i|
      ptr = @tpl[:elems] + (i * Rdkafka::Bindings::TopicPartition.size)
      elem = Rdkafka::Bindings::TopicPartition.new(ptr)
      if elem[:partition] == -1
        out[elem[:topic]] = nil
      else
        partitions = out[elem[:topic]] || []
        partition = Partition.new(elem[:partition], elem[:offset])
        partitions.push(partition)
        out[elem[:topic]] = partitions
      end
    end
  end
end

#to_sString

Human readable representation of this list.

Returns:

  • (String)


90
91
92
# File 'lib/rdkafka/consumer/topic_partition_list.rb', line 90

def to_s
  "<TopicPartitionList: #{to_h}>"
end