Class: CircularQueue

Inherits:
Object
Defined in:
lib/circular_queue.rb

Overview

A thread-safe queue with a fixed capacity. When an element is added to a full queue, the queue either wraps around and overwrites the oldest element, or raises an error (if enq! is used).

Useful for streaming data when, under rising load and a backed-up queue, keeping up with real time matters more than consuming every message.

Exposes the same interface as the Queue from the Ruby stdlib.

Example:

# Capacity of 3
q = CircularQueue.new(3)

q << 1 # queue contents: [1]
q << 2 # queue contents: [1, 2]
q << 3 # queue contents: [1, 2, 3]

# The oldest elements are replaced once the queue reaches capacity
q << 4 # queue contents: [2, 3, 4]
q << 5 # queue contents: [3, 4, 5]


Constructor Details

#initialize(capacity) ⇒ CircularQueue

Creates a new queue of the specified capacity

Parameters:

  • capacity (Integer)

    the maximum capacity of the queue



# File 'lib/circular_queue.rb', line 34

def initialize(capacity)
  @capacity = capacity
  @data     = Array.new(capacity)

  @mutex    = Mutex.new
  @waiting  = []

  clear
end

Instance Attribute Details

#capacity ⇒ Integer (readonly)

Returns the maximum number of elements that can be enqueued

Returns:

  • (Integer)


# File 'lib/circular_queue.rb', line 25

def capacity
  @capacity
end

#size ⇒ Integer (readonly) Also known as: length

Returns the number of elements in the queue

Returns:

  • (Integer)


# File 'lib/circular_queue.rb', line 29

def size
  @size
end

Instance Method Details

#back ⇒ Object

Returns the last (most recent) item in the queue. Peeks at the last item without removing it.

Returns:

  • (Object)


# File 'lib/circular_queue.rb', line 128

def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end
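
The index expression in #back relies on Ruby's modulo taking the sign of the divisor, so the subtraction wraps to the end of the backing array instead of going negative. A minimal sketch of that arithmetic, using local variables (capacity, back) as stand-ins for the instance variables above:

```ruby
capacity = 3

# @back is the next slot to write; after a wrap-around it is 0 again.
back = 0
last_index = (back - 1) % capacity   # -1 % 3 wraps to 2 in Ruby

# In the middle of the array no wrap is needed.
back = 2
middle_index = (back - 1) % capacity # 1
```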

#clear ⇒ CircularQueue

Removes all items from the queue

Returns:

  • (CircularQueue)

    the queue itself


# File 'lib/circular_queue.rb', line 95

def clear
  @mutex.synchronize do
    @size  = 0
    @front = 0
    @back  = 0
    self
  end
end

#data ⇒ Array

Returns a copy of the data in the queue, ordered from front to back, for easy iteration

Returns:

  • (Array)

    the queue



# File 'lib/circular_queue.rb', line 143

def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end
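
The rotate-then-slice step can be tried on a plain array. The sketch below assumes a backing array of capacity 4 holding three items, with the oldest at index 2; the buffer, front and size locals stand in for @data, @front and @size.

```ruby
buffer = [:d, nil, :b, :c]  # ring storage; index 1 is currently unused
front  = 2                  # index of the oldest item
size   = 3                  # number of live items

ordered = buffer.clone.tap do |copy|
  copy.rotate!(front)       # oldest item moves to index 0
  copy.slice!(size..-1)     # drop the unused trailing slots
end

# ordered is [:b, :c, :d]; buffer itself is left untouched
```

Because the work is done on a clone, callers can iterate over the result without holding the queue's lock.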

#deq(non_block = false) ⇒ Object Also known as: shift, pop

Removes an item from the queue

Parameters:

  • non_block (Boolean) (defaults to: false)

    true to raise an error if the queue is empty; otherwise, waits for an item to arrive from another thread

Raises:

  • (ThreadError)

    non_block was true and the queue was empty



# File 'lib/circular_queue.rb', line 76

def deq(non_block = false)
  @mutex.synchronize do
    loop do
      if empty?
        raise ThreadError.new("Queue is empty") if non_block

        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end
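
The two modes of #deq can be illustrated as follows. Since the overview says the class exposes the same interface as the stdlib Queue, this sketch uses Ruby's built-in Queue as a stand-in so that it runs without the library; it assumes CircularQueue behaves the same way for these calls, as documented above.

```ruby
q = Queue.new # stdlib stand-in; swap in CircularQueue.new(n) when the library is loaded

# Non-blocking mode: an empty queue raises ThreadError immediately.
error = begin
  q.deq(true)
rescue ThreadError => e
  e
end

# Blocking mode: the consumer sleeps until another thread enqueues an item.
consumer = Thread.new { q.deq }
sleep 0.01 until q.num_waiting == 1  # wait until the consumer is parked
q.enq(:tick)
received = consumer.value            # :tick
```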

#empty? ⇒ Boolean

Returns whether the queue is empty

Returns:

  • (Boolean)

    queue is empty



# File 'lib/circular_queue.rb', line 106

def empty?
  @size == 0
end

#enq(item) ⇒ CircularQueue Also known as: <<, push

Adds an item to the queue

Parameters:

  • item (Object)

    item to add

Returns:

  • (CircularQueue)

    the queue itself


# File 'lib/circular_queue.rb', line 47

def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
    self
  end
end

#enq!(item) ⇒ CircularQueue Also known as: push!

Adds an item to the queue, raising an error if the queue is full

Parameters:

  • item (Object)

    item to add

Returns:

  • (CircularQueue)

    the queue itself

Raises:

  • (ThreadError)

    queue is full



# File 'lib/circular_queue.rb', line 61

def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?

    enq_item(item)
    wakeup_next_waiter
    self
  end
end
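
The enq_item and wakeup_next_waiter helpers are not shown on this page. To contrast the overwrite behaviour of #enq with the raising behaviour of #enq!, here is a minimal single-threaded sketch. RingSketch is a hypothetical stand-in written for this example, not the library's implementation, and it omits the mutex and waiter handling.

```ruby
# Hypothetical stand-in for illustration only (no locking, no waiters).
class RingSketch
  def initialize(capacity)
    @capacity = capacity
    @data     = Array.new(capacity)
    @size = @front = @back = 0
  end

  def full?
    @size == @capacity
  end

  # Assumed enq behaviour: overwrite the oldest element when full.
  def enq(item)
    @data[@back] = item
    @back = (@back + 1) % @capacity
    if full?
      @front = (@front + 1) % @capacity # oldest element was overwritten
    else
      @size += 1
    end
    self
  end

  # Mirrors the documented enq! contract: raise instead of overwriting.
  def enq!(item)
    raise ThreadError, "Queue is full" if full?
    enq(item)
  end

  # Front-to-back copy of the live elements.
  def data
    @data.rotate(@front).first(@size)
  end
end

q = RingSketch.new(3)
q.enq(1).enq(2).enq(3)
q.enq(4)                    # full: 1 is overwritten
overwritten = q.data        # [2, 3, 4]

rejected = begin
  q.enq!(5)                 # full: raises, contents unchanged
  false
rescue ThreadError
  true
end
```

Use enq! when dropping data silently would be a bug, and plain enq when only the most recent items matter.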

#front ⇒ Object

Returns the first (oldest) item in the queue. Peeks at the first item without removing it.

Returns:

  • (Object)


# File 'lib/circular_queue.rb', line 119

def front
  @mutex.synchronize do
    @data[@front]
  end
end

#full? ⇒ Boolean

Returns whether the queue is full

Returns:

  • (Boolean)

    queue is full



# File 'lib/circular_queue.rb', line 112

def full?
  @size == @capacity
end

#num_waiting ⇒ Integer

Returns the number of threads waiting for items to arrive in the queue

Returns:

  • (Integer)

    number of threads waiting



# File 'lib/circular_queue.rb', line 136

def num_waiting
  @waiting.length
end