Class: MultiProcessing::Queue

Inherits:
Object
Defined in:
lib/multiprocessing/queue.rb

Overview

This class provides a way to synchronize communication between processes.

Queue uses pipes to communicate with other processes. #push starts a background thread that writes data to the pipe. To keep the process from exiting before the data has been written to the pipe, call #close and then #join_thread:

q.close.join_thread

#join_thread waits until all data is written to the pipe.
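
To illustrate why the close-then-join step matters, here is a minimal sketch of the same idea using only Ruby's standard library. It is not the library's implementation: `pending` and `writer` are illustrative names, and the built-in ::Queue stands in for the internal buffer.

```ruby
# Sketch only: a background thread drains an in-process buffer into a pipe.
# `pending` and `writer` are made-up names, not part of MultiProcessing.
reader, pipe_in = IO.pipe
pending = ::Queue.new

writer = Thread.new do
  # pop returns nil once the buffer is closed and empty, which ends the loop
  while (chunk = pending.pop)
    pipe_in.write(chunk)
  end
  pipe_in.close
end

pending << "nyan\n"
pending << "wan\n"

pending.close   # no more data will be queued
writer.join     # block until everything has reached the pipe

received = reader.read   # the write end is closed, so read stops at EOF
reader.close
```

Without the `join`, the process could exit while chunks are still waiting in the buffer, and the reader would never see them.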

Note that a Queue uses 8 pipes in total (2 pipes of its own, plus those used internally by its 2 Mutexes and 1 Semaphore).

Examples:

require 'multiprocessing'

q = MultiProcessing::Queue.new
fork do
  q.push :nyan
  q.push :wan
  q.close.join_thread
end
q.pop # => :nyan
q.pop # => :wan

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



# File 'lib/multiprocessing/queue.rb', line 41

def initialize
  @count = Semaphore.new 0
  @write_mutex = Mutex.new
  @read_mutex = Mutex.new
  @len_pout, @len_pin = IO.pipe
  @data_pout, @data_pin = IO.pipe
  @enq_queue = ::Queue.new
  @queue_zero_cond = ::ConditionVariable.new
  @closed = false
end
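
Because the pipes are created in the constructor, a queue built before `fork` is shared with the child process. The following sketch shows that underlying mechanism with a single plain pipe and Marshal. It assumes a platform where `fork` is available and does not use the MultiProcessing classes.

```ruby
# Sketch only: one pipe created before fork is visible in both processes.
reader, writer = IO.pipe

pid = fork do
  reader.close                         # the child only writes
  writer.write(Marshal.dump(:nyan))
  writer.close
end

writer.close                           # the parent only reads
value = Marshal.load(reader.read)      # read until the child closes its end
reader.close
Process.wait(pid)
```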

Instance Method Details

#clear ⇒ Queue

Removes all objects from the queue.

Returns:

  • (Queue)

# File 'lib/multiprocessing/queue.rb', line 58

def clear
  begin
    loop do
      self.deq(true)
    end
  rescue QueueError
  end
  self
end
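
The drain-until-error pattern used by #clear can be tried out with Ruby's built-in ::Queue as a stand-in. This is an analogy, not the library's code: the built-in queue raises ThreadError on an empty non-blocking pop, where MultiProcessing::Queue raises QueueError. The helper name `drain` is hypothetical.

```ruby
# Stand-in sketch: pop without blocking until the queue reports it is empty.
def drain(queue)
  drained = []
  loop { drained << queue.pop(true) }   # pop(true) raises when empty
rescue ThreadError
  drained
end

q = ::Queue.new
q << :nyan << :wan
removed = drain(q)
```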

#close ⇒ Queue

Closes the queue. After closing, no more objects can be pushed to the queue. #join_thread can be called only after the queue has been closed.

Returns:

  • (Queue)

# File 'lib/multiprocessing/queue.rb', line 179

def close
  @closed = true
  self
end
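
The ordering rules around closing (no push after #close, no #join_thread before it) can be modeled with a toy class. `ToyQueue` and `ToyQueueError` are made-up names for illustration; no pipes or threads are involved, only the closed-flag guards.

```ruby
# Toy model of the closed-flag guards; not the library's implementation.
class ToyQueueError < StandardError; end

class ToyQueue
  def initialize
    @items = []
    @closed = false
  end

  def push(obj)
    raise ToyQueueError, "already closed" if @closed
    @items << obj
    self
  end

  def close
    @closed = true
    self
  end

  def join_thread
    raise ToyQueueError, "must be closed before join_thread" unless @closed
    self
  end
end

# Runs the block and returns the error message, or nil if nothing was raised.
def error_message
  yield
  nil
rescue ToyQueueError => e
  e.message
end

q = ToyQueue.new
early_join = error_message { q.join_thread }   # joining before close fails
q.push(:nyan).close.join_thread                # the valid order chains cleanly
late_push = error_message { q.push(:wan) }     # pushing after close fails
```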

#deq(non_block = false) ⇒ Object Also known as: pop, shift

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread is not suspended and a QueueError is raised instead.

Parameters:

  • non_block (Boolean) (defaults to: false)

Returns:

  • (Object)


# File 'lib/multiprocessing/queue.rb', line 99

def deq non_block=false
  data = ""
  @read_mutex.synchronize do
    unless non_block
      @count.wait
    else
      unless @count.try_wait
        raise QueueError.new("Queue is empty")
      end
    end

    buf = ""
    len = nil
    begin
      c = @len_pout.readpartial 1
      if c == "\n"
        len = buf.to_i
      else
        buf << c
      end
    end while !len

    begin
      buf = @data_pout.readpartial len
      len -= buf.bytesize
      data << buf
    end while len > 0
  end
  return Marshal.load(data)

end
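
The read loop above implies a simple framing scheme: a decimal byte length terminated by a newline on one pipe, and the marshaled payload on the other. The sketch below exercises that scheme in a single process. The write side is an assumption inferred from how #deq reads, since the writer is not shown here, and `write_frame` and `read_frame` are hypothetical helper names.

```ruby
# Sketch only: length-prefixed frames over a pair of pipes.
def write_frame(len_in, data_in, obj)
  payload = Marshal.dump(obj)
  len_in.write("#{payload.bytesize}\n")   # decimal length, newline-terminated
  data_in.write(payload)
end

def read_frame(len_out, data_out)
  # Read the length one byte at a time, as #deq does, up to the newline.
  digits = +""
  while (c = len_out.readpartial(1)) != "\n"
    digits << c
  end
  len = digits.to_i

  # readpartial may return fewer bytes than requested, so loop until done.
  data = "".b
  while len > 0
    chunk = data_out.readpartial(len)
    len -= chunk.bytesize
    data << chunk
  end
  Marshal.load(data)
end

len_out, len_in = IO.pipe
data_out, data_in = IO.pipe

write_frame(len_in, data_in, :nyan)
write_frame(len_in, data_in, { "wan" => [1, 2, 3] })

first = read_frame(len_out, data_out)
second = read_frame(len_out, data_out)
```

Sending the length separately lets the reader know exactly how many payload bytes belong to each object, so consecutive objects on the data pipe do not run together.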

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


# File 'lib/multiprocessing/queue.rb', line 74

def empty?
  length == 0
end

#enq(obj) ⇒ Queue Also known as: push, <<

Pushes an object onto the queue. Raises QueueError if the queue is already closed. Raises TypeError if the object cannot be dumped with Marshal.

Parameters:

  • obj (Object)

Returns:

  • (Queue)

Raises:

  • (QueueError)

    the queue is already closed.

  • (TypeError)

    object cannot be dumped with Marshal.



# File 'lib/multiprocessing/queue.rb', line 144

def enq obj
  raise QueueError.new("already closed") if @closed
  unless(@enq_thread && @enq_thread.alive?)
    @enq_queue.clear
    @enq_thread = Thread.new &method(:enq_loop)
  end
  @enq_queue.enq(Marshal.dump(obj))
  @count.post
  self
end
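
Since every pushed object goes through Marshal.dump, it can help to check in advance which objects are dumpable. The helper below is a hypothetical convenience, not part of the library. It relies only on the standard behavior that Marshal.dump raises TypeError for objects such as procs and IO handles.

```ruby
# Hypothetical helper: true if Marshal can serialize the object.
def dumpable?(obj)
  Marshal.dump(obj)
  true
rescue TypeError
  false
end

symbol_ok = dumpable?(:nyan)
hash_ok   = dumpable?({ "name" => "wan", "legs" => 4 })
proc_ok   = dumpable?(proc { :nyan })   # procs cannot be marshaled
io_ok     = dumpable?($stdout)          # neither can IO objects
```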

#join_thread ⇒ Queue

Waits until all data is written to the communication pipe. This can be called only after the queue has been closed with #close.

Returns:

  • (Queue)

Raises:

  • (QueueError)

    the queue has not been closed.


# File 'lib/multiprocessing/queue.rb', line 192

def join_thread
  raise QueueError.new("must be closed before join_thread") unless @closed
  if @enq_thread && @enq_thread.alive?
    @enq_thread.join
  end
  self
end

#length ⇒ Fixnum Also known as: size, count

Returns the number of items in the queue.

Returns:

  • (Fixnum)


# File 'lib/multiprocessing/queue.rb', line 84

def length
  @count.value
end