Class: CircularQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/circular_queue.rb

Overview

A thread-safe queue with a size limitation. When more elements than the capacity are added, the queue either loops back on itself (removing the oldest elements first) or raises an error (if `enq!` is used).

Useful for streaming data where keeping up with real-time is more important than consuming every message if load rises and the queue backs up.

Exposes the same interface as the `Queue` from the Ruby stdlib.

Example:

# Capacity of 3
q = CircularQueue.new(3)

q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]

# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capacity) ⇒ CircularQueue

Creates a new queue of the specified capacity



36
37
38
39
40
41
42
43
44
# File 'lib/circular_queue.rb', line 36

def initialize(capacity)
  @capacity = capacity
  @data     = Array.new(capacity)

  @mutex    = Mutex.new
  @waiting  = Array.new

  clear
end

Instance Attribute Details

#capacityInteger (readonly)

Returns the maximum number of elements that can be enqueued



27
28
29
# File 'lib/circular_queue.rb', line 27

def capacity
  @capacity
end

#sizeInteger (readonly) Also known as: length

Returns the number of elements in the queue



31
32
33
# File 'lib/circular_queue.rb', line 31

def size
  @size
end

Instance Method Details

#backObject

Returns the last/most recent item in the queue Peek at last item without removing



124
125
126
127
128
# File 'lib/circular_queue.rb', line 124

def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end

#clearObject

Removes all items from the queue



92
93
94
95
96
97
98
# File 'lib/circular_queue.rb', line 92

def clear
  @mutex.synchronize do
    @size  = 0
    @front = 0
    @back  = 0
  end
end

#dataArray

Returns the data in the queue Allows for easy iteration of queue from front to back



139
140
141
142
143
144
145
146
# File 'lib/circular_queue.rb', line 139

def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end

#deq(non_block = false) ⇒ Object Also known as: shift, pop

Removes an item from the queue

Raises:

  • (ThreadError)

    non_block was true and the queue was empty



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/circular_queue.rb', line 74

def deq(non_block = false)
  @mutex.synchronize do
    while true
      if empty?
        raise ThreadError.new("Queue is empty") if non_block

        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end

#empty?Boolean

Returns whether the queue is empty



102
103
104
# File 'lib/circular_queue.rb', line 102

def empty?
  @size == 0
end

#enq(item) ⇒ Object Also known as: <<, push

Adds an item to the queue



48
49
50
51
52
53
# File 'lib/circular_queue.rb', line 48

def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
  end
end

#enq!(item) ⇒ Object Also known as: push!

Adds an item to the queue, raising an error if the queue is full

Raises:

  • (ThreadError)

    queue is full



60
61
62
63
64
65
66
67
# File 'lib/circular_queue.rb', line 60

def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?

    enq_item(item)
    wakeup_next_waiter
  end
end

#frontObject

Returns thee first/oldest item in the queue Peek at first item without removing



115
116
117
118
119
# File 'lib/circular_queue.rb', line 115

def front
  @mutex.synchronize do
    @data[@front]
  end
end

#full?Boolean

Returns whether the queue is full



108
109
110
# File 'lib/circular_queue.rb', line 108

def full?
  @size == @capacity
end

#num_waitingInteger

Returns the number of threads waiting for items to arrive in the queue



132
133
134
# File 'lib/circular_queue.rb', line 132

def num_waiting
  @waiting.length
end