Class: ConcurrentQueue
- Inherits: Object
- Inheritance hierarchy: Object → ConcurrentQueue
- Defined in: lib/concurrent_queue.rb, lib/concurrent_queue/listener.rb
Defined Under Namespace
Classes: Listener
Class Method Summary collapse
- .pop(arg) ⇒ Object
Instance Method Summary collapse
- #add_listener(listener) ⇒ Object
- #initialize ⇒ ConcurrentQueue (constructor): A new instance of ConcurrentQueue.
- #length ⇒ Object
- #pop ⇒ Object
- #push(item) ⇒ Object
- #remove_listener(listener) ⇒ Object
Constructor Details
#initialize ⇒ ConcurrentQueue
Returns a new instance of ConcurrentQueue.
17 18 19 20 21 |
# File 'lib/concurrent_queue.rb', line 17

# Builds an empty queue: no registered listeners, no stored items, and a
# fresh Mutex that the other instance methods use to guard both collections.
#
# @return [ConcurrentQueue] a new instance of ConcurrentQueue
def initialize
  @mutex = Mutex.new
  @listeners = []
  @queue = []
end
Class Method Details
.pop(arg) ⇒ Object
7 8 9 10 11 12 13 14 15 |
# File 'lib/concurrent_queue.rb', line 7

# Pops from one or several queues through a freshly created Listener.
#
# A single ConcurrentQueue is wrapped in a one-element Array; anything else
# must already be an Array whose elements are all ConcurrentQueues.
#
# NOTE(review): an empty Array passes validation (`all?` is true for an empty
# collection); what Listener#pop does with no queues is not visible here.
#
# @param arg [ConcurrentQueue, Array<ConcurrentQueue>] queue(s) to pop from
# @return [Object] whatever Listener#pop returns (Listener is defined in
#   lib/concurrent_queue/listener.rb — presumably the popped item; confirm)
# @raise [ArgumentError] if arg is neither a ConcurrentQueue nor an Array
#   consisting solely of ConcurrentQueues
def self.pop(arg)
  queues = arg.is_a?(ConcurrentQueue) ? [arg] : arg

  valid = queues.is_a?(Array) && queues.all? { |queue| queue.is_a?(ConcurrentQueue) }
  raise ArgumentError, 'must pass array of ConcurrentQueues' unless valid

  Listener.new.pop(queues)
end
Instance Method Details
#add_listener(listener) ⇒ Object
39 40 41 42 43 44 45 |
# File 'lib/concurrent_queue.rb', line 39

# Registers a listener and, when items are already queued, calls `notify`
# right away. Both steps run while holding the queue's mutex.
#
# `notify` is defined outside this listing — presumably it signals the
# registered listeners that an item is available; confirm against its source.
#
# @param listener [Object] the listener to append to the listener list
# @return [nil]
def add_listener(listener)
  @mutex.synchronize do
    @listeners << listener
    # `empty?` rather than `any?`: without a block `any?` tests element
    # truthiness, so a queue holding only nil/false items would look empty
    # and the notification would be skipped.
    notify unless @queue.empty?
  end
  nil
end
#length ⇒ Object
35 36 37 |
# File 'lib/concurrent_queue.rb', line 35

# Reports how many items are currently stored, reading the count while
# holding the queue's mutex.
#
# @return [Integer] number of queued items
def length
  @mutex.synchronize { @queue.size }
end
#pop ⇒ Object
23 24 25 |
# File 'lib/concurrent_queue.rb', line 23

# Pops from this queue by delegating to the class-level `pop`, passing the
# receiver itself as the argument.
#
# @return [Object] whatever the class-level `pop` returns for this queue
def pop
  self.class.pop(self)
end
#push(item) ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/concurrent_queue.rb', line 27

# Appends an item to the end of the queue and then calls `notify`, both
# while holding the queue's mutex.
#
# `notify` is defined outside this listing — presumably it signals the
# registered listeners; confirm against its source.
#
# @param item [Object] the value to enqueue
# @return [nil]
def push(item)
  @mutex.synchronize do
    @queue << item
    notify
  end
  nil
end
#remove_listener(listener) ⇒ Object
47 48 49 50 51 52 |
# File 'lib/concurrent_queue.rb', line 47

# Unregisters a listener: deletes it from the listener list while holding
# the queue's mutex. Removing a listener that is not registered is a no-op.
#
# @param listener [Object] the listener to remove
# @return [nil]
def remove_listener(listener)
  @mutex.synchronize { @listeners.delete(listener) }
  nil
end