Class: ConcurrentQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent_queue.rb,
lib/concurrent_queue/listener.rb

Defined Under Namespace

Classes: Listener

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ConcurrentQueue

Returns a new instance of ConcurrentQueue.



17
18
19
20
21
# File 'lib/concurrent_queue.rb', line 17

# Builds an empty queue: no registered listeners, no stored items, and a
# fresh Mutex that the other instance methods use to guard both arrays.
#
# @return [ConcurrentQueue] a new instance of ConcurrentQueue
def initialize
  @listeners = []        # listeners registered via #add_listener
  @mutex = Mutex.new     # serializes access to @listeners and @queue
  @queue = []            # items appended by #push
end

Class Method Details

.pop(arg) ⇒ Object



7
8
9
10
11
12
13
14
15
# File 'lib/concurrent_queue.rb', line 7

# Pops from one queue or from a set of queues by delegating to a freshly
# created Listener.
#
# A lone ConcurrentQueue is wrapped in a one-element array; anything else
# must already be an Array whose elements are all ConcurrentQueues.
# Note that an empty array satisfies that check (`[].all?` is true) and is
# handed to the listener unchanged.
#
# @param arg [ConcurrentQueue, Array<ConcurrentQueue>] queue(s) to pop from
# @return [Object] whatever Listener#pop returns for the given queues
# @raise [ArgumentError] if arg is neither a ConcurrentQueue nor an Array
#   consisting solely of ConcurrentQueues
def self.pop(arg)
  queues = arg.is_a?(ConcurrentQueue) ? [arg] : arg
  all_queues = queues.is_a?(Array) && queues.all? { |candidate| candidate.is_a?(ConcurrentQueue) }
  raise ArgumentError, 'must pass array of ConcurrentQueues' unless all_queues

  Listener.new.pop(queues)
end

Instance Method Details

#add_listener(listener) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/concurrent_queue.rb', line 39

# Registers a listener and, when items are already waiting in the queue,
# calls `notify` immediately (while still holding the mutex).
#
# The pending-items check uses `empty?` rather than `any?`: without a block,
# `any?` tests element truthiness, so a queue holding only `nil` or `false`
# items would be treated as empty and the new listener would not be
# notified.
#
# @param listener [Object] the listener to append to the listener list
# @return [nil]
def add_listener(listener)
  @mutex.synchronize do
    @listeners << listener
    notify unless @queue.empty?
  end
  nil
end

#length ⇒ Object



35
36
37
# File 'lib/concurrent_queue.rb', line 35

# Reports how many items are currently stored, reading the backing array
# while holding the mutex.
#
# @return [Integer] number of items in the queue
def length
  @mutex.synchronize do
    @queue.size
  end
end

#pop ⇒ Object



23
24
25
# File 'lib/concurrent_queue.rb', line 23

# Pops from this queue by handing it to the class-level `pop` of the
# receiver's own class.
#
# @return [Object] the result of the class-level pop for this queue
def pop
  queue_class = self.class
  queue_class.pop(self)
end

#push(item) ⇒ Object



27
28
29
30
31
32
33
# File 'lib/concurrent_queue.rb', line 27

# Appends an item to the end of the queue and then calls `notify`, both
# inside a single mutex-protected section.
#
# @param item [Object] the value to enqueue
# @return [nil]
def push(item)
  @mutex.synchronize do
    @queue << item
    notify
  end

  nil
end

#remove_listener(listener) ⇒ Object



47
48
49
50
51
52
# File 'lib/concurrent_queue.rb', line 47

# Unregisters a listener: every entry in the listener list equal to the
# given one is deleted while holding the mutex. Removing a listener that
# was never registered is a no-op.
#
# @param listener [Object] the listener to remove
# @return [nil]
def remove_listener(listener)
  @mutex.synchronize { @listeners.delete(listener) }

  nil
end