Class: MultiProcessing::Queue
- Inherits:
-
Object
- Object
- MultiProcessing::Queue
- Defined in:
- lib/multiprocessing/queue.rb
Overview
This class provides a way to synchronize communication between process.
Queue uses pipes to communicate with other processes. #push starts background thread to write data to the pipe. Avoiding to exit process before writing to the pipe, use #close and #join_thread.
q.close.join_thread
#join_thread waits until all data is written to the pipe.
Note that Queue uses 8 pipes ( 2 pipes, 2 Mutex, 1 Semaphore).
Instance Method Summary collapse
-
#clear ⇒ Queue
Removes all objects from the queue.
-
#close ⇒ Queue
Close the queue.
-
#deq(non_block = false) ⇒ Object
(also: #pop, #shift)
Retrieves data from the queue.
-
#empty? ⇒ Boolean
Returns true if the queue is empty.
-
#enq(obj) ⇒ Queue
(also: #push, #<<)
Pushes object to the queue.
-
#initialize ⇒ Queue
constructor
A new instance of Queue.
-
#join_thread ⇒ Queue
Waits until all data is written to the communication pipe.
-
#length ⇒ Fixnum
(also: #size, #count)
Returns number of items in the queue.
Constructor Details
#initialize ⇒ Queue
Returns a new instance of Queue.
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/multiprocessing/queue.rb', line 41 def initialize @count = Semaphore.new 0 @write_mutex = Mutex.new @read_mutex = Mutex.new @len_pout, @len_pin = IO.pipe @data_pout, @data_pin = IO.pipe @enq_queue = ::Queue.new @queue_zero_cond = ::ConditionVariable.new @closed = false end |
Instance Method Details
#clear ⇒ Queue
Removes all objects from the queue
58 59 60 61 62 63 64 65 66 |
# File 'lib/multiprocessing/queue.rb', line 58 def clear begin loop do self.deq(true) end rescue QueueError end self end |
#close ⇒ Queue
Close the queue. After closing, the queue cannot be pushed any object. #join_thread can call only after closing the queue.
179 180 181 182 |
# File 'lib/multiprocessing/queue.rb', line 179 def close @closed = true self end |
#deq(non_block = false) ⇒ Object Also known as: pop, shift
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, thread isn’t suspended, and exception is raised.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/multiprocessing/queue.rb', line 99 def deq non_block=false data = "" @read_mutex.synchronize do unless non_block @count.wait else unless @count.try_wait raise QueueError.new("Queue is empty") end end buf = "" len = nil begin c = @len_pout.readpartial 1 if c == "\n" len = buf.to_i else buf << c end end while !len begin buf = @data_pout.readpartial len len -= buf.bytesize data << buf end while len > 0 end return Marshal.load(data) end |
#empty? ⇒ Boolean
Returns true if the queue is empty.
74 75 76 |
# File 'lib/multiprocessing/queue.rb', line 74 def empty? length == 0 end |
#enq(obj) ⇒ Queue Also known as: push, <<
Pushes object to the queue. Raise QueueError if the queue is already closed. Raise TypeError if the object passed cannot be dumped with Marshal.
144 145 146 147 148 149 150 151 152 153 |
# File 'lib/multiprocessing/queue.rb', line 144 def enq obj raise QueueError.new("already closed") if @closed unless(@enq_thread && @enq_thread.alive?) @enq_queue.clear @enq_thread = Thread.new &method(:enq_loop) end @enq_queue.enq(Marshal.dump(obj)) @count.post self end |
#join_thread ⇒ Queue
Waits until all data is written to the communication pipe. This can call only after closing(#close) queue.
192 193 194 195 196 197 198 |
# File 'lib/multiprocessing/queue.rb', line 192 def join_thread raise QueueError.new("must be closed before join_thread") unless @closed if @enq_thread && @enq_thread.alive? @enq_thread.join end self end |
#length ⇒ Fixnum Also known as: size, count
Returns number of items in the queue.
84 85 86 |
# File 'lib/multiprocessing/queue.rb', line 84 def length @count.value end |