Class: Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/thread.rb,
thread.c,
thread.c

Overview

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

Direct Known Subclasses

SizedQueue

Instance Method Summary collapse

Constructor Details

#initializeQueue

Creates a new queue.



266
267
268
269
270
271
272
# File 'lib/thread.rb', line 266

def initialize
  @que = []
  @waiting = []
  @que.taint		# enable tainted comunication
  @waiting.taint
  self.taint
end

Instance Method Details

#clearObject

Removes all objects from the queue.



340
341
342
# File 'lib/thread.rb', line 340

def clear
  @que.clear
end

#empty?Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


333
334
335
# File 'lib/thread.rb', line 333

def empty?
  @que.empty?
end

#lengthObject Also known as: size

Returns the length of the queue.



347
348
349
# File 'lib/thread.rb', line 347

def length
  @que.length
end

#marshal_dumpObject



851
852
853
854
855
856
857
858
859
860
861
# File 'thread.c', line 851

static VALUE
rb_queue_marshal_dump(VALUE self)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = array_from_list(&queue->values);
    rb_ary_unshift(array, ULONG2NUM(queue->capacity));
    return rb_marshal_dump(array, Qnil);
}

#marshal_load(data) ⇒ Object



831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
# File 'thread.c', line 831

static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = rb_marshal_load(data);
    if (TYPE(array) != T_ARRAY) {
	rb_raise(rb_eTypeError, "expected Array of queue data");
    }
    if (RARRAY(array)->len < 1) {
	rb_raise(rb_eArgError, "missing capacity value");
    }
    queue->capacity = NUM2ULONG(rb_ary_shift(array));
    push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);

    return self;
}

#num_waitingObject

Returns the number of threads waiting on the queue.



359
360
361
# File 'lib/thread.rb', line 359

def num_waiting
  @waiting.size
end

#pop(*args) ⇒ Object Also known as: shift, deq

call_seq: pop(non_block=false)

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and an exception is raised.



309
310
311
312
313
314
315
316
317
318
# File 'lib/thread.rb', line 309

def pop(non_block=false)
  while (Thread.critical = true; @que.empty?)
    raise ThreadError, "queue empty" if non_block
    @waiting.push Thread.current
    Thread.stop
  end
  @que.shift
ensure
  Thread.critical = false
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/thread.rb', line 277

def push(obj)
  Thread.critical = true
  @que.push obj
  begin
    t = @waiting.shift
    t.wakeup if t
  rescue ThreadError
    retry
  ensure
    Thread.critical = false
  end
  begin
    t.run if t
  rescue ThreadError
  end
end