Class: Queue

Inherits:
Object show all
Defined in:
thread_sync.c,
thread_sync.c

Overview

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

The class implements a FIFO (first in, first out) type of queue. In a FIFO queue, the first tasks added are the first retrieved.

Example:

# The queue shared by the producer and consumer threads below.
queue = Queue.new

# Producer thread: pushes the integers 0..4 onto the queue, sleeping a
# random interval before each push.
producer = Thread.new do

5.times do |i|
   sleep rand(i) # simulate expense
   queue << i
   puts "#{i} produced"
end

end

# Consumer thread: pops five values from the queue, sleeping a random
# interval after each pop before printing the value.
consumer = Thread.new do

5.times do |i|
   value = queue.pop
   sleep rand(i/2) # simulate expense
   puts "consumed #{value}"
end

end

# Wait for the consumer thread to finish.
consumer.join

Instance Method Summary collapse

Constructor Details

#initializeObject

Creates a new queue instance.



769
770
771
772
773
774
775
776
# File 'thread_sync.c', line 769

/*
 * Queue#initialize: installs a fresh buffer from ary_buf_new() in the
 * queue's `que` slot and initializes the list head returned by
 * queue_waitq().  Returns self.
 */
static VALUE
rb_queue_initialize(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* The buffer is stored via the RB_OBJ_WRITE macro, not a plain assignment. */
    RB_OBJ_WRITE(self, &queue->que, ary_buf_new());

    list_head_init(queue_waitq(queue));

    return self;
}

Instance Method Details

#clearObject

Removes all objects from the queue.



983
984
985
986
987
988
989
990
# File 'thread_sync.c', line 983

/*
 * Queue#clear: passes the queue's `que` member through check_array(),
 * clears the resulting array with rb_ary_clear(), and returns self.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    VALUE items = check_array(self, queue_ptr(self)->que);

    rb_ary_clear(items);

    return self;
}

#closeObject

Closes the queue. A closed queue cannot be re-opened.

After the call to close completes, the following are true:

  • closed? will return true

  • further calls to close will be ignored.

  • calling enq/push/<< will raise a ClosedQueueError.

  • when empty? is false, calling deq/pop/shift will return an object from the queue as usual.

  • when empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ThreadError.

ClosedQueueError inherits from StopIteration, so raising it can be used to break out of a loop block.

Example:

    q = Queue.new
    Thread.new{
      while e = q.deq # wait for nil to break loop
        # ...
      end
    }
    q.close


822
823
824
825
826
827
828
829
830
831
832
833
834
# File 'thread_sync.c', line 822

/*
 * Queue#close: if the queue is not already closed, sets the QUEUE_CLOSED
 * flag on self and calls wakeup_all() on the queue's wait list.
 * Calling it on an already-closed queue changes nothing.  Returns self.
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* Already closed: nothing to flag and nobody to wake. */
    if (queue_closed_p(self)) {
        return self;
    }

    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(queue_waitq(queue));

    return self;
}

#closed?Boolean

Returns true if the queue is closed.

Returns:

  • (Boolean)


843
844
845
846
847
# File 'thread_sync.c', line 843

/*
 * Queue#closed?: returns Qtrue when queue_closed_p() reports the queue
 * as closed, Qfalse otherwise.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    if (queue_closed_p(self)) {
        return Qtrue;
    }
    return Qfalse;
}

#empty?Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


971
972
973
974
975
# File 'thread_sync.c', line 971

/*
 * Queue#empty?: returns Qtrue when queue_length() reports zero for this
 * queue, Qfalse otherwise.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    if (queue_length(self, queue_ptr(self)) == 0) {
        return Qtrue;
    }
    return Qfalse;
}

#lengthObject #sizeObject Also known as: size

Returns the length of the queue.



1001
1002
1003
1004
1005
# File 'thread_sync.c', line 1001

/*
 * Queue#length (also exposed as Queue#size): returns the result of
 * queue_length() for this queue, converted with LONG2NUM.
 */
static VALUE
rb_queue_length(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return LONG2NUM(queue_length(self, queue));
}

#marshal_dumpObject

:nodoc:



1441
1442
1443
1444
1445
1446
# File 'thread_sync.c', line 1441

/*
 * Listed in this page as Queue#marshal_dump.  Always raises TypeError
 * with the message "can't dump <class of obj>"; the trailing
 * UNREACHABLE_RETURN marks that control is not expected to get past
 * rb_raise().
 */
static VALUE
undumpable(VALUE obj)
{
    VALUE klass = rb_obj_class(obj);

    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, klass);
    UNREACHABLE_RETURN(Qnil);
}

#num_waitingObject

Returns the number of threads waiting on the queue.



1013
1014
1015
1016
1017
1018
1019
# File 'thread_sync.c', line 1013

/*
 * Queue#num_waiting: returns the queue's `num_waiting` counter,
 * converted with INT2NUM.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    return INT2NUM(queue_ptr(self)->num_waiting);
}

#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ThreadError is raised.



957
958
959
960
961
962
# File 'thread_sync.c', line 957

/*
 * Queue#pop (also exposed as Queue#deq and Queue#shift).
 *
 * argc/argv carry the optional non_block argument; it is interpreted by
 * queue_pop_should_block(), and the actual pop is delegated to
 * queue_do_pop(), whose result is returned unchanged.
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    /* Interpret the arguments first, before touching the queue itself. */
    int blocking = queue_pop_should_block(argc, argv);
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_pop(self, queue, blocking);
}

#push(object) ⇒ Object #enq(object) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<

Pushes the given object to the queue.



859
860
861
862
863
# File 'thread_sync.c', line 859

/*
 * Queue#push (also exposed as Queue#enq and Queue#<<): hands obj to
 * queue_do_push() for this queue and returns its result unchanged.
 */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_push(self, queue, obj);
}