Class: Queue
- Inherits:
-
Object
- Object
- Queue
- Defined in:
- lib/thread.rb,
thread.c
Overview
This class provides a way to synchronize communication between threads.
Example:
require 'thread'
queue = Queue.new
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
consumer.join
Direct Known Subclasses
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#empty? ⇒ Object
Returns
true
if the queue is empty. -
#initialize ⇒ Queue
constructor
Creates a new queue.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
- #marshal_dump ⇒ Object
- #marshal_load ⇒ Object
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop ⇒ Object
(also: #shift, #deq)
call_seq: pop(non_block=false).
-
#push(obj) ⇒ Object
(also: #<<, #enq)
Pushes
obj
to the queue.
Constructor Details
#initialize ⇒ Queue
Creates a new queue.
266 267 268 269 270 271 272 |
# File 'lib/thread.rb', line 266 def initialize @que = [] @waiting = [] @que.taint # enable tainted comunication @waiting.taint self.taint end |
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'lib/thread.rb', line 340
/*
* Document-method: clear
* call-seq: clear
*
* Removes all objects from the queue.
*
*/
static VALUE
rb_queue_clear(VALUE self)
{
Queue *queue;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
clear_list(&queue->values);
signal_condvar(&queue->space_available);
unlock_mutex(&queue->mutex);
return self;
}
|
#empty? ⇒ Object
Returns true
if the queue is empty.
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 |
# File 'lib/thread.rb', line 333
/*
* Document-method: empty?
* call-seq: empty?
*
* Returns +true+ if the queue is empty.
*
*/
static VALUE
rb_queue_empty_p(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = queue->values.size == 0 ? Qtrue : Qfalse;
unlock_mutex(&queue->mutex);
return result;
}
|
#length ⇒ Object Also known as: size
Returns the length of the queue.
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/thread.rb', line 347
/*
* Document-method: length
* call-seq: length
*
* Returns the length of the queue.
*
*/
static VALUE
rb_queue_length(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = ULONG2NUM(queue->values.size);
unlock_mutex(&queue->mutex);
return result;
}
|
#marshal_dump ⇒ Object
#marshal_load ⇒ Object
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
# File 'lib/thread.rb', line 359
/*
* Document-method: num_waiting
* call-seq: num_waiting
*
* Returns the number of threads waiting on the queue.
*
*/
static VALUE
rb_queue_num_waiting(VALUE self)
{
Queue *queue;
VALUE result;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
result = ULONG2NUM(queue->value_available.waiting.size +
queue->space_available.waiting.size);
unlock_mutex(&queue->mutex);
return result;
}
|
#pop ⇒ Object Also known as: shift, deq
call_seq: pop(non_block=false)
Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn't suspended, and an exception is raised.
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 |
# File 'lib/thread.rb', line 309
/*
* Document-method: pop
* call_seq: pop(non_block=false)
*
* Retrieves data from the queue. If the queue is empty, the calling thread is
* suspended until data is pushed onto the queue. If +non_block+ is true, the
* thread isn't suspended, and an exception is raised.
*
*/
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
Queue *queue;
int should_block;
Data_Get_Struct(self, Queue, queue);
if (argc == 0) {
should_block = 1;
} else if (argc == 1) {
should_block = !RTEST(argv[0]);
} else {
rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
}
lock_mutex(&queue->mutex);
if (!queue->values.entries && !should_block) {
unlock_mutex(&queue->mutex);
rb_raise(rb_eThreadError, "queue empty");
}
while (!queue->values.entries) {
wait_queue(&queue->value_available, &queue->mutex);
}
return rb_ensure(queue_pop_inner, (VALUE)queue,
unlock_mutex_call, (VALUE)&queue->mutex);
}
|
#push(obj) ⇒ Object Also known as: <<, enq
Pushes obj
to the queue.
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/thread.rb', line 277
/*
* Document-method: push
* call-seq: push(obj)
*
* Pushes +obj+ to the queue.
*
*/
static VALUE
rb_queue_push(VALUE self, VALUE value)
{
Queue *queue;
Data_Get_Struct(self, Queue, queue);
lock_mutex(&queue->mutex);
while (queue->capacity && queue->values.size >= queue->capacity) {
wait_queue(&queue->space_available, &queue->mutex);
}
push_list(&queue->values, value);
rb_ensure(signal_condvar_call, (VALUE)&queue->value_available,
unlock_mutex_call, (VALUE)&queue->mutex);
return self;
}
|