Class: CloseableQueue

Inherits:
Object
Defined in:
lib/closeable_queue.rb,
lib/closeable_queue/version.rb

Overview

A wrapper around Queue to provide support for `#close`.

Once the queue is closed, threads waiting on a dequeue drain the remaining items and then receive nil on every further dequeue. If close(true) is used, popping from an empty closed queue raises ClosedQueue, a subclass of StopIteration, instead of returning nil. Pushing to a closed queue raises ClosedQueueError.

Example usage:

 queue = CloseableQueue.new
 consumer = Thread.new { while number = queue.pop ; puts number ; end }
 5.times {|x| queue.push(x) }
 queue.close
 consumer.join

`#close` is thread-safe and can safely be called multiple times.

This is anticipated to be obsolete by Ruby 2.3 with Queue#close.
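
On Ruby 2.3 or later, roughly the same producer/consumer pattern can be written with the built-in Queue#close. The following is a minimal sketch using only the standard library, not this gem; the `results` array is a name introduced here for illustration.

```ruby
# Built-in Queue#close (Ruby 2.3+): pop returns nil once the queue is
# closed and drained, which ends the consumer loop.
queue   = Queue.new
results = []   # illustrative: collects what the consumer saw

consumer = Thread.new do
  while (number = queue.pop)
    results << number
  end
end

5.times { |x| queue.push(x) }
queue.close      # consumer drains the remaining items, then gets nil
consumer.join

results          # => [0, 1, 2, 3, 4]
```

Note that 0 is truthy in Ruby, so the `while` loop only stops on the nil that signals a closed, empty queue.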

Defined Under Namespace

Classes: ClosedQueue, ClosedQueueError

Constant Summary

VERSION =
"0.1.1"

Instance Method Summary

Constructor Details

#initialize(limit = nil) ⇒ CloseableQueue

Set up a new queue. If limit is given, a SizedQueue of that capacity is used; otherwise an unbounded Queue is used.



# File 'lib/closeable_queue.rb', line 32

def initialize(limit = nil)
  @mutex           = Mutex.new
  @waiting         = Set.new
  @num_waiting     = Concurrent::AtomicFixnum.new
  @closed          = Concurrent::AtomicBoolean.new
  @raise_exception = Concurrent::AtomicBoolean.new(false)

  if limit
    @queue = SizedQueue.new(Integer(limit))
  else
    @queue = Queue.new
  end
end
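
The queue selection in the constructor can be tried in isolation. This is a small sketch, assuming only the standard library; `build_queue` is a hypothetical helper that mirrors the `if limit` branch above and is not part of the gem.

```ruby
# Hypothetical helper mirroring the constructor's choice of backing queue.
def build_queue(limit = nil)
  # Integer() accepts numeric strings and raises ArgumentError on junk input.
  limit ? SizedQueue.new(Integer(limit)) : Queue.new
end

bounded   = build_queue("3")   # string limit is coerced to the integer 3
unbounded = build_queue        # no limit: plain unbounded Queue

bounded.max        # => 3
unbounded.class    # => Queue (Thread::Queue on newer Rubies)
```

With a SizedQueue, a push blocks the producer once the queue holds `max` items, so a limit gives back-pressure at the cost of producers possibly waiting.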

Instance Method Details

#close(raise_exception = false) ⇒ Object

Close the queue if it hasn’t been already. Wake up waiting threads if any.



# File 'lib/closeable_queue.rb', line 102

def close(raise_exception = false)
  @raise_exception.make_true if raise_exception
  @mutex.synchronize { @waiting.each(&:wakeup) } if @closed.make_true
  self
end
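
The `if @closed.make_true` guard is what makes repeated calls harmless: only the call that actually flips the flag wakes the waiting threads. Below is a standard-library sketch of that idea. `OnceFlag` is a hypothetical stand-in written for this example; it assumes, as the source above relies on, that `make_true` reports whether the value changed.

```ruby
# Hypothetical stand-in for an atomic boolean, using only a Mutex.
class OnceFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  # Returns true only for the call that changes the flag from false to true.
  def make_true
    @mutex.synchronize do
      return false if @value
      @value = true
    end
  end

  def true?
    @mutex.synchronize { @value }
  end
end

flag    = OnceFlag.new
wakeups = 0
# Simulate calling close three times: the wake-up step runs only once.
3.times { wakeups += 1 if flag.make_true }

wakeups      # => 1
flag.true?   # => true
```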

#closed? ⇒ Boolean

Return true if the queue has been closed

Returns:

  • (Boolean)


# File 'lib/closeable_queue.rb', line 97

def closed?
  @closed.true?
end

#empty? ⇒ Boolean

Return true if the queue is empty

Returns:

  • (Boolean)


# File 'lib/closeable_queue.rb', line 92

def empty?
  @queue.empty?
end

#inspect ⇒ Object



# File 'lib/closeable_queue.rb', line 46

def inspect
  "#<#{self.class.name} size=#{length} closed=#{closed?} waiting=#{num_waiting}>"
end

#length ⇒ Object

Get the number of items remaining on the queue



# File 'lib/closeable_queue.rb', line 87

def length
  @queue.length
end

#num_waiting ⇒ Object

Get an atomic snapshot of the number of threads waiting on the queue.



82
83
84
# File 'lib/closeable_queue.rb', line 82

def num_waiting
  @num_waiting.value
end

#pop ⇒ Object

Take the first element off the queue.

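If the queue is empty and closed, return nil, or raise ClosedQueue (a subclass of StopIteration) if the queue was closed with close(true). If the queue is empty but still open, block until an item arrives or the queue is closed.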



# File 'lib/closeable_queue.rb', line 54

def pop
  @queue.pop(true)
rescue ThreadError
  if closed?
    raise ClosedQueue if @raise_exception.true?
    return nil
  else
    sleep
    retry
  end
end
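
Raising a StopIteration subclass is convenient because Kernel#loop rescues StopIteration and ends quietly. The sketch below shows the effect with a plain Queue. `DrainedQueue` and `pop_or_raise` are hypothetical names standing in for ClosedQueue and the raising branch of pop; the blocking branch is left out.

```ruby
# Hypothetical stand-in for ClosedQueue: any StopIteration subclass
# ends a Kernel#loop without an explicit rescue.
class DrainedQueue < StopIteration; end

# Non-blocking pop: Queue#pop(true) raises ThreadError when the queue is
# empty, which is translated here into the StopIteration subclass.
def pop_or_raise(queue)
  queue.pop(true)
rescue ThreadError
  raise DrainedQueue
end

queue = Queue.new
3.times { |x| queue.push(x) }

seen = []
loop { seen << pop_or_raise(queue) }   # stops when DrainedQueue is raised

seen   # => [0, 1, 2]
```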

#push(item) ⇒ Object

Add an item to the queue and wake up any sleeping consumers.

If the queue is closed, raises ClosedQueueError.



# File 'lib/closeable_queue.rb', line 71

def push(item)
  fail ClosedQueueError if closed?

  @queue.push(item)
  wakeup
  self
end
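
Ruby's built-in Queue (2.3+) rejects pushes after close in a similar way, which gives a quick way to see the behavior without the gem. In this sketch, `ClosedQueueError` is Ruby's own top-level class, not the one defined under CloseableQueue, and `rejected` is a name introduced for illustration.

```ruby
# Built-in Queue: pushing after close raises Ruby's ClosedQueueError,
# while items queued before the close can still be popped.
queue = Queue.new
queue.push(:job)
queue.close

rejected =
  begin
    queue.push(:late)
    false
  rescue ClosedQueueError
    true
  end

rejected     # => true
queue.pop    # => :job (queued before the close)
queue.pop    # => nil  (closed and empty)
```

A producer that may race with a close should therefore be ready to rescue the error rather than check closed? first, since the queue can be closed between the check and the push.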