Class: Kafka::PendingMessageQueue
- Inherits:
-
Object
- Object
- Kafka::PendingMessageQueue
- Defined in:
- lib/kafka/pending_message_queue.rb
Overview
A pending message queue holds messages that have not yet been assigned to a partition. It's designed to only remove messages once they've been successfully handled.
Instance Attribute Summary collapse
-
#bytesize ⇒ Object
readonly
Returns the value of attribute bytesize.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
-
#dequeue_each {|message| ... } ⇒ nil
Yields each message in the queue to the provided block, removing the message after the block has processed it.
- #empty? ⇒ Boolean
-
#initialize ⇒ PendingMessageQueue
constructor
A new instance of PendingMessageQueue.
- #write(message) ⇒ Object
Constructor Details
#initialize ⇒ PendingMessageQueue
Returns a new instance of PendingMessageQueue.
9 10 11 12 13 |
# File 'lib/kafka/pending_message_queue.rb', line 9 def initialize @messages = [] @size = 0 @bytesize = 0 end |
Instance Attribute Details
#bytesize ⇒ Object (readonly)
Returns the value of attribute bytesize.
7 8 9 |
# File 'lib/kafka/pending_message_queue.rb', line 7 def bytesize @bytesize end |
#size ⇒ Object (readonly)
Returns the value of attribute size.
7 8 9 |
# File 'lib/kafka/pending_message_queue.rb', line 7 def size @size end |
Instance Method Details
#dequeue_each {|message| ... } ⇒ nil
Yields each message in the queue to the provided block, removing the message after the block has processed it. If the block raises an exception, the message will be retained in the queue.
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/kafka/pending_message_queue.rb', line 31 def dequeue_each(&block) until @messages.empty? = @messages.first yield @size -= 1 @bytesize -= .bytesize @messages.shift end end |
#empty? ⇒ Boolean
21 22 23 |
# File 'lib/kafka/pending_message_queue.rb', line 21 def empty? @messages.empty? end |
#write(message) ⇒ Object
15 16 17 18 19 |
# File 'lib/kafka/pending_message_queue.rb', line 15 def write() @messages << @size += 1 @bytesize += .bytesize end |