Class: CircularQueue
- Inherits:
-
Object
- Object
- CircularQueue
- Defined in:
- lib/circular_queue.rb
Overview
A thread-safe queue with a size limitation. When more elements than the capacity are added, the queue either loops back on itself (removing the oldest elements first) or raises an error (if enq! is used).
Useful for streaming data where keeping up with real-time is more important than consuming every message if load rises and the queue backs up.
Exposes the same interface as the Queue from the Ruby stdlib.
Example:
# Capacity of 3
q = CircularQueue.new(3)
q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]
# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]
Instance Attribute Summary collapse
-
#capacity ⇒ Integer
readonly
Returns the maximum number of elements that can be enqueued.
-
#size ⇒ Integer
(also: #length)
readonly
Returns the number of elements in the queue.
Instance Method Summary collapse
-
#back ⇒ Object
Returns the last/most recent item in the queue Peek at last item without removing.
-
#clear ⇒ CircularQueue
Removes all items from the queue.
-
#data ⇒ Array
Returns the data in the queue Allows for easy iteration of queue from front to back.
-
#deq(non_block = false) ⇒ Object
(also: #shift, #pop)
Removes an item from the queue.
-
#empty? ⇒ Boolean
Returns whether the queue is empty.
-
#enq(item) ⇒ CircularQueue
(also: #<<, #push)
Adds an item to the queue.
-
#enq!(item) ⇒ CircularQueue
(also: #push!)
Adds an item to the queue, raising an error if the queue is full.
-
#front ⇒ Object
Returns thee first/oldest item in the queue Peek at first item without removing.
-
#full? ⇒ Boolean
Returns whether the queue is full.
-
#initialize(capacity) ⇒ CircularQueue
constructor
Creates a new queue of the specified capacity.
-
#num_waiting ⇒ Integer
Returns the number of threads waiting for items to arrive in the queue.
Constructor Details
#initialize(capacity) ⇒ CircularQueue
Creates a new queue of the specified capacity
34 35 36 37 38 39 40 41 42 |
# File 'lib/circular_queue.rb', line 34 def initialize(capacity) @capacity = capacity @data = Array.new(capacity) @mutex = Mutex.new @waiting = [] clear end |
Instance Attribute Details
#capacity ⇒ Integer (readonly)
Returns the maximum number of elements that can be enqueued
25 26 27 |
# File 'lib/circular_queue.rb', line 25 def capacity @capacity end |
#size ⇒ Integer (readonly) Also known as: length
Returns the number of elements in the queue
29 30 31 |
# File 'lib/circular_queue.rb', line 29 def size @size end |
Instance Method Details
#back ⇒ Object
Returns the last/most recent item in the queue Peek at last item without removing
128 129 130 131 132 |
# File 'lib/circular_queue.rb', line 128 def back @mutex.synchronize do @data[(@back - 1) % @capacity] end end |
#clear ⇒ CircularQueue
Removes all items from the queue
95 96 97 98 99 100 101 102 |
# File 'lib/circular_queue.rb', line 95 def clear @mutex.synchronize do @size = 0 @front = 0 @back = 0 self end end |
#data ⇒ Array
Returns the data in the queue Allows for easy iteration of queue from front to back
143 144 145 146 147 148 149 150 |
# File 'lib/circular_queue.rb', line 143 def data @mutex.synchronize do @data.clone.tap do |data| data.rotate!(@front) data.slice!(@size..-1) end end end |
#deq(non_block = false) ⇒ Object Also known as: shift, pop
Removes an item from the queue
76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/circular_queue.rb', line 76 def deq(non_block = false) @mutex.synchronize do loop do if empty? raise ThreadError.new("Queue is empty") if non_block @waiting.push(Thread.current) unless @waiting.include?(Thread.current) @mutex.sleep else return deq_item end end end end |
#empty? ⇒ Boolean
Returns whether the queue is empty
106 107 108 |
# File 'lib/circular_queue.rb', line 106 def empty? @size == 0 end |
#enq(item) ⇒ CircularQueue Also known as: <<, push
Adds an item to the queue
47 48 49 50 51 52 53 |
# File 'lib/circular_queue.rb', line 47 def enq(item) @mutex.synchronize do enq_item(item) wakeup_next_waiter self end end |
#enq!(item) ⇒ CircularQueue Also known as: push!
Adds an item to the queue, raising an error if the queue is full
61 62 63 64 65 66 67 68 69 |
# File 'lib/circular_queue.rb', line 61 def enq!(item) @mutex.synchronize do raise ThreadError.new("Queue is full") if full? enq_item(item) wakeup_next_waiter self end end |
#front ⇒ Object
Returns thee first/oldest item in the queue Peek at first item without removing
119 120 121 122 123 |
# File 'lib/circular_queue.rb', line 119 def front @mutex.synchronize do @data[@front] end end |
#full? ⇒ Boolean
Returns whether the queue is full
112 113 114 |
# File 'lib/circular_queue.rb', line 112 def full? @size == @capacity end |
#num_waiting ⇒ Integer
Returns the number of threads waiting for items to arrive in the queue
136 137 138 |
# File 'lib/circular_queue.rb', line 136 def num_waiting @waiting.length end |