Class: Async::Queue

Inherits:
Object
  • Object
Defined in:
lib/async/queue.rb

Overview

A thread-safe queue which allows items to be processed in order.

This implementation uses Thread::Queue internally for thread safety while maintaining compatibility with the fiber scheduler.

Its interface is compatible with Notification and Condition, except that it can hold multiple values.

Direct Known Subclasses

LimitedQueue

Defined Under Namespace

Classes: ClosedError

Constructor Details

#initialize(parent: nil, delegate: Thread::Queue.new) ⇒ Queue

Create a new thread-safe queue.



# File 'lib/async/queue.rb', line 31

def initialize(parent: nil, delegate: Thread::Queue.new)
	@delegate = delegate
	@parent = parent
end
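
The `delegate:` keyword means the backing queue can be swapped at construction time. The following is a minimal sketch of that idea using only the standard library. `SketchQueue` is a hypothetical stand-in that copies the constructor shown above, and the use of `Thread::SizedQueue` as an alternative delegate is an assumption made for illustration, not something this page documents.

```ruby
# Hypothetical stand-in that mirrors the constructor above; it is not the library class.
class SketchQueue
  def initialize(parent: nil, delegate: Thread::Queue.new)
    @delegate = delegate
    @parent = parent
  end

  def push(item)
    @delegate.push(item)
  end

  def dequeue
    @delegate.pop
  end

  def size
    @delegate.size
  end
end

# Default delegate: an unbounded Thread::Queue.
default_queue = SketchQueue.new
default_queue.push(:a)
default_queue.push(:b)
first = default_queue.dequeue

# Injected delegate: Thread::SizedQueue from the standard library (an assumption
# chosen for illustration; any object with the same push/pop interface would do).
bounded = SketchQueue.new(delegate: Thread::SizedQueue.new(2))
bounded.push(1)
bounded.push(2) # a third push would block until an item is removed
```

Items come back in the order they were pushed, and the injected delegate alone decides whether the queue is bounded.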

Instance Method Details

#<<(item) ⇒ Object

Compatibility with Queue#push.



# File 'lib/async/queue.rb', line 69

def <<(item)
	self.push(item)
end

#async(parent: (@parent or Task.current), **options, &block) ⇒ Object

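Process each item in the queue: items are dequeued until #dequeue returns nil, and the block runs for each item in a new task under parent.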



# File 'lib/async/queue.rb', line 102

def async(parent: (@parent or Task.current), **options, &block)
	while item = self.dequeue
		parent.async(item, **options, &block)
	end
end
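
The dispatch pattern in #async (dequeue an item, hand it to its own unit of concurrency, continue with the next) can be sketched without the library. In this sketch plain threads stand in for `parent.async` tasks, which is a deliberate substitution and not how the library schedules work. The `results` queue is a hypothetical name used only to collect output.

```ruby
# Sketch only: threads stand in for the tasks that parent.async would create.
delegate = Thread::Queue.new
[1, 2, 3].each { |n| delegate.push(n) }
delegate.close # lets the loop below end once the queue is drained

results = Thread::Queue.new
workers = []

# Same shape as the loop in #async: one concurrent unit per dequeued item.
while (item = delegate.pop)
  workers << Thread.new(item) { |n| results.push(n * 2) }
end
workers.each(&:join)

doubled = []
doubled << results.pop until results.empty?
doubled.sort!
```

Completion order is not guaranteed once items run concurrently, which is why the sketch sorts the collected results before using them.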

#close ⇒ Object

Close the queue, causing all waiting tasks to return `nil`. Any subsequent call to #enqueue or #push raises ClosedError.



# File 'lib/async/queue.rb', line 42

def close
	@delegate.close
end
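
Since #close simply closes the delegate, the wake-up behavior can be observed on a bare `Thread::Queue`. This is a minimal sketch with the standard library only. A thread stands in for a waiting task, and the variable names are illustrative.

```ruby
# Sketch only: exercises the default delegate directly.
delegate = Thread::Queue.new

# A consumer that blocks in pop because the queue is empty.
waiter = Thread.new { delegate.pop }

# Closing releases the blocked consumer, whose pop returns nil.
delegate.close
woken_with = waiter.value
closed = delegate.closed?
```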

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/queue.rb', line 37

def closed?
	@delegate.closed?
end

#dequeue(timeout: nil) ⇒ Object

Remove and return the next item from the queue.



# File 'lib/async/queue.rb', line 83

def dequeue(timeout: nil)
	@delegate.pop(timeout: timeout)
end
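
The `timeout:` keyword is passed straight through to the delegate's `pop`. The sketch below shows what that call does on a plain `Thread::Queue`. It assumes Ruby 3.2 or later, where `Thread::Queue#pop` accepts `timeout:`.

```ruby
# Sketch only: assumes Ruby 3.2+ for Thread::Queue#pop(timeout:).
delegate = Thread::Queue.new

# Nothing is queued, so pop gives up after the timeout and returns nil.
timed_out = delegate.pop(timeout: 0.01)

# With an item available, pop returns it immediately.
delegate.push(:job)
item = delegate.pop(timeout: 0.01)
```

A `nil` result is therefore ambiguous between a timeout and a closed, empty queue. Checking #closed? afterwards is one way to tell the two apart.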

#each ⇒ Object

Enumerate each item in the queue, yielding items as they are dequeued until #dequeue returns nil.



# File 'lib/async/queue.rb', line 109

def each
	while item = self.dequeue
		yield item
	end
end
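
Because the loop condition is the dequeued item itself, the loop ends on any falsy item, not only on the `nil` produced by a closed queue. The sketch below reproduces that loop against a plain `Thread::Queue` to show the consequence. It illustrates the code shown above and is not a documented guarantee of the library.

```ruby
# Sketch only: same loop shape as #each, run against the default delegate.
delegate = Thread::Queue.new
delegate.push(:first)
delegate.push(false)       # a falsy item ends the loop early
delegate.push(:never_seen)
delegate.close

seen = []
while (item = delegate.pop)
  seen << item
end

remaining = delegate.size  # :never_seen is still queued
```

Queues that may carry `nil` or `false` as legitimate values would need to wrap them, for example in a small struct, before enqueueing.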

#empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/queue.rb', line 52

def empty?
	@delegate.empty?
end

#enqueue(*items) ⇒ Object

Add multiple items to the queue.



# File 'lib/async/queue.rb', line 74

def enqueue(*items)
	items.each {|item| @delegate.push(item)}
rescue ClosedQueueError
	raise ClosedError, "Cannot enqueue items to a closed queue!"
end
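
The rescue clause above translates the delegate's `ClosedQueueError` into the queue's own ClosedError. A minimal standard-library sketch of that translation follows. `SketchClosedError` and `sketch_enqueue` are hypothetical stand-ins, and the superclass chosen for the stand-in error is an assumption.

```ruby
# Hypothetical stand-in for ClosedError; its superclass here is an assumption.
class SketchClosedError < RuntimeError; end

# Mirrors the body of #enqueue, taking the delegate as an explicit argument.
def sketch_enqueue(delegate, *items)
  items.each { |item| delegate.push(item) }
rescue ClosedQueueError
  raise SketchClosedError, "Cannot enqueue items to a closed queue!"
end

delegate = Thread::Queue.new
sketch_enqueue(delegate, 1, 2, 3)
delegate.close

# Pushing after close surfaces the translated error, not ClosedQueueError.
message = begin
  sketch_enqueue(delegate, 4)
rescue SketchClosedError => error
  error.message
end
```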

#pop(timeout: nil) ⇒ Object

Compatibility with Queue#pop.



# File 'lib/async/queue.rb', line 90

def pop(timeout: nil)
	@delegate.pop(timeout: timeout)
end

#push(item) ⇒ Object

Add an item to the queue.



# File 'lib/async/queue.rb', line 62

def push(item)
	@delegate.push(item)
rescue ClosedQueueError
	raise ClosedError, "Cannot enqueue items to a closed queue!"
end

#signal(value = nil) ⇒ Object

Signal the queue with a value, the same as #enqueue.



# File 'lib/async/queue.rb', line 116

def signal(value = nil)
	self.enqueue(value)
end

#size ⇒ Object

Return the number of items currently in the queue.



# File 'lib/async/queue.rb', line 47

def size
	@delegate.size
end

#wait ⇒ Object

Wait for an item to be available, the same as #dequeue.



# File 'lib/async/queue.rb', line 121

def wait
	self.dequeue
end

#waiting_count ⇒ Object

Return the number of waiters reported by the delegate's num_waiting.



# File 'lib/async/queue.rb', line 57

def waiting_count
	@delegate.num_waiting
end
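
The value comes from `Thread::Queue#num_waiting`, so its behavior can be sketched with the standard library alone. In this sketch a thread stands in for a waiting consumer, and the polling loop is only there to make the example deterministic.

```ruby
# Sketch only: observes num_waiting on the default delegate.
delegate = Thread::Queue.new
waiter = Thread.new { delegate.pop }

# Poll until the consumer is actually blocked in pop.
sleep 0.01 until delegate.num_waiting == 1
waiting_before = delegate.num_waiting

# Supplying an item releases the consumer, so the count drops back to zero.
delegate.push(:go)
received = waiter.value
waiting_after = delegate.num_waiting
```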