Class: CloseableQueue
- Inherits: Object
- Defined in: lib/closeable_queue.rb, lib/closeable_queue/version.rb
Overview
A wrapper around Queue that adds support for `#close`.
Once the queue is closed, threads waiting on a dequeue drain the remaining items and then receive nil on every later dequeue. Pushing to a closed queue raises ClosedQueueError. If close(true) is used, popping from an empty closed queue raises ClosedQueue, a subclass of StopIteration, instead of returning nil.
Example usage:
queue = CloseableQueue.new
consumer = Thread.new { while number = queue.pop ; puts number ; end }
5.times {|x| queue.push(x) }
queue.close
consumer.join
`#close` is thread-safe and can safely be called multiple times.
This is anticipated to be obsolete by Ruby 2.3 with Queue#close.
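For comparison, here is a sketch of the same drain-then-nil pattern written against the standard library's Queue#close (Ruby 2.3 and later), with no dependency on this gem. The variable names are illustrative only.

```ruby
# Requires Ruby 2.3+ for Queue#close; uses only the standard library.
queue = Queue.new
received = []

# The consumer loops until pop returns nil, which happens once the
# queue is both closed and empty. Note that 0 is truthy in Ruby.
consumer = Thread.new do
  while (number = queue.pop)
    received << number
  end
end

5.times { |x| queue.push(x) }
queue.close
consumer.join

p received
p queue.closed?
```

With a single consumer the items arrive in the order they were pushed, and the consumer thread exits on its own once the queue is closed and drained.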
Defined Under Namespace
Classes: ClosedQueue, ClosedQueueError
Constant Summary
- VERSION = "0.1.1"
Instance Method Summary
- #close(raise_exception = false) ⇒ Object
  Close the queue if it hasn’t been already.
- #closed? ⇒ Boolean
  Return true if the queue has been closed.
- #empty? ⇒ Boolean
  Return true if the queue is empty.
- #initialize(limit = nil) ⇒ CloseableQueue (constructor)
  Set up a new queue.
- #inspect ⇒ Object
- #length ⇒ Object
  Get the number of items remaining on the queue.
- #num_waiting ⇒ Object
  Get an atomic snapshot of the number of threads waiting on the queue.
- #pop ⇒ Object
  Take the first element off the queue.
- #push(item) ⇒ Object
  Add an item to the queue and wake up any sleeping consumers.
Constructor Details
#initialize(limit = nil) ⇒ CloseableQueue
Set up a new queue. If limit is given, a SizedQueue of that capacity is used; by default the queue is an unbounded Queue.

    # File 'lib/closeable_queue.rb', line 32
    def initialize(limit = nil)
      @mutex = Mutex.new
      @waiting = Set.new
      @num_waiting = Concurrent::AtomicFixnum.new
      @closed = Concurrent::AtomicBoolean.new
      @raise_exception = Concurrent::AtomicBoolean.new(false)

      if limit
        @queue = SizedQueue.new(Integer(limit))
      else
        @queue = Queue.new
      end
    end
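The choice between a bounded and an unbounded backing queue can be tried out with the standard library alone. The sketch below uses a hypothetical helper, `build_queue`, that mirrors the constructor's branch; it is not part of the gem.

```ruby
# Hypothetical helper mirroring the constructor's branch; not part of the gem.
def build_queue(limit = nil)
  if limit
    # Integer() accepts integers and numeric strings, and raises
    # ArgumentError for input such as "abc".
    SizedQueue.new(Integer(limit))
  else
    Queue.new
  end
end

unbounded = build_queue
bounded   = build_queue("2")

bounded.push(:a)
bounded.push(:b)

# A full SizedQueue blocks on push; the non-blocking form raises instead.
full = begin
  bounded.push(:c, true)
  false
rescue ThreadError
  true
end

p unbounded.class
p bounded.max
p full
```

Passing the limit through Integer() means a malformed limit fails when the queue is built, not later when a producer first blocks.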
Instance Method Details
#close(raise_exception = false) ⇒ Object
Close the queue if it hasn’t been already. Wake up waiting threads if any.

    # File 'lib/closeable_queue.rb', line 102
    def close(raise_exception = false)
      @raise_exception.make_true if raise_exception
      @mutex.synchronize { @waiting.each(&:wakeup) } if @closed.make_true
      self
    end
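The guard `if @closed.make_true` is what makes repeated calls harmless. This assumes `make_true` reports whether the flag actually changed, as concurrent-ruby documents for AtomicBoolean, so only the first caller wakes the waiters. Below is a standard-library sketch of that idea, with a hypothetical `OnceFlag` class standing in for the atomic boolean.

```ruby
# Hypothetical stand-in for Concurrent::AtomicBoolean, built on Mutex.
class OnceFlag
  def initialize
    @mutex = Mutex.new
    @value = false
  end

  # Set the flag; return true only for the call that changed it.
  def make_true
    @mutex.synchronize do
      changed = !@value
      @value = true
      changed
    end
  end

  def true?
    @mutex.synchronize { @value }
  end
end

flag = OnceFlag.new
wakeups = 0
wakeups_lock = Mutex.new

# Ten threads race to "close"; the guarded side effect runs exactly once.
threads = 10.times.map do
  Thread.new do
    wakeups_lock.synchronize { wakeups += 1 } if flag.make_true
  end
end
threads.each(&:join)

p wakeups
```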
#closed? ⇒ Boolean
Return true if the queue has been closed.

    # File 'lib/closeable_queue.rb', line 97
    def closed?
      @closed.true?
    end
#empty? ⇒ Boolean
Return true if the queue is empty.

    # File 'lib/closeable_queue.rb', line 92
    def empty?
      @queue.empty?
    end
#inspect ⇒ Object

    # File 'lib/closeable_queue.rb', line 46
    def inspect
      "#<#{self.class.name} size=#{length} closed=#{closed?} waiting=#{num_waiting}>"
    end
#length ⇒ Object
Get the number of items remaining on the queue.

    # File 'lib/closeable_queue.rb', line 87
    def length
      @queue.length
    end
#num_waiting ⇒ Object
Get an atomic snapshot of the number of threads waiting on the queue.

    # File 'lib/closeable_queue.rb', line 82
    def num_waiting
      @num_waiting.value
    end
#pop ⇒ Object
Take the first element off the queue.
If the queue is empty and closed?, return nil, or optionally raise ClosedQueue (a subclass of StopIteration).

    # File 'lib/closeable_queue.rb', line 54
    def pop
      @queue.pop(true)
    rescue ThreadError
      if closed?
        raise ClosedQueue if @raise_exception.true?
        return nil
      else
        sleep
        retry
      end
    end
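The method relies on the non-blocking form of Queue#pop, which raises ThreadError when the queue is empty instead of waiting. A minimal standard-library sketch of that building block follows; the `try_pop` helper is hypothetical and not part of the gem.

```ruby
# Hypothetical helper: returns [item, true] when an item was available,
# or [nil, false] when the queue was empty, without ever blocking.
def try_pop(queue)
  [queue.pop(true), true]
rescue ThreadError
  # Queue#pop(true) raises ThreadError when the queue is empty.
  [nil, false]
end

queue = Queue.new
queue.push(:job)

first  = try_pop(queue)
second = try_pop(queue)

p first
p second
```

Returning a pair keeps a legitimately queued nil distinguishable from "nothing available", a distinction that a bare nil return cannot make.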
#push(item) ⇒ Object
Add an item to the queue and wake up any sleeping consumers.
If the queue is closed, raises ClosedQueueError.

    # File 'lib/closeable_queue.rb', line 71
    def push(item)
      fail ClosedQueueError if closed?
      @queue.push(item)
      wakeup
      self
    end
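A producer therefore needs to be prepared for the push to fail once another thread has closed the queue. The sketch below shows one way to handle that, using the standard library Queue (Ruby 2.3 and later), which raises an exception of the same name. Whether the gem's own ClosedQueueError matches the standard library class in every respect is an assumption not confirmed here.

```ruby
# Requires Ruby 2.3+; uses the standard library Queue, not this gem.
queue = Queue.new
queue.push(1)
queue.close

# Pushing after close raises instead of silently dropping the item.
error = begin
  queue.push(2)
  nil
rescue ClosedQueueError => e
  e
end

# Items pushed before the close are still available to consumers.
remaining = queue.pop

p error.class
p remaining
```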