Class: MultiOpQueue::Queue
- Inherits:
-
Object
- Object
- MultiOpQueue::Queue
- Defined in:
- lib/multi_op_queue.rb
Overview
Original Queue implementation from Ruby-2.0.0 github.com/ruby/ruby/blob/ruby_2_0_0/lib/thread.rb
This class provides a way to synchronize communication between threads.
Example:
require 'thread'
queue = Queue.new
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
consumer.join
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#concat(ary) ⇒ Object
Concatenates
ary
onto the queue. -
#empty? ⇒ Boolean
Returns
true
if the queue is empty. -
#initialize ⇒ Queue
constructor
Creates a new queue.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(non_block = false) ⇒ Object
(also: #shift, #deq)
Retrieves data from the queue.
-
#pop_up_to(num_to_pop = 1, opts = {}) ⇒ Object
Retrieves data from the queue and returns array of contents.
-
#push(obj) ⇒ Object
(also: #<<, #enq)
Pushes
obj
to the queue.
Constructor Details
#initialize ⇒ Queue
Creates a new queue.
37 38 39 40 41 42 43 44 |
# File 'lib/multi_op_queue.rb', line 37 def initialize @que = [] @que.taint # enable tainted communication @num_waiting = 0 self.taint @mutex = Mutex.new @cond = ConditionVariable.new end |
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
167 168 169 |
# File 'lib/multi_op_queue.rb', line 167 def clear @que.clear end |
#concat(ary) ⇒ Object
Concatenates ary
onto the queue.
49 50 51 52 53 54 55 56 |
# File 'lib/multi_op_queue.rb', line 49 def concat(ary) handle_interrupt do @mutex.synchronize do @que.concat ary @cond.signal end end end |
#empty? ⇒ Boolean
Returns true
if the queue is empty.
160 161 162 |
# File 'lib/multi_op_queue.rb', line 160 def empty? @que.empty? end |
#length ⇒ Object Also known as: size
Returns the length of the queue.
174 175 176 |
# File 'lib/multi_op_queue.rb', line 174 def length @que.length end |
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
186 187 188 |
# File 'lib/multi_op_queue.rb', line 186 def num_waiting @num_waiting end |
#pop(non_block = false) ⇒ Object Also known as: shift, deq
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and an exception is raised.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/multi_op_queue.rb', line 85 def pop(non_block=false) handle_interrupt do @mutex.synchronize do while true if @que.empty? if non_block raise ThreadError, "queue empty" else begin @num_waiting += 1 @cond.wait @mutex ensure @num_waiting -= 1 end end else return @que.shift end end end end end |
#pop_up_to(num_to_pop = 1, opts = {}) ⇒ Object
Retrieves data from the queue and returns array of contents. If num_to_pop
are available in the queue then multiple elements are returned in array response If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and an exception is raised.
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/multi_op_queue.rb', line 125 def pop_up_to(num_to_pop = 1, opts = {}) case opts when TrueClass, FalseClass non_bock = opts when Hash timeout = opts.fetch(:timeout, nil) non_block = opts.fetch(:non_block, false) end handle_interrupt do @mutex.synchronize do while true if @que.empty? if non_block raise ThreadError, "queue empty" else begin @num_waiting += 1 @cond.wait(@mutex, timeout) return nil if @que.empty? ensure @num_waiting -= 1 end end else return @que.shift(num_to_pop) end end end end end |
#push(obj) ⇒ Object Also known as: <<, enq
Pushes obj
to the queue.
61 62 63 64 65 66 67 68 |
# File 'lib/multi_op_queue.rb', line 61 def push(obj) handle_interrupt do @mutex.synchronize do @que.push obj @cond.signal end end end |