Class: Kafka::MessageBuffer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/kafka/message_buffer.rb

Overview

Buffers messages for specific topics/partitions.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MessageBuffer

Returns a new instance of MessageBuffer.



9
10
11
12
# File 'lib/kafka/message_buffer.rb', line 9

# Sets up an empty buffer: no topics are tracked and the message counter is zero.
def initialize
  @buffer = {}
  @size = 0
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



7
8
9
# File 'lib/kafka/message_buffer.rb', line 7

# Current value of the message counter.
#
# The counter is raised by #write and #concat, lowered by #clear_messages and
# reset to zero by #clear.
#
# @return [Integer]
def size
  @size
end

Instance Method Details

#clear ⇒ nil

Clears messages across all topics and partitions.

Returns:

  • (nil)


56
57
58
59
# File 'lib/kafka/message_buffer.rb', line 56

# Clears messages across all topics and partitions.
#
# A fresh Hash is assigned instead of emptying the current one in place, so a
# hash previously obtained from #to_h is left untouched.
#
# @return [nil]
def clear
  @buffer = {}
  @size = 0
  # Return nil as documented instead of leaking the value of the last assignment.
  nil
end

#clear_messages(topic:, partition:) ⇒ nil

Clears buffered messages for the given topic and partition.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition id.

Returns:

  • (nil)


46
47
48
49
50
51
# File 'lib/kafka/message_buffer.rb', line 46

# Clears buffered messages for the given topic and partition.
#
# Asking to clear a topic/partition that has nothing buffered is a no-op
# rather than an error, so callers do not have to check first.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [nil]
def clear_messages(topic:, partition:)
  partitions = @buffer[topic]
  return if partitions.nil? || !partitions.key?(partition)

  @size -= partitions[partition].count
  partitions.delete(partition)

  # Drop the topic once its last partition is gone so no empty entry lingers.
  @buffer.delete(topic) if partitions.empty?
  nil
end

#concat(messages, topic:, partition:) ⇒ Object



19
20
21
22
# File 'lib/kafka/message_buffer.rb', line 19

# Appends a batch of messages to the buffer of one topic/partition.
#
# The message counter grows by `messages.count` before the batch is appended.
#
# @param messages [Enumerable] the messages to append.
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [Object] whatever `concat` on the partition buffer returns.
def concat(messages, topic:, partition:)
  added = messages.count
  @size += added

  partition_buffer = buffer_for(topic, partition)
  partition_buffer.concat(messages)
end

#each ⇒ Object



32
33
34
35
36
37
38
# File 'lib/kafka/message_buffer.rb', line 32

# Walks every buffered topic/partition pair.
#
# When called without a block an Enumerator is returned, so the method can be
# chained instead of failing with LocalJumpError at the first yield.
#
# @yieldparam topic [Object] the topic key.
# @yieldparam partition [Object] the partition key within that topic.
# @yieldparam messages_for_partition [Object] the messages stored under that pair.
# @return [Enumerator] when no block is given.
# @return [Hash] the underlying buffer when a block is given.
def each
  return enum_for(:each) unless block_given?

  @buffer.each do |topic, messages_for_topic|
    messages_for_topic.each do |partition, messages_for_partition|
      yield topic, partition, messages_for_partition
    end
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/kafka/message_buffer.rb', line 28

# Whether the buffer currently holds no topic entries at all.
#
# The check is made on the topic table itself, not on the #size counter.
#
# @return [Boolean]
def empty?
  @buffer.empty?
end

#to_h ⇒ Object



24
25
26
# File 'lib/kafka/message_buffer.rb', line 24

# Exposes the buffered messages as a nested Hash, walked by #each as
# topic => { partition => messages }.
#
# The internal hash itself is returned, not a copy, so changes made to it by
# the caller are visible to this buffer (and are not reflected in #size).
#
# @return [Hash]
def to_h
  @buffer
end

#write(message, topic:, partition:) ⇒ Object



14
15
16
17
# File 'lib/kafka/message_buffer.rb', line 14

# Appends a single message to the buffer of one topic/partition.
#
# The message counter is bumped by one before the message is appended.
#
# @param message [Object] the message to buffer.
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [Object] whatever `<<` on the partition buffer returns.
def write(message, topic:, partition:)
  @size += 1

  partition_buffer = buffer_for(topic, partition)
  partition_buffer << message
end