Class: Async::Queue
- Inherits:
-
Object
- Object
- Async::Queue
- Defined in:
- lib/async/queue.rb
Overview
A thread-safe queue which allows items to be processed in order.
This implementation uses Thread::Queue internally for thread safety while maintaining compatibility with the fiber scheduler.
It has a compatible interface with Notification and Condition, except that it’s multi-value.
Direct Known Subclasses
Defined Under Namespace
Classes: ClosedError
Instance Method Summary collapse
-
#<<(item) ⇒ Object
Compatibility with Queue#push.
-
#async(parent: (@parent or Task.current), **options, &block) ⇒ Object
Process each item in the queue.
-
#close ⇒ Object
Close the queue, causing all waiting tasks to return ‘nil`.
- #closed? ⇒ Boolean
-
#dequeue(timeout: nil) ⇒ Object
Remove and return the next item from the queue.
-
#each ⇒ Object
Enumerate each item in the queue.
- #empty? ⇒ Boolean
-
#enqueue(*items) ⇒ Object
Add multiple items to the queue.
-
#initialize(parent: nil, delegate: Thread::Queue.new) ⇒ Queue
constructor
Create a new thread-safe queue.
-
#pop(timeout: nil) ⇒ Object
Compatibility with Queue#pop.
-
#push(item) ⇒ Object
Add an item to the queue.
-
#signal(value = nil) ⇒ Object
Signal the queue with a value, the same as #enqueue.
- #size ⇒ Object
-
#wait ⇒ Object
Wait for an item to be available, the same as #dequeue.
- #waiting_count ⇒ Object
Constructor Details
#initialize(parent: nil, delegate: Thread::Queue.new) ⇒ Queue
Create a new thread-safe queue.
31 32 33 34 |
# File 'lib/async/queue.rb', line 31 def initialize(parent: nil, delegate: Thread::Queue.new) @delegate = delegate @parent = parent end |
Instance Method Details
#<<(item) ⇒ Object
Compatibility with Queue#push.
69 70 71 |
# File 'lib/async/queue.rb', line 69 def <<(item) self.push(item) end |
#async(parent: (@parent or Task.current), **options, &block) ⇒ Object
Process each item in the queue.
102 103 104 105 106 |
# File 'lib/async/queue.rb', line 102 def async(parent: (@parent or Task.current), **, &block) while item = self.dequeue parent.async(item, **, &block) end end |
#close ⇒ Object
Close the queue, causing all waiting tasks to return ‘nil`. Any subsequent calls to #enqueue will raise an exception.
42 43 44 |
# File 'lib/async/queue.rb', line 42 def close @delegate.close end |
#closed? ⇒ Boolean
37 38 39 |
# File 'lib/async/queue.rb', line 37 def closed? @delegate.closed? end |
#dequeue(timeout: nil) ⇒ Object
Remove and return the next item from the queue.
83 84 85 |
# File 'lib/async/queue.rb', line 83 def dequeue(timeout: nil) @delegate.pop(timeout: timeout) end |
#each ⇒ Object
Enumerate each item in the queue.
109 110 111 112 113 |
# File 'lib/async/queue.rb', line 109 def each while item = self.dequeue yield item end end |
#empty? ⇒ Boolean
52 53 54 |
# File 'lib/async/queue.rb', line 52 def empty? @delegate.empty? end |
#enqueue(*items) ⇒ Object
Add multiple items to the queue.
74 75 76 77 78 |
# File 'lib/async/queue.rb', line 74 def enqueue(*items) items.each {|item| @delegate.push(item)} rescue ClosedQueueError raise ClosedError, "Cannot enqueue items to a closed queue!" end |
#pop(timeout: nil) ⇒ Object
Compatibility with Queue#pop.
90 91 92 |
# File 'lib/async/queue.rb', line 90 def pop(timeout: nil) @delegate.pop(timeout: timeout) end |
#push(item) ⇒ Object
Add an item to the queue.
62 63 64 65 66 |
# File 'lib/async/queue.rb', line 62 def push(item) @delegate.push(item) rescue ClosedQueueError raise ClosedError, "Cannot enqueue items to a closed queue!" end |
#signal(value = nil) ⇒ Object
Signal the queue with a value, the same as #enqueue.
116 117 118 |
# File 'lib/async/queue.rb', line 116 def signal(value = nil) self.enqueue(value) end |
#size ⇒ Object
47 48 49 |
# File 'lib/async/queue.rb', line 47 def size @delegate.size end |
#wait ⇒ Object
Wait for an item to be available, the same as #dequeue.
121 122 123 |
# File 'lib/async/queue.rb', line 121 def wait self.dequeue end |
#waiting_count ⇒ Object
57 58 59 |
# File 'lib/async/queue.rb', line 57 def waiting_count @delegate.num_waiting end |