Class: CircularQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/circular_queue.rb

Overview

A thread-safe queue with a size limitation. When more elements than the capacity are added, the queue either loops back on itself (removing the oldest elements first) or raises an error (if ‘enq!` is used).

Useful for streaming data where keeping up with real-time is more important than consuming every message if load rises and the queue backs up.

Exposes the same interface as the ‘Queue` from the Ruby stdlib.

Example:

# Capacity of 3
q = CircularQueue.new(3)

q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]

# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capacity) ⇒ CircularQueue

Creates a new queue of the specified capacity



34
35
36
37
38
39
40
41
42
# File 'lib/circular_queue.rb', line 34

def initialize(capacity)
  @capacity = capacity
  @data     = Array.new(capacity)

  @mutex    = Mutex.new
  @waiting  = []

  clear
end

Instance Attribute Details

#capacityInteger (readonly)

Returns the maximum number of elements that can be enqueued



25
26
27
# File 'lib/circular_queue.rb', line 25

def capacity
  @capacity
end

#sizeInteger (readonly) Also known as: length

Returns the number of elements in the queue



29
30
31
# File 'lib/circular_queue.rb', line 29

def size
  @size
end

Instance Method Details

#backObject

Returns the last/most recent item in the queue Peek at last item without removing



128
129
130
131
132
# File 'lib/circular_queue.rb', line 128

def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end

#clearCircularQueue

Removes all items from the queue



95
96
97
98
99
100
101
102
# File 'lib/circular_queue.rb', line 95

def clear
  @mutex.synchronize do
    @size  = 0
    @front = 0
    @back  = 0
    self
  end
end

#dataArray

Returns the data in the queue Allows for easy iteration of queue from front to back



143
144
145
146
147
148
149
150
# File 'lib/circular_queue.rb', line 143

def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end

#deq(non_block = false) ⇒ Object Also known as: shift, pop

Removes an item from the queue

Raises:

  • (ThreadError)

    non_block was true and the queue was empty



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/circular_queue.rb', line 76

def deq(non_block = false)
  @mutex.synchronize do
    loop do
      if empty?
        raise ThreadError.new("Queue is empty") if non_block

        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end

#empty?Boolean

Returns whether the queue is empty



106
107
108
# File 'lib/circular_queue.rb', line 106

def empty?
  @size == 0
end

#enq(item) ⇒ CircularQueue Also known as: <<, push

Adds an item to the queue



47
48
49
50
51
52
53
# File 'lib/circular_queue.rb', line 47

def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
    self
  end
end

#enq!(item) ⇒ CircularQueue Also known as: push!

Adds an item to the queue, raising an error if the queue is full

Raises:

  • (ThreadError)

    queue is full



61
62
63
64
65
66
67
68
69
# File 'lib/circular_queue.rb', line 61

def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?

    enq_item(item)
    wakeup_next_waiter
    self
  end
end

#frontObject

Returns thee first/oldest item in the queue Peek at first item without removing



119
120
121
122
123
# File 'lib/circular_queue.rb', line 119

def front
  @mutex.synchronize do
    @data[@front]
  end
end

#full?Boolean

Returns whether the queue is full



112
113
114
# File 'lib/circular_queue.rb', line 112

def full?
  @size == @capacity
end

#num_waitingInteger

Returns the number of threads waiting for items to arrive in the queue



136
137
138
# File 'lib/circular_queue.rb', line 136

def num_waiting
  @waiting.length
end