Class: Kafka::PendingMessageQueue

Inherits:
Object
Defined in:
lib/kafka/pending_message_queue.rb

Overview

A pending message queue holds messages that have not yet been assigned to a partition. It's designed to only remove messages once they've been successfully handled.


Constructor Details

#initialize ⇒ PendingMessageQueue

Returns a new instance of PendingMessageQueue.



# File 'lib/kafka/pending_message_queue.rb', line 9

def initialize
  @messages = []
  @size = 0
  @bytesize = 0
end

Instance Attribute Details

#bytesize ⇒ Object (readonly)

Returns the combined byte size of all messages currently in the queue.



# File 'lib/kafka/pending_message_queue.rb', line 7

def bytesize
  @bytesize
end

#size ⇒ Object (readonly)

Returns the number of messages currently in the queue.



# File 'lib/kafka/pending_message_queue.rb', line 7

def size
  @size
end

Instance Method Details

#dequeue_each {|message| ... } ⇒ nil

Yields each message in the queue to the provided block, removing the message after the block has processed it. If the block raises an exception, the message will be retained in the queue.

Yield Parameters:

  • message (the message at the front of the queue)

Returns:

  • (nil)


# File 'lib/kafka/pending_message_queue.rb', line 31

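def dequeue_each(&block)
  until @messages.empty?
    # Peek at the head of the queue without removing it.
    message = @messages.first

    # If the block raises, the lines below never run and the message stays queued.
    yield message

    # The block returned normally: update the counters and drop the message.
    @size -= 1
    @bytesize -= message.bytesize
    @messages.shift
  end
end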
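
The retention behaviour described above can be illustrated with a minimal sketch. To keep it runnable without the gem installed, the class body is copied from the listings on this page. Plain Strings stand in for messages, which is an assumption (anything that responds to bytesize works here), and the fail_once flag is a hypothetical way to simulate a transient failure.

```ruby
# Stand-in copied from the listings on this page, so the sketch runs without the gem.
module Kafka
  class PendingMessageQueue
    attr_reader :size, :bytesize

    def initialize
      @messages = []
      @size = 0
      @bytesize = 0
    end

    def write(message)
      @messages << message
      @size += 1
      @bytesize += message.bytesize
    end

    def empty?
      @messages.empty?
    end

    def dequeue_each(&block)
      until @messages.empty?
        message = @messages.first
        yield message
        @size -= 1
        @bytesize -= message.bytesize
        @messages.shift
      end
    end
  end
end

queue = Kafka::PendingMessageQueue.new
%w[a b c].each { |m| queue.write(m) } # Strings as stand-in messages (assumption)

handled = []
fail_once = true # hypothetical flag simulating one transient failure
remaining_after_failure = nil

begin
  queue.dequeue_each do |message|
    if message == "b" && fail_once
      fail_once = false
      raise "transient failure on #{message}"
    end
    handled << message
  end
rescue RuntimeError
  # "a" was handled and removed; "b" and "c" are still queued.
  remaining_after_failure = queue.size
end

# A second pass picks up where the first one stopped.
queue.dequeue_each { |message| handled << message }
```

Because a message is only shifted off after the block returns, the failed message is the first one yielded on the next call, so ordering is preserved across retries.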

#empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/kafka/pending_message_queue.rb', line 21

def empty?
  @messages.empty?
end

#write(message) ⇒ Object

Appends message to the end of the queue and adds its bytesize to the queue's running totals.



# File 'lib/kafka/pending_message_queue.rb', line 15

def write(message)
  @messages << message
  @size += 1
  @bytesize += message.bytesize
end
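
As a rough illustration of how the two counters differ, the sketch below writes two messages and inspects size and bytesize. The Message struct is hypothetical and not part of the library; the only requirement taken from the source above is that a message responds to bytesize. The queue class is a trimmed copy of the listings on this page so the snippet runs on its own.

```ruby
# Trimmed stand-in copied from the listings on this page (constructor, readers, write).
module Kafka
  class PendingMessageQueue
    attr_reader :size, :bytesize

    def initialize
      @messages = []
      @size = 0
      @bytesize = 0
    end

    def write(message)
      @messages << message
      @size += 1
      @bytesize += message.bytesize
    end
  end
end

# Hypothetical message type: its byte size is the key plus the value.
Message = Struct.new(:key, :value) do
  def bytesize
    key.to_s.bytesize + value.to_s.bytesize
  end
end

queue = Kafka::PendingMessageQueue.new
queue.write(Message.new("k1", "hello")) # 2 + 5 bytes
queue.write(Message.new(nil, "héllo"))  # 0 + 6 bytes ("é" is 2 bytes in UTF-8)

puts queue.size     # number of messages written so far
puts queue.bytesize # sum of each message's bytesize
```

Here size counts messages while bytesize tracks bytes, so a caller that limits buffering by memory rather than by message count would presumably check bytesize.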