Class: CircularQueue
- Inherits:
-
Object
- Object
- CircularQueue
- Defined in:
- lib/circular_queue.rb
Overview
A thread-safe queue with a fixed capacity. When an element is added to a full queue, the queue either wraps around and overwrites the oldest element (`enq`) or raises an error (`enq!`).
Useful for streaming data where keeping up with real time matters more than consuming every message, for example when load rises and the queue backs up.
Exposes the same interface as `Queue` from the Ruby stdlib.
Example:
# Capacity of 3
q = CircularQueue.new(3)
q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]
# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]
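For comparison, here is a short sketch of the stdlib `Queue` calls that this class mirrors. It uses only Ruby's built-in `Queue`, not `CircularQueue`. Note that the stdlib `Queue` is unbounded, so nothing is ever overwritten in this snippet. The local variable names are illustrative only.

```ruby
require 'thread' # harmless on modern Rubies, where Queue is built in

q = Queue.new
q << 1          # <<, push and enq are aliases
q.push(2)
q.enq(3)

count = q.size  # length is an alias of size

first  = q.deq    # removes the oldest item; shift and pop are aliases
second = q.shift

q.clear           # drops the remaining item

# With non_block = true, an empty queue raises ThreadError instead of blocking.
error = nil
begin
  q.pop(true)
rescue ThreadError => e
  error = e
end
```

Code written against these calls should work unchanged with a `CircularQueue`, with the difference that a full `CircularQueue` drops its oldest element on `enq`.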
Instance Attribute Summary
-
#capacity ⇒ Integer
readonly
Returns the maximum number of elements that can be enqueued.
-
#size ⇒ Integer
(also: #length)
readonly
Returns the number of elements in the queue.
Instance Method Summary
-
#back ⇒ Object
Returns the last (most recent) item in the queue. Peeks at the last item without removing it.
-
#clear ⇒ Object
Removes all items from the queue.
-
#data ⇒ Array
Returns the data in the queue as an array. Allows easy iteration over the queue from front to back.
-
#deq(non_block = false) ⇒ Object
(also: #shift, #pop)
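Removes and returns the oldest item in the queue. Blocks while the queue is empty, unless non_block is true, in which case a ThreadError is raised.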
-
#empty? ⇒ Boolean
Returns whether the queue is empty.
-
#enq(item) ⇒ Object
(also: #<<, #push)
Adds an item to the queue. If the queue is full, the oldest item is replaced.
-
#enq!(item) ⇒ Object
(also: #push!)
Adds an item to the queue, raising a ThreadError if the queue is full.
-
#front ⇒ Object
Returns the first (oldest) item in the queue. Peeks at the first item without removing it.
-
#full? ⇒ Boolean
Returns whether the queue is full.
-
#initialize(capacity) ⇒ CircularQueue
constructor
Creates a new queue of the specified capacity.
-
#num_waiting ⇒ Integer
Returns the number of threads waiting for items to arrive in the queue.
Constructor Details
#initialize(capacity) ⇒ CircularQueue
Creates a new queue of the specified capacity.
# File 'lib/circular_queue.rb', line 34
def initialize(capacity)
  @capacity = capacity
  @data = Array.new(capacity)
  @mutex = Mutex.new
  @waiting = []
  clear
end
Instance Attribute Details
#capacity ⇒ Integer (readonly)
Returns the maximum number of elements that can be enqueued.
# File 'lib/circular_queue.rb', line 25
def capacity
  @capacity
end
#size ⇒ Integer (readonly) Also known as: length
Returns the number of elements in the queue.
# File 'lib/circular_queue.rb', line 29
def size
  @size
end
Instance Method Details
#back ⇒ Object
Returns the last (most recent) item in the queue. Peeks at the last item without removing it.
# File 'lib/circular_queue.rb', line 122
def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end
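The index expression in `back` relies on Ruby's `%` returning a non-negative result when the divisor is positive, so the lookup still lands on the last slot after `@back` has wrapped around to 0. A small check, with hypothetical local variables standing in for the instance variables:

```ruby
capacity = 3
back = 0 # the next write slot has wrapped around to index 0

# Ruby's % takes the sign of the divisor, so -1 % 3 is 2, not -1.
last_index = (back - 1) % capacity

# Integer#remainder takes the sign of the receiver instead, which would
# not be a usable array position for this purpose.
truncated = (back - 1).remainder(capacity)
```

Here `last_index` is 2 (the final slot of the backing array), while `truncated` is -1.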
#clear ⇒ Object
Removes all items from the queue.
# File 'lib/circular_queue.rb', line 90
def clear
  @mutex.synchronize do
    @size = 0
    @front = 0
    @back = 0
  end
end
#data ⇒ Array
Returns the data in the queue as an array. Allows easy iteration over the queue from front to back.
# File 'lib/circular_queue.rb', line 137
def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end
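To see what the `rotate!` and `slice!` calls in `data` do, the same two steps can be run on plain arrays. The values below are hypothetical snapshots of the backing array, chosen to match a capacity-3 queue after pushing 1 through 5, and a capacity-4 queue holding two items.

```ruby
# Full queue: slots 0 and 1 were overwritten by 4 and 5, so the oldest
# item (3) sits at index 2.
backing = [4, 5, 3]
front = 2
size = 3
full_view = backing.clone.tap do |data|
  data.rotate!(front)    # bring the oldest item to index 0
  data.slice!(size..-1)  # drop unused trailing slots (none here)
end

# Partly filled queue: two live items starting at index 1.
sparse = [nil, 20, 30, nil]
sparse_view = sparse.clone.tap do |data|
  data.rotate!(1)
  data.slice!(2..-1)
end
```

`full_view` is `[3, 4, 5]` and `sparse_view` is `[20, 30]`. Because the work is done on a clone, the backing arrays themselves are left untouched.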
#deq(non_block = false) ⇒ Object Also known as: shift, pop
Removes and returns the oldest item in the queue. Blocks while the queue is empty, unless non_block is true, in which case a ThreadError is raised.
# File 'lib/circular_queue.rb', line 72
def deq(non_block = false)
  @mutex.synchronize do
    loop do
      if empty?
        raise ThreadError.new("Queue is empty") if non_block
        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end
#empty? ⇒ Boolean
Returns whether the queue is empty.
# File 'lib/circular_queue.rb', line 100
def empty?
  @size == 0
end
#enq(item) ⇒ Object Also known as: <<, push
Adds an item to the queue.
# File 'lib/circular_queue.rb', line 46
def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
  end
end
#enq!(item) ⇒ Object Also known as: push!
Adds an item to the queue, raising a ThreadError if the queue is full.
# File 'lib/circular_queue.rb', line 58
def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?
    enq_item(item)
    wakeup_next_waiter
  end
end
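The listings for `enq`, `enq!` and `deq` call three private helpers (`enq_item`, `deq_item`, `wakeup_next_waiter`) that this page does not show. The following is a minimal sketch of how they might work, not the library's actual source. The class name `SketchQueue` and all three helper bodies are assumptions; the public methods are copied from the listings on this page.

```ruby
# Hypothetical stand-in class; only the private helpers are new.
class SketchQueue
  def initialize(capacity)
    @capacity = capacity
    @data = Array.new(capacity)
    @mutex = Mutex.new
    @waiting = []
    @size = @front = @back = 0
  end

  def enq(item)
    @mutex.synchronize do
      enq_item(item)
      wakeup_next_waiter
    end
  end

  def enq!(item)
    @mutex.synchronize do
      raise ThreadError.new("Queue is full") if full?
      enq_item(item)
      wakeup_next_waiter
    end
  end

  def deq(non_block = false)
    @mutex.synchronize do
      loop do
        if empty?
          raise ThreadError.new("Queue is empty") if non_block
          @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
          @mutex.sleep # releases the lock while this thread sleeps
        else
          return deq_item
        end
      end
    end
  end

  def empty?
    @size == 0
  end

  def full?
    @size == @capacity
  end

  private

  # Assumed behavior: write at @back; if the queue was already full, the
  # write has replaced the oldest item, so @front advances instead of @size.
  def enq_item(item)
    was_full = full?
    @data[@back] = item
    @back = (@back + 1) % @capacity
    if was_full
      @front = (@front + 1) % @capacity
    else
      @size += 1
    end
  end

  # Assumed behavior: read the oldest slot, clear it, advance @front.
  def deq_item
    item = @data[@front]
    @data[@front] = nil
    @front = (@front + 1) % @capacity
    @size -= 1
    item
  end

  # Assumed behavior: wake the longest-waiting consumer, if it is still alive.
  def wakeup_next_waiter
    thread = @waiting.shift
    thread.wakeup if thread && thread.alive?
  end
end

q = SketchQueue.new(3)
(1..5).each { |i| q.enq(i) }       # 1 and 2 are overwritten
drained = [q.deq, q.deq, q.deq]    # oldest surviving items first
```

Under these assumptions `drained` is `[3, 4, 5]`, matching the example in the Overview. The `loop` in `deq` rechecks `empty?` after every wakeup, so a woken consumer that finds the queue empty again simply goes back to sleep.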
#front ⇒ Object
Returns the first (oldest) item in the queue. Peeks at the first item without removing it.
# File 'lib/circular_queue.rb', line 113
def front
  @mutex.synchronize do
    @data[@front]
  end
end
#full? ⇒ Boolean
Returns whether the queue is full.
# File 'lib/circular_queue.rb', line 106
def full?
  @size == @capacity
end
#num_waiting ⇒ Integer
Returns the number of threads waiting for items to arrive in the queue.
# File 'lib/circular_queue.rb', line 130
def num_waiting
  @waiting.length
end