Class: CircularQueue
- Inherits: Object
- Defined in: lib/circular_queue.rb
Overview
A thread-safe queue with a fixed capacity. When an element is added to a full queue, the queue either loops back on itself (overwriting the oldest element first) or raises an error (if `enq!` is used).
Useful for streaming data where, if load rises and the queue backs up, keeping up with real time matters more than consuming every message.
Exposes the same interface as the `Queue` from the Ruby stdlib.
Example:
# Capacity of 3
q = CircularQueue.new(3)
q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]
# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]
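The two overflow policies described in the overview can be contrasted with a small stand-in. This is a minimal sketch, not the gem's code: `OverflowPolicySketch` and its `to_a` helper are hypothetical names, it is not thread-safe, and it uses a plain Array rather than a ring buffer. Only the `enq`/`enq!` names and the `ThreadError` with message "Queue is full" come from this page.

```ruby
# Hypothetical stand-in for illustration only; not the gem's implementation.
class OverflowPolicySketch
  def initialize(capacity)
    @capacity = capacity
    @items = []
  end

  # Like enq: when full, drop the oldest item to make room for the new one.
  def enq(item)
    @items.shift if @items.size == @capacity
    @items.push(item)
    self
  end

  # Like enq!: refuse to overwrite and raise instead.
  def enq!(item)
    raise ThreadError, "Queue is full" if @items.size == @capacity
    @items.push(item)
    self
  end

  # Hypothetical helper that exposes the contents, oldest first.
  def to_a
    @items.dup
  end
end

q = OverflowPolicySketch.new(3)
q.enq(1).enq(2).enq(3).enq(4)
overwritten = q.to_a # the oldest item (1) was dropped

begin
  q.enq!(5)
  outcome = :accepted
rescue ThreadError => e
  outcome = e.message
end
```

A producer that must never lose data would use the raising variant and handle the error; a producer that only cares about recent data would use the overwriting one.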
Instance Attribute Summary
- #capacity ⇒ Integer (readonly)
  Returns the maximum number of elements that can be enqueued.
- #size ⇒ Integer (readonly; also: #length)
  Returns the number of elements in the queue.
Instance Method Summary
- #back ⇒ Object
  Returns the last (most recent) item in the queue without removing it.
- #clear ⇒ Object
  Removes all items from the queue.
- #data ⇒ Array
  Returns the data in the queue, ordered from front to back for easy iteration.
- #deq(non_block = false) ⇒ Object (also: #shift, #pop)
  Removes an item from the queue. Blocks while the queue is empty unless non_block is true, in which case a ThreadError is raised.
- #empty? ⇒ Boolean
  Returns whether the queue is empty.
- #enq(item) ⇒ Object (also: #<<, #push)
  Adds an item to the queue.
- #enq!(item) ⇒ Object (also: #push!)
  Adds an item to the queue, raising an error if the queue is full.
- #front ⇒ Object
  Returns the first (oldest) item in the queue without removing it.
- #full? ⇒ Boolean
  Returns whether the queue is full.
- #initialize(capacity) ⇒ CircularQueue (constructor)
  Creates a new queue of the specified capacity.
- #num_waiting ⇒ Integer
  Returns the number of threads waiting for items to arrive in the queue.
Constructor Details
#initialize(capacity) ⇒ CircularQueue
Creates a new queue of the specified capacity
# File 'lib/circular_queue.rb', line 36
def initialize(capacity)
  @capacity = capacity
  @data = Array.new(capacity)
  @mutex = Mutex.new
  @waiting = Array.new
  clear
end
Instance Attribute Details
#capacity ⇒ Integer (readonly)
Returns the maximum number of elements that can be enqueued
# File 'lib/circular_queue.rb', line 27
def capacity
  @capacity
end
#size ⇒ Integer (readonly) Also known as: length
Returns the number of elements in the queue
# File 'lib/circular_queue.rb', line 31
def size
  @size
end
Instance Method Details
#back ⇒ Object
Returns the last (most recent) item in the queue without removing it
# File 'lib/circular_queue.rb', line 124
def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end
#clear ⇒ Object
Removes all items from the queue
# File 'lib/circular_queue.rb', line 92
def clear
  @mutex.synchronize do
    @size = 0
    @front = 0
    @back = 0
  end
end
#data ⇒ Array
Returns the data in the queue, ordered from front to back for easy iteration
# File 'lib/circular_queue.rb', line 139
def data
  @mutex.synchronize do
    @data.clone.rotate @front
  end
end
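The listing for #data copies the backing array and rotates it by the front index. The following is a minimal sketch of that one expression on plain Arrays. The slot layout `[4, 5, 3]` with a front index of 2 is an assumption about what the backing array would look like after pushing 1 through 5 into a capacity-3 queue; it depends on the private `enq_item` helper, which this page does not show.

```ruby
# Assumed backing array and front index after pushing 1..5 with capacity 3.
slots = [4, 5, 3]
front = 2

# Same expression as in #data: copy, then rotate so the oldest item comes first.
ordered = slots.clone.rotate(front)

# Array.new(capacity) leaves unused slots as nil, and rotate keeps them,
# so a partially filled queue still yields `capacity` entries.
partial = [1, 2, nil].clone.rotate(0)

# compact drops the nil slots (and also any nil the caller stored on purpose).
present = partial.compact
```

Because the whole backing array is returned, callers iterating over the result of a queue that is not full should be prepared for nil entries.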
#deq(non_block = false) ⇒ Object Also known as: shift, pop
Removes an item from the queue
# File 'lib/circular_queue.rb', line 74
def deq(non_block = false)
  @mutex.synchronize do
    while true
      if empty?
        raise ThreadError.new("Queue is empty") if non_block
        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end
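The listing for #deq has two modes: with `non_block` set it raises `ThreadError` on an empty queue, and otherwise the calling thread sleeps until an item arrives. The sketch below shows both modes using the stdlib `Queue` as a stand-in, on the assumption (taken from the overview) that `CircularQueue` exposes the same interface; it does not exercise the gem itself.

```ruby
# Stdlib Queue used as a stand-in for CircularQueue (assumed same interface).
q = Queue.new

# Non-blocking mode: popping from an empty queue raises ThreadError.
begin
  q.pop(true)
  non_blocking = :returned
rescue ThreadError
  non_blocking = :raised
end

# Blocking mode: the consumer waits inside pop until a producer adds an item.
consumer = Thread.new { q.pop }
q << :hello
received = consumer.value # joins the thread and returns the block's result
```

Non-blocking mode suits polling loops that have other work to do; blocking mode suits a dedicated consumer thread.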
#empty? ⇒ Boolean
Returns whether the queue is empty
# File 'lib/circular_queue.rb', line 102
def empty?
  @size == 0
end
#enq(item) ⇒ Object Also known as: <<, push
Adds an item to the queue
# File 'lib/circular_queue.rb', line 48
def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
  end
end
#enq!(item) ⇒ Object Also known as: push!
Adds an item to the queue, raising an error if the queue is full
# File 'lib/circular_queue.rb', line 60
def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?
    enq_item(item)
    wakeup_next_waiter
  end
end
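The listings call the private helpers `enq_item` and `deq_item`, whose source is not shown on this page. The sketch below is one plausible way to write them, chosen to be consistent with the visible `front`, `back`, and `clear` methods; the helper bodies and the class name `RingIndexSketch` are assumptions, and locking and waiter wake-up are left out to keep the index arithmetic in view.

```ruby
# Hypothetical single-threaded sketch of the ring-buffer index arithmetic.
class RingIndexSketch
  attr_reader :size

  def initialize(capacity)
    @capacity = capacity
    @data = Array.new(capacity)
    @size = 0
    @front = 0 # index of the oldest item
    @back = 0  # index of the next slot to write
  end

  # Assumed enq_item: write at @back and advance it, wrapping around.
  def enq_item(item)
    @data[@back] = item
    @back = (@back + 1) % @capacity
    if @size == @capacity
      # The write just overwrote the oldest item, so the front moves on too.
      @front = (@front + 1) % @capacity
    else
      @size += 1
    end
  end

  # Assumed deq_item: read at @front, free the slot, and advance.
  def deq_item
    item = @data[@front]
    @data[@front] = nil
    @front = (@front + 1) % @capacity
    @size -= 1
    item
  end

  # Same expressions as the documented #front and #back, minus the mutex.
  def front
    @data[@front]
  end

  def back
    @data[(@back - 1) % @capacity]
  end
end

ring = RingIndexSketch.new(3)
(1..5).each { |n| ring.enq_item(n) }
oldest = ring.front
newest = ring.back
first_out = ring.deq_item
```

The modulo on every index update is what makes the buffer circular: both indices wrap to 0 after reaching the last slot, so no elements are ever shifted in memory.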
#front ⇒ Object
Returns the first (oldest) item in the queue without removing it
# File 'lib/circular_queue.rb', line 115
def front
  @mutex.synchronize do
    @data[@front]
  end
end
#full? ⇒ Boolean
Returns whether the queue is full
# File 'lib/circular_queue.rb', line 108
def full?
  @size == @capacity
end
#num_waiting ⇒ Integer
Returns the number of threads waiting for items to arrive in the queue
# File 'lib/circular_queue.rb', line 132
def num_waiting
  @waiting.length
end