Class: Threadz::Batch
- Inherits:
-
Object
- Object
- Threadz::Batch
- Defined in:
- lib/threadz/batch.rb
Overview
A batch is a collection of jobs you care about that gets pushed off to the attached thread pool. The calling thread can be signaled when the batch has completed executing, or a block can be executed.
Instance Method Summary collapse
-
#completed? ⇒ Boolean
Returns true iff there are no jobs outstanding.
-
#initialize(threadpool, opts = {}) ⇒ Batch
constructor
Creates a new batch attached to the given threadpool.
-
#push(job) ⇒ Object
(also: #<<)
Add a new job to the batch.
-
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
-
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing.
-
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing.
Constructor Details
#initialize(threadpool, opts = {}) ⇒ Batch
Creates a new batch attached to the given threadpool. A number of options are available:
:latent-
If latent, none of the jobs in the batch will actually start
executing until the +start+ method is called.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/threadz/batch.rb', line 12 def initialize(threadpool, opts={}) @threadpool = threadpool @waiting_threads = [] @job_lock = Mutex.new @jobs_count = AtomicInteger.new(0) @when_done_blocks = [] @sleeper = ::Threadz::Sleeper.new ## Options #latent @latent = opts.key?(:latent) ? opts[:latent] : false if(@latent) @started = false else @started = true end @job_queue = Queue.new if @latent end |
Instance Method Details
#completed? ⇒ Boolean
Returns true iff there are no jobs outstanding.
65 66 67 |
# File 'lib/threadz/batch.rb', line 65 def completed? return @jobs_count.value == 0 end |
#push(job) ⇒ Object Also known as: <<
Add a new job to the batch. If this is a latent batch, the job can’t be scheduled until the batch is #start’ed; otherwise it may start immediately. The job can be anything that responds to call or an array of objects that respond to call.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/threadz/batch.rb', line 36 def push(job) if job.is_a? Array job.each {|j| self << j} elsif job.respond_to? :call @jobs_count.increment if @latent && !@started @job_queue << job else send_to_threadpool job end else raise "Not a valid job: needs to support #call" end end |
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/threadz/batch.rb', line 70 def start @job_lock.synchronize { # in case another thread tries to push new jobs onto the queue while we're starting if @latent @started = true until @job_queue.empty? send_to_threadpool @job_queue.pop end return true else return false end } end |
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing. There are options available:
:timeout-
If specified, will only wait for at least this many seconds for the batch to finish. Typically used with #completed?
57 58 59 60 61 62 |
# File 'lib/threadz/batch.rb', line 57 def wait_until_done(opts={}) raise "Threadz: thread deadlocked because batch job was never started" if @latent && !@started timeout = opts.key?(:timeout) ? opts[:timeout] : 0 @sleeper.wait(timeout) unless completed? end |
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing. If the batch has already finished executing, execute immediately.
86 87 88 |
# File 'lib/threadz/batch.rb', line 86 def when_done(&block) @job_lock.synchronize { completed? ? block.call : @when_done_blocks << block } end |