Class: Hoodoo::Communicators::Pool::QueueWithTimeout

Inherits:
Object
Defined in:
lib/hoodoo/communicators/pool.rb

Overview

Internal implementation detail of Hoodoo::Communicators::Pool.

Since pool clients can say “wait until (one or all) workers have processed their Queue contents”, we need some way of seeing when all work is done. The clean way to do it is to push ‘sync now’ messages onto the communicator Threads’ work Queues, so that as they work through the Queue they’ll eventually reach that message. Each worker then pushes a message onto its own sync Queue. Meanwhile the waiting pool does (e.g.) a pop on the sync Queue, which means it blocks until the workers say they’ve finished. No busy waiting, Ruby gets to make its best guess at scheduling, etc.; all good.
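The sync-message idea described above can be sketched with plain Ruby Queues. This is only an illustration under assumptions: the SYNC marker, the :stop message and the variable names are hypothetical and are not taken from the Hoodoo source, which may structure its messages quite differently.

```ruby
# Hypothetical marker object standing in for a 'sync now' message.
SYNC = Object.new

work_queue = Queue.new # jobs for one worker
sync_queue = Queue.new # the worker reports "caught up" here
processed  = []

worker = Thread.new do
  loop do
    entry = work_queue.pop # blocks until an entry arrives

    break if entry == :stop # hypothetical shutdown message

    if entry.equal?( SYNC )
      sync_queue << :synced # everything queued before SYNC is done
    else
      processed << entry
    end
  end
end

work_queue << 'a'
work_queue << 'b'
work_queue << SYNC

sync_queue.pop # the waiter blocks here, with no busy waiting

# At this point the worker has handled everything queued before SYNC.
done_at_sync = processed.dup

work_queue << :stop
worker.join
```

The waiter's pop on the sync Queue has no time limit in this sketch, which is the gap that QueueWithTimeout is meant to fill.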

The catch? You can’t wrap a Queue pop in a Timeout::timeout do ... end block. It just doesn’t work. It’s a strange omission and requires code gymnastics to work around.

Enter QueueWithTimeout, from:

http://spin.atomicobject.com/2014/07/07/ruby-queue-pop-timeout/

Constructor Details

#initialize ⇒ QueueWithTimeout

Create a new instance.



# File 'lib/hoodoo/communicators/pool.rb', line 502

def initialize
  @mutex    = ::Mutex.new
  @queue    = []
  @recieved = ::ConditionVariable.new
end

Instance Method Details

#<<(entry) ⇒ Object

Push a new entry to the end of the queue.

entry

Entry to put onto the end of the queue.



# File 'lib/hoodoo/communicators/pool.rb', line 512

def <<( entry )
  @mutex.synchronize do
    @queue << entry
    @recieved.signal
  end
end

#shift(timeout = nil) ⇒ Object

Take an entry from the front of the queue (FIFO), optionally waiting up to a timeout if the queue is empty.

timeout

Timeout (in seconds, Integer or Float) to wait for an item to appear on the queue, if the queue is empty. If nil, there is no timeout (waits indefinitely). If 0, there is no wait at all. Optional; default is nil.

If given a non-nil timeout value and the timeout expires, raises a ThreadError exception (just as non-blocking Ruby Queue#pop would).



# File 'lib/hoodoo/communicators/pool.rb', line 530

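def shift( timeout = nil )
  @mutex.synchronize do
    if @queue.empty?
      # A timeout of 0 means "do not wait"; nil waits with no time limit.
      @recieved.wait( @mutex, timeout ) if timeout != 0

      # Still empty after the wait (or no wait was requested): give up.
      raise( ThreadError, 'queue empty' ) if @queue.empty?
    end

    @queue.shift
  end
end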
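
A short usage sketch may help show how the three methods fit together. To keep it runnable on its own, it repeats the class body from the listings above as a top-level class rather than loading Hoodoo, so the top-level name and the example values are assumptions made purely for illustration.

```ruby
# Standalone copy of the class shown above, defined at top level only
# so that this sketch runs without loading Hoodoo.
class QueueWithTimeout
  def initialize
    @mutex    = ::Mutex.new
    @queue    = []
    @recieved = ::ConditionVariable.new
  end

  def <<( entry )
    @mutex.synchronize do
      @queue << entry
      @recieved.signal
    end
  end

  def shift( timeout = nil )
    @mutex.synchronize do
      if @queue.empty?
        @recieved.wait( @mutex, timeout ) if timeout != 0
        raise( ThreadError, 'queue empty' ) if @queue.empty?
      end

      @queue.shift
    end
  end
end

queue = QueueWithTimeout.new

# Entries come back in the order they were pushed (FIFO).
queue << :first
queue << :second
first  = queue.shift
second = queue.shift( 1 ) # no waiting needed, an entry is available

# The queue is now empty, so a short timeout expires with ThreadError.
timed_out = false

begin
  queue.shift( 0.1 )
rescue ThreadError
  timed_out = true
end

# A producer on another thread wakes the waiting consumer early.
producer = Thread.new do
  sleep( 0.05 )
  queue << :late
end

late = queue.shift( 2 )
producer.join
```

Rescuing ThreadError is how a caller tells "nothing arrived in time" apart from a normal entry, since the method never returns a special timeout value.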