Class: DatWorkerPool
- Inherits:
-
Object
- Object
- DatWorkerPool
- Defined in:
- lib/dat-worker-pool.rb,
lib/dat-worker-pool/queue.rb,
lib/dat-worker-pool/worker.rb,
lib/dat-worker-pool/version.rb
Defined Under Namespace
Modules: Logger Classes: Queue, Worker, WorkersWaiting
Constant Summary collapse
- TimeoutError =
Class.new(RuntimeError)
- VERSION =
"0.1.0"
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#spawned ⇒ Object
readonly
Returns the value of attribute spawned.
Instance Method Summary collapse
-
#add_work(work_item) ⇒ Object
Check if all workers are busy before adding the work.
-
#despawn_worker(worker) ⇒ Object
public, because workers need to call it for themselves.
-
#initialize(min = 0, max = 1, debug = false, &do_work_proc) ⇒ DatWorkerPool
constructor
A new instance of DatWorkerPool.
-
#shutdown(timeout) ⇒ Object
Shutdown each worker and then the queue.
- #waiting ⇒ Object
- #work_items ⇒ Object
Constructor Details
#initialize(min = 0, max = 1, debug = false, &do_work_proc) ⇒ DatWorkerPool
Returns a new instance of DatWorkerPool.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/dat-worker-pool.rb', line 15 def initialize(min = 0, max = 1, debug = false, &do_work_proc) @min_workers = min @max_workers = max @debug = debug @logger = Logger.new(@debug) @do_work_proc = do_work_proc @queue = Queue.new @workers_waiting = WorkersWaiting.new @mutex = Mutex.new @workers = [] @spawned = 0 @min_workers.times{ self.spawn_worker } end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
13 14 15 |
# File 'lib/dat-worker-pool.rb', line 13 def logger @logger end |
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
13 14 15 |
# File 'lib/dat-worker-pool.rb', line 13 def spawned @spawned end |
Instance Method Details
#add_work(work_item) ⇒ Object
Check if all workers are busy before adding the work. When the work is added, a worker will stop waiting (if it was idle). Because of that, we can’t reliably check if all workers are busy. We might think all workers are busy because we just woke up a sleeping worker to process this work. Then we would spawn a worker to do nothing.
45 46 47 48 49 50 |
# File 'lib/dat-worker-pool.rb', line 45 def add_work(work_item) return if work_item.nil? new_worker_needed = all_workers_are_busy? @queue.push work_item self.spawn_worker if new_worker_needed && havent_reached_max_workers? end |
#despawn_worker(worker) ⇒ Object
public, because workers need to call it for themselves
78 79 80 81 82 83 |
# File 'lib/dat-worker-pool.rb', line 78 def despawn_worker(worker) @mutex.synchronize do @spawned -= 1 @workers.delete worker end end |
#shutdown(timeout) ⇒ Object
Shutdown each worker and then the queue. Shutting down the queue will signal any workers waiting on it to wake up, so they can start shutting down. If a worker is processing work, then it will be joined and allowed to finish. NOTE Any work that is left on the queue isn’t processed. The controlling application for the worker pool should gracefully handle these items.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/dat-worker-pool.rb', line 58 def shutdown(timeout) begin SystemTimer.timeout(timeout, TimeoutError) do @workers.each(&:shutdown) @queue.shutdown # use this pattern instead of `each` -- we don't want to call `join` on # every worker (especially if they are shutting down on their own), we # just want to make sure that any who haven't had a chance to finish # get to (this is safe, otherwise you might get a dead thread in the # `each`). @workers.first.join until @workers.empty? end rescue TimeoutError => exception exception..replace "Timed out shutting down the worker pool" @debug ? raise(exception) : self.logger.error(exception.) end end |
#waiting ⇒ Object
36 37 38 |
# File 'lib/dat-worker-pool.rb', line 36 def waiting @workers_waiting.count end |
#work_items ⇒ Object
32 33 34 |
# File 'lib/dat-worker-pool.rb', line 32 def work_items @queue.work_items end |