Class: Kafka::MessageBuffer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/kafka/message_buffer.rb

Overview

Buffers messages for specific topics/partitions.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MessageBuffer

Returns a new instance of MessageBuffer.



13
14
15
16
17
# File 'lib/kafka/message_buffer.rb', line 13

# Builds an empty message buffer.
#
# The backing store starts as an empty Hash and both counters (@size and
# @bytesize) start at zero.
def initialize
  @buffer = {}
  @size = @bytesize = 0
end

Instance Attribute Details

#bytesize ⇒ Object (readonly)

Returns the value of attribute bytesize.



11
12
13
# File 'lib/kafka/message_buffer.rb', line 11

# Read-only accessor for the byte counter.
#
# @return [Object] the current value of @bytesize (set to 0 by #initialize
#   and #clear, and adjusted by the sum of each message's `bytesize` in
#   #concat and #clear_messages)
def bytesize
  @bytesize
end

#size ⇒ Object (readonly)

Returns the value of attribute size.



11
12
13
# File 'lib/kafka/message_buffer.rb', line 11

# Read-only accessor for the message counter.
#
# @return [Object] the current value of @size (set to 0 by #initialize and
#   #clear, and adjusted by the message count in #concat and #clear_messages)
def size
  @size
end

Instance Method Details

#clear ⇒ nil

Clears messages across all topics and partitions.

Returns:

  • (nil)


74
75
76
77
78
# File 'lib/kafka/message_buffer.rb', line 74

# Clears messages across all topics and partitions.
#
# The internal buffer is replaced with a fresh empty Hash (the previous Hash
# object is left untouched, so references handed out earlier via #to_h keep
# their contents) and both counters are reset to zero.
#
# @return [nil]
def clear
  @buffer = {}
  @size = 0
  @bytesize = 0

  # Return nil explicitly: without this the method would return 0 (the value
  # of the last assignment), contradicting its documented return value.
  nil
end

#clear_messages(topic:, partition:) ⇒ nil

Clears buffered messages for the given topic and partition.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition id.

Returns:

  • (nil)


57
58
59
60
61
62
63
64
65
# File 'lib/kafka/message_buffer.rb', line 57

# Clears buffered messages for the given topic and partition.
#
# Does nothing when the topic or the partition is not present in the buffer.
# Otherwise the counters are reduced by the number of removed messages and by
# the sum of their `bytesize`, the partition entry is removed, and the topic
# entry is removed as well once it has no partitions left.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition id.
# @return [nil]
def clear_messages(topic:, partition:)
  partitions = @buffer[topic]
  return nil unless partitions && partitions.key?(partition)

  messages = partitions[partition]

  # Compute the byte total before touching any state so that a failure while
  # summing cannot leave the counters out of step with each other.
  removed_bytes = messages.map(&:bytesize).reduce(0, :+)

  @size -= messages.count
  @bytesize -= removed_bytes

  partitions.delete(partition)
  @buffer.delete(topic) if partitions.empty?

  # Always nil, as documented; Hash#delete above would otherwise leak the
  # removed topic entry as the return value.
  nil
end

#concat(messages, topic:, partition:) ⇒ Object



28
29
30
31
32
33
# File 'lib/kafka/message_buffer.rb', line 28

# Appends a batch of messages to the collection that `buffer_for` returns for
# the given topic and partition, then updates the counters.
#
# @size grows by the number of messages and @bytesize by the sum of each
# message's `bytesize`.
#
# @param messages [#count, #inject] the messages to append; each element must
#   respond to `bytesize`.
# @param topic the topic passed through to `buffer_for`.
# @param partition the partition passed through to `buffer_for`.
# @return [Object] the updated @bytesize (value of the final assignment).
def concat(messages, topic:, partition:)
  destination = buffer_for(topic, partition)
  destination.concat(messages)

  @size += messages.count
  @bytesize += messages.inject(0) { |total, message| total + message.bytesize }
end

#each ⇒ Object



43
44
45
46
47
48
49
# File 'lib/kafka/message_buffer.rb', line 43

# Iterates over every topic/partition entry in the buffer.
#
# The block receives three arguments per entry: the topic key, the partition
# key, and the collection of messages stored under that pair.
#
# When no block is given an Enumerator over the same triples is returned
# instead of raising LocalJumpError, following the usual convention for
# `each` on Enumerable classes.
#
# @yieldparam topic the topic key.
# @yieldparam partition the partition key.
# @yieldparam messages_for_partition the messages stored for that pair.
# @return [Enumerator] when called without a block.
# @return [Object] the internal buffer (result of Hash#each) when a block is
#   given.
def each
  return enum_for(:each) unless block_given?

  @buffer.each do |topic, messages_for_topic|
    messages_for_topic.each do |partition, messages_for_partition|
      yield topic, partition, messages_for_partition
    end
  end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/kafka/message_buffer.rb', line 39

# Reports whether the buffer holds no topic entries.
#
# Only the top-level buffer Hash is inspected; the @size counter is not
# consulted.
#
# @return [Boolean]
def empty?
  @buffer.size.zero?
end

#messages_for(topic:, partition:) ⇒ Object



67
68
69
# File 'lib/kafka/message_buffer.rb', line 67

# Looks up the messages buffered for a topic/partition pair.
#
# Delegates directly to `buffer_for` and returns whatever it returns.
# NOTE(review): `buffer_for` is defined outside this view; whether it creates
# an entry for a previously unseen topic/partition should be confirmed there.
#
# @param topic the topic to look up.
# @param partition the partition to look up.
# @return [Object] the result of `buffer_for(topic, partition)`.
def messages_for(topic:, partition:)
  buffer_for(topic, partition)
end

#to_hObject



35
36
37
# File 'lib/kafka/message_buffer.rb', line 35

# Exposes the buffer as a Hash.
#
# The internal @buffer object itself is returned, not a copy, so changes made
# to the returned Hash are changes to the buffer's own storage.
#
# @return [Object] the internal buffer (nested by topic, then partition, as
#   iterated in #each).
def to_h
  @buffer
end

#write(value:, key:, topic:, partition:, create_time: Time.now, headers: {}) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/kafka/message_buffer.rb', line 19

def write(value:, key:, topic:, partition:, create_time: Time.now, headers: {})
  message = Protocol::Record.new(key: key, value: value, create_time: create_time, headers: headers)

  buffer_for(topic, partition) << message

  @size += 1
  @bytesize += message.bytesize
end