Class: ThreadExecutor::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_executor/processor.rb

Overview

A processor is a Queue feeding a Thread.

Instance Method Summary collapse

Constructor Details

#initializeProcessor

Create Processor.

This will create a new Queue and start a new ruby Thread.

The created thread will block until work is inserted n the Queue using #call.

To avoid leaking active threads you must call #finish to stop processing and join the Thread behind this object. Once this object is finished it may not be used again. It should be discarded.

Typically the user should never create or use this class, but use an Executor, though there is nothing wrong in using this directly..



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/thread_executor/processor.rb', line 51

def initialize
  @q = Queue.new
  @t = Thread.new do
    while true do
      promise, task = @q.deq

      # This is how we shut down the thread cleanly.
      break if promise.nil? && task.nil?

      begin
        promise.value = task.call
      rescue Exception => e
        promise.exception = e
      end
    end
  end
end

Instance Method Details

#call(&t) ⇒ Object

Adds a task, creates a Promise and returns a Future.



80
81
82
83
84
# File 'lib/thread_executor/processor.rb', line 80

def call(&t)
  p = Promise.new
  @q.enq [ p, t ]
  p.future
end

#finishObject

Call #shutdown and join the thread.

This will block until the thread is joined.

When this returns this object is unusable and should be discarded.



97
98
99
100
# File 'lib/thread_executor/processor.rb', line 97

def finish
  shutdown
  @t.join
end

#shutdownObject

Signal that the worker thread should exit.

More precisely, this enqueues a stop request into the work queue, which, when encountered, causes the worker thread to cleanly exit and take no more work.



75
76
77
# File 'lib/thread_executor/processor.rb', line 75

def shutdown
  @q.enq [nil, nil]
end

#sizeObject

Return the size of the work queue.



87
88
89
# File 'lib/thread_executor/processor.rb', line 87

def size
  @q.size
end