Class: Kafka::MessageBuffer

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/kafka/message_buffer.rb

Overview

Buffers messages for specific topics/partitions.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMessageBuffer

Returns a new instance of MessageBuffer.



11
12
13
14
15
# File 'lib/kafka/message_buffer.rb', line 11

def initialize
  @buffer = {}
  @size = 0
  @bytesize = 0
end

Instance Attribute Details

#bytesizeObject (readonly)

Returns the value of attribute bytesize.



9
10
11
# File 'lib/kafka/message_buffer.rb', line 9

def bytesize
  @bytesize
end

#sizeObject (readonly)

Returns the value of attribute size.



9
10
11
# File 'lib/kafka/message_buffer.rb', line 9

def size
  @size
end

Instance Method Details

#clearnil

Clears messages across all topics and partitions.

Returns:

  • (nil)


72
73
74
75
76
# File 'lib/kafka/message_buffer.rb', line 72

def clear
  @buffer = {}
  @size = 0
  @bytesize = 0
end

#clear_messages(topic:, partition:) ⇒ nil

Clears buffered messages for the given topic and partition.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition id.

Returns:

  • (nil)


55
56
57
58
59
60
61
62
63
# File 'lib/kafka/message_buffer.rb', line 55

def clear_messages(topic:, partition:)
  return unless @buffer.key?(topic) && @buffer[topic].key?(partition)

  @size -= @buffer[topic][partition].count
  @bytesize -= @buffer[topic][partition].map(&:bytesize).reduce(:+)

  @buffer[topic].delete(partition)
  @buffer.delete(topic) if @buffer[topic].empty?
end

#concat(messages, topic:, partition:) ⇒ Object



26
27
28
29
30
31
# File 'lib/kafka/message_buffer.rb', line 26

def concat(messages, topic:, partition:)
  buffer_for(topic, partition).concat(messages)

  @size += messages.count
  @bytesize += messages.map(&:bytesize).reduce(0, :+)
end

#eachObject



41
42
43
44
45
46
47
# File 'lib/kafka/message_buffer.rb', line 41

def each
  @buffer.each do |topic, messages_for_topic|
    messages_for_topic.each do |partition, messages_for_partition|
      yield topic, partition, messages_for_partition
    end
  end
end

#empty?Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/kafka/message_buffer.rb', line 37

def empty?
  @buffer.empty?
end

#messages_for(topic:, partition:) ⇒ Object



65
66
67
# File 'lib/kafka/message_buffer.rb', line 65

def messages_for(topic:, partition:)
  buffer_for(topic, partition)
end

#to_hObject



33
34
35
# File 'lib/kafka/message_buffer.rb', line 33

def to_h
  @buffer
end

#write(value:, key:, topic:, partition:, create_time: Time.now) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/kafka/message_buffer.rb', line 17

def write(value:, key:, topic:, partition:, create_time: Time.now)
  message = Protocol::Message.new(key: key, value: value, create_time: create_time)

  buffer_for(topic, partition) << message

  @size += 1
  @bytesize += message.bytesize
end