Class: Queue
- Inherits:
-
Object
- Object
- Queue
- Defined in:
- lib/thread.rb,
thread.c,
thread.c
Overview
This class provides a way to synchronize communication between threads.
Example:
require 'thread'
queue = Queue.new
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
consumer.join
Direct Known Subclasses
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#empty? ⇒ Boolean
Returns true if the queue is empty.
-
#initialize ⇒ Queue
constructor
Creates a new queue.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
- #marshal_dump ⇒ Object
- #marshal_load(data) ⇒ Object
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(*args) ⇒ Object
(also: #shift, #deq)
call_seq: pop(non_block=false).
-
#push(obj) ⇒ Object
(also: #<<, #enq)
Pushes obj to the queue.
Constructor Details
#initialize ⇒ Queue
Creates a new queue.
266 267 268 269 270 271 272 |
# File 'lib/thread.rb', line 266
#
# Creates a new, empty queue.
#
# Sets up two arrays: @que holds the queued objects and @waiting holds the
# threads registered by #pop while the queue is empty. Both arrays and the
# queue itself are marked as tainted.
def initialize
  @que = []
  @waiting = []
  @que.taint          # enable tainted communication
  @waiting.taint
  self.taint
end
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
340 341 342 |
# File 'lib/thread.rb', line 340
#
# Removes all objects from the queue by clearing the backing @que array.
# Returns whatever @que.clear returns.
def clear
  @que.clear
end
#empty? ⇒ Boolean
Returns true if the queue is empty.
333 334 335 |
# File 'lib/thread.rb', line 333
#
# Returns true if the queue currently holds no objects, i.e. the backing
# @que array is empty; false otherwise.
def empty?
  @que.empty?
end
#length ⇒ Object Also known as: size
Returns the length of the queue.
347 348 349 |
# File 'lib/thread.rb', line 347
#
# Returns the number of objects currently held in the queue (the length of
# the backing @que array).
def length
  @que.length
end
#marshal_dump ⇒ Object
851 852 853 854 855 856 857 858 859 860 861 |
# File 'thread.c', line 851
/*
 * Serializes the queue for Marshal.
 *
 * Builds a Ruby array from the queue's stored values, prepends the queue's
 * capacity as the first element, and returns the result of rb_marshal_dump()
 * on that array. rb_queue_marshal_load() expects exactly this layout
 * (capacity first, then the values).
 *
 * NOTE(review): array_from_list() and the Queue struct are defined outside
 * this listing; their exact behavior should be confirmed in thread.c.
 */
static VALUE
rb_queue_marshal_dump(VALUE self)
{
Queue *queue;
VALUE array;
Data_Get_Struct(self, Queue, queue);
array = array_from_list(&queue->values);
/* Capacity goes at index 0 so the loader can shift it off first. */
rb_ary_unshift(array, ULONG2NUM(queue->capacity));
return rb_marshal_dump(array, Qnil);
}
|
#marshal_load(data) ⇒ Object
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 |
# File 'thread.c', line 831
/*
 * Restores a queue from data produced by rb_queue_marshal_dump().
 *
 * Unmarshals `data`, which must yield an Array whose first element is the
 * capacity and whose remaining elements are the queued values. The capacity
 * is stored on the queue and the remaining elements are pushed onto its
 * value list. Returns self.
 *
 * Raises TypeError if the unmarshaled object is not an Array, and
 * ArgumentError if the Array is empty (no capacity element).
 *
 * NOTE(review): push_multiple_list() is defined outside this listing;
 * presumably it appends to any values already present rather than replacing
 * them -- confirm in thread.c.
 */
static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
Queue *queue;
VALUE array;
Data_Get_Struct(self, Queue, queue);
array = rb_marshal_load(data);
if (TYPE(array) != T_ARRAY) {
rb_raise(rb_eTypeError, "expected Array of queue data");
}
if (RARRAY(array)->len < 1) {
rb_raise(rb_eArgError, "missing capacity value");
}
/* First element is the capacity; shifting it leaves only the values. */
queue->capacity = NUM2ULONG(rb_ary_shift(array));
push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);
return self;
}
|
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
359 360 361 |
# File 'lib/thread.rb', line 359
#
# Returns the number of threads waiting on the queue, i.e. the size of the
# @waiting array that #pop adds the current thread to while the queue is
# empty.
def num_waiting
  @waiting.size
end
#pop(*args) ⇒ Object Also known as: shift, deq
call_seq: pop(non_block=false)
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended; a ThreadError ("queue empty") is raised instead.
309 310 311 312 313 314 315 316 317 318 |
# File 'lib/thread.rb', line 309
#
# Retrieves the oldest object from the queue.
#
# While the queue is empty the calling thread registers itself in @waiting
# and stops, re-checking the queue each time it is resumed. If +non_block+
# is true the thread is not suspended; a ThreadError ("queue empty") is
# raised instead.
#
# Returns the object shifted off the front of @que.
# Raises ThreadError when the queue is empty and +non_block+ is true.
def pop(non_block=false)
  # Thread.critical is re-asserted on every pass of the loop condition,
  # before the emptiness check.
  # NOTE(review): Thread.critical is a Ruby 1.8 API; presumably it stops
  # other threads from being scheduled -- confirm against the 1.8 docs.
  while (Thread.critical = true; @que.empty?)
    raise ThreadError, "queue empty" if non_block
    @waiting.push Thread.current
    Thread.stop
  end
  @que.shift
ensure
  # Always reset the critical flag, including on the non-blocking raise.
  Thread.critical = false
end
#push(obj) ⇒ Object Also known as: <<, enq
Pushes obj to the queue.
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/thread.rb', line 277
#
# Pushes +obj+ onto the end of the queue and wakes one waiting thread, if
# there is one.
#
# The object is appended to @que, then the first thread in @waiting (if
# any) is woken. If waking it raises ThreadError, the next waiter is tried
# instead. After the critical flag is reset, the woken thread is given a
# chance to run.
def push(obj)
  # NOTE(review): Thread.critical is a Ruby 1.8 API; presumably it stops
  # other threads from being scheduled -- confirm against the 1.8 docs.
  Thread.critical = true
  @que.push obj
  begin
    t = @waiting.shift
    t.wakeup if t
  rescue ThreadError
    # Waking this waiter failed; retry with the next one in @waiting.
    retry
  ensure
    Thread.critical = false
  end
  begin
    t.run if t
  rescue ThreadError
    # Deliberately ignored: a failure to run the woken thread is best-effort.
  end
end