Class: QuackConcurrency::Queue

Inherits:
Object
Defined in:
lib/quack_concurrency/queue.rb

Overview

This is a duck type for ::Thread::Queue. It is intended to be a drop-in replacement for its core counterpart, which is valuable if ::Thread::Queue has not been implemented.
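As a rough illustration of the drop-in claim, the sketch below runs a small producer/consumer exchange. It is an assumption of this example that the core ::Thread::Queue is used as a stand-in, so that it runs without the gem installed; replacing the constructor call with QuackConcurrency::Queue.new should behave the same way if the duck type holds.

```ruby
# Assumption: ::Thread::Queue stands in for QuackConcurrency::Queue here.
queue = ::Thread::Queue.new

consumer = Thread.new do
  results = []
  # pop blocks until an item arrives and returns nil once the
  # queue is closed and drained, which ends the loop.
  while (item = queue.pop)
    results << item * 2
  end
  results
end

[1, 2, 3].each { |n| queue.push(n) }
queue.close

doubled = consumer.value
```

With a single consumer, items come out in the order they were pushed, so `doubled` holds the doubled inputs in insertion order.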

Constructor Details

#initialize ⇒ Queue

Creates a new QuackConcurrency::Queue concurrency tool.



# File 'lib/quack_concurrency/queue.rb', line 10

def initialize
  @closed = false
  @items = []
  @mutex = ::Mutex.new
  @pop_mutex = Mutex.new
  @waiter = Waiter.new
end

Instance Method Details

#clear ⇒ self

Removes all objects from it.

Returns:

  • (self)


# File 'lib/quack_concurrency/queue.rb', line 20

def clear
  @mutex.synchronize { @items.clear }
  self
end

#close ⇒ self

Closes it. Once closed, it cannot be re-opened. After the call to close completes, the following are true:

  • #closed? will return true.

  • #close will be ignored.

  • #push will raise a ClosedQueueError.

  • #pop will return objects from it as usual until it is empty, after which #pop returns nil instead of blocking.

Returns:

  • (self)


# File 'lib/quack_concurrency/queue.rb', line 33

def close
  @mutex.synchronize do
    return self if closed? # repeated calls are ignored but still return self
    @closed = true
    @waiter.resume_all
  end
  self
end
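The guarantees listed for #close can be checked with a short script. This is a sketch only: it assumes the core ::Thread::Queue as a stand-in for QuackConcurrency::Queue (which is documented to mirror it), and the variable names are illustrative.

```ruby
# Assumption: ::Thread::Queue stands in for QuackConcurrency::Queue here.
queue = ::Thread::Queue.new
queue.push(:a)

queue.close
queue.close                 # a second close is ignored

closed = queue.closed?

# Pushing to a closed queue raises ClosedQueueError.
push_error = begin
  queue.push(:b)
  nil
rescue ClosedQueueError => e
  e.class
end

first  = queue.pop          # the remaining item can still be popped
second = queue.pop          # closed and empty: returns nil without blocking
```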

#closed? ⇒ Boolean

Checks if it is closed.

Returns:

  • (Boolean)


# File 'lib/quack_concurrency/queue.rb', line 44

def closed?
  @closed
end

#empty? ⇒ Boolean

Checks if it is empty.

Returns:

  • (Boolean)


# File 'lib/quack_concurrency/queue.rb', line 50

def empty?
  @items.empty?
end

#length ⇒ Integer Also known as: size

Returns the length of it.

Returns:

  • (Integer)


# File 'lib/quack_concurrency/queue.rb', line 56

def length
  @items.length
end

#num_waiting ⇒ Integer

Returns the number of threads waiting on it.

Returns:

  • (Integer)


# File 'lib/quack_concurrency/queue.rb', line 63

def num_waiting
  @pop_mutex.waiting_threads_count + @waiter.waiting_threads_count
end
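To make the meaning of the count concrete, the sketch below parks two threads in a blocking pop and reads the counter before and after they are released. It assumes the core ::Thread::Queue as a stand-in for QuackConcurrency::Queue; the polling loop exists only so that the demo reads the counter after both threads have started waiting.

```ruby
# Assumption: ::Thread::Queue stands in for QuackConcurrency::Queue here.
queue = ::Thread::Queue.new

# Both threads block in pop because the queue is empty.
waiters = 2.times.map { Thread.new { queue.pop } }

# Poll until both threads are parked (demo-only synchronization).
sleep 0.01 until queue.num_waiting == 2
waiting_before = queue.num_waiting

# Each push wakes one waiting thread.
2.times { |i| queue.push(i) }
waiters.each(&:join)
waiting_after = queue.num_waiting
```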

#pop(non_block = false) ⇒ Object Also known as: deq, shift

Note:

If it is empty and non_block is false, the method will block until an item is available or it is closed.

Retrieves an item from it. If it is empty and non_block is true, a ThreadError will be raised. If it is empty and closed, nil is returned.

Parameters:

  • non_block (Boolean) (defaults to: false)

Returns:

  • (Object)

Raises:

  • (ThreadError)

    if it is empty and non_block is true



# File 'lib/quack_concurrency/queue.rb', line 73

def pop(non_block = false)
  @pop_mutex.lock do
    @mutex.synchronize do
      if empty?
        return if closed?
        raise ThreadError if non_block
        @mutex.unlock
        @waiter.wait
        @mutex.lock
        return if closed?
      end
      @items.shift
    end
  end
end
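The non-blocking mode can be sketched as follows. The example assumes the core ::Thread::Queue as a stand-in for QuackConcurrency::Queue and only exercises the open-queue case, where an empty queue with non_block set to true raises ThreadError.

```ruby
# Assumption: ::Thread::Queue stands in for QuackConcurrency::Queue here.
queue = ::Thread::Queue.new
queue.push(:job)

# An item is available, so the non-blocking pop returns it immediately.
first = queue.pop(true)

# The queue is now empty, so the non-blocking pop raises instead of waiting.
error = begin
  queue.pop(true)
rescue ThreadError => e
  e
end
```

Rescuing ThreadError like this is a common way to poll a queue without stalling the calling thread.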

#push(item = nil) ⇒ self Also known as: <<, enq

Pushes the given object to it.

Parameters:

  • item (Object) (defaults to: nil)

Returns:

  • (self)


# File 'lib/quack_concurrency/queue.rb', line 94

def push(item = nil)
  @mutex.synchronize do
    raise ClosedQueueError if closed?
    @items.push(item)
    @waiter.resume_next
  end
  self
end
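Because #push returns self, calls can be chained, and the aliases are interchangeable. The following sketch assumes the core ::Thread::Queue as a stand-in for QuackConcurrency::Queue; the variable names are illustrative.

```ruby
# Assumption: ::Thread::Queue stands in for QuackConcurrency::Queue here.
queue = ::Thread::Queue.new

queue.push(1).push(2)   # push returns the queue, so calls chain
queue << 3 << 4         # << is an alias of push
queue.enq(5)            # enq is an alias of push

size = queue.size       # size is an alias of length

# Drain the queue; items come out in insertion order.
items = Array.new(size) { queue.pop }
```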