Class: DatWorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool.rb,
lib/dat-worker-pool/queue.rb,
lib/dat-worker-pool/worker.rb,
lib/dat-worker-pool/version.rb

Defined Under Namespace

Modules: Logger Classes: Queue, Worker, WorkersWaiting

Constant Summary collapse

TimeoutError =
Class.new(RuntimeError)
VERSION =
"0.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min = 0, max = 1, debug = false, &do_work_proc) ⇒ DatWorkerPool

Returns a new instance of DatWorkerPool.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/dat-worker-pool.rb', line 15

def initialize(min = 0, max = 1, debug = false, &do_work_proc)
  @min_workers  = min
  @max_workers  = max
  @debug        = debug
  @logger       = Logger.new(@debug)
  @do_work_proc = do_work_proc

  @queue           = Queue.new
  @workers_waiting = WorkersWaiting.new

  @mutex   = Mutex.new
  @workers = []
  @spawned = 0

  @min_workers.times{ self.spawn_worker }
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



13
14
15
# File 'lib/dat-worker-pool.rb', line 13

def logger
  @logger
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



13
14
15
# File 'lib/dat-worker-pool.rb', line 13

def spawned
  @spawned
end

Instance Method Details

#add_work(work_item) ⇒ Object

Check if all workers are busy before adding the work. When the work is added, a worker will stop waiting (if it was idle). Because of that, we can’t reliably check if all workers are busy. We might think all workers are busy because we just woke up a sleeping worker to process this work. Then we would spawn a worker to do nothing.



45
46
47
48
49
50
# File 'lib/dat-worker-pool.rb', line 45

def add_work(work_item)
  return if work_item.nil?
  new_worker_needed = all_workers_are_busy?
  @queue.push work_item
  self.spawn_worker if new_worker_needed && havent_reached_max_workers?
end

#despawn_worker(worker) ⇒ Object

public, because workers need to call it for themselves



78
79
80
81
82
83
# File 'lib/dat-worker-pool.rb', line 78

def despawn_worker(worker)
  @mutex.synchronize do
    @spawned -= 1
    @workers.delete worker
  end
end

#shutdown(timeout) ⇒ Object

Shutdown each worker and then the queue. Shutting down the queue will signal any workers waiting on it to wake up, so they can start shutting down. If a worker is processing work, then it will be joined and allowed to finish. NOTE Any work that is left on the queue isn’t processed. The controlling application for the worker pool should gracefully handle these items.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/dat-worker-pool.rb', line 58

def shutdown(timeout)
  begin
    SystemTimer.timeout(timeout, TimeoutError) do
      @workers.each(&:shutdown)
      @queue.shutdown

      # use this pattern instead of `each` -- we don't want to call `join` on
      # every worker (especially if they are shutting down on their own), we
      # just want to make sure that any who haven't had a chance to finish
      # get to (this is safe, otherwise you might get a dead thread in the
      # `each`).
      @workers.first.join until @workers.empty?
    end
  rescue TimeoutError => exception
    exception.message.replace "Timed out shutting down the worker pool"
    @debug ? raise(exception) : self.logger.error(exception.message)
  end
end

#waitingObject



36
37
38
# File 'lib/dat-worker-pool.rb', line 36

def waiting
  @workers_waiting.count
end

#work_itemsObject



32
33
34
# File 'lib/dat-worker-pool.rb', line 32

def work_items
  @queue.work_items
end