Class: DatWorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool.rb,
lib/dat-worker-pool/queue.rb,
lib/dat-worker-pool/runner.rb,
lib/dat-worker-pool/worker.rb,
lib/dat-worker-pool/version.rb,
lib/dat-worker-pool/default_queue.rb,
lib/dat-worker-pool/locked_object.rb,
lib/dat-worker-pool/worker_pool_spy.rb

Defined Under Namespace

Modules: Queue, Worker Classes: DefaultQueue, LockedArray, LockedObject, LockedSet, Runner, WorkerPoolSpy

Constant Summary collapse

DEFAULT_NUM_WORKERS =
1
MIN_WORKERS =
1
ShutdownError =

This error should never be “swallowed”. If it is caught, be sure to re-raise it so the workers shut down; otherwise the workers will get killed (`Thread#kill`) by Ruby, which can cause problems.

Class.new(Interrupt)
VERSION =
"0.6.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_class, options = nil) ⇒ DatWorkerPool

Returns a new instance of DatWorkerPool.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/dat-worker-pool.rb', line 15

# Builds a pool for the given worker class, along with its queue and runner.
#
# @param worker_class [Class] a class that includes `DatWorkerPool::Worker`
# @param options [Hash, nil] optional settings; `nil` is treated as `{}`
# @option options [#to_i] :num_workers coerced with `to_i`; defaults to
#   `DEFAULT_NUM_WORKERS` when missing
# @option options [Object] :queue queue to use instead of a new
#   `DatWorkerPool::DefaultQueue`
# @option options [Object] :logger handed to the runner as-is
# @option options [Object] :worker_params handed to the runner as-is
# @raise [ArgumentError] if `worker_class` is not a Class including
#   `DatWorkerPool::Worker`, or if the worker count is below `MIN_WORKERS`
def initialize(worker_class, options = nil)
  unless worker_class.kind_of?(Class) && worker_class.include?(DatWorkerPool::Worker)
    raise ArgumentError, "worker class must include `#{DatWorkerPool::Worker}`"
  end

  opts = options || {}
  worker_count = (opts[:num_workers] || DEFAULT_NUM_WORKERS).to_i
  raise ArgumentError, "number of workers must be at least #{MIN_WORKERS}" if worker_count < MIN_WORKERS

  # The default queue implementation is only loaded when the caller did not
  # supply a queue of their own.
  work_queue = opts[:queue]
  unless work_queue
    require 'dat-worker-pool/default_queue'
    work_queue = DatWorkerPool::DefaultQueue.new
  end
  @queue = work_queue

  # Passed as a single positional Hash, exactly one argument to Runner.new.
  runner_args = {
    :num_workers   => worker_count,
    :logger        => opts[:logger],
    :queue         => @queue,
    :worker_class  => worker_class,
    :worker_params => opts[:worker_params]
  }
  @runner = DatWorkerPool::Runner.new(runner_args)
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/dat-worker-pool.rb', line 13

# Reader for the pool's work queue.
#
# @return [Object] the value stored in `@queue`
def queue
  @queue
end

Instance Method Details

#add_work(work_item) ⇒ Object Also known as: push



48
49
50
51
# File 'lib/dat-worker-pool.rb', line 48

# Pushes a work item onto the pool's queue via `dwp_push`.
# A `nil` work item is ignored and nothing is pushed.
#
# @param work_item [Object, nil] the item to enqueue
# @return [Object, nil] whatever the queue's `dwp_push` returns, or `nil`
#   when `work_item` is `nil`
def add_work(work_item)
  @queue.dwp_push(work_item) unless work_item.nil?
end

#available_worker_count ⇒ Object



58
59
60
# File 'lib/dat-worker-pool.rb', line 58

# Delegates to the runner's `available_worker_count`.
#
# @return [Object] whatever the runner reports (presumably an Integer count;
#   confirm against `DatWorkerPool::Runner`)
def available_worker_count
  @runner.available_worker_count
end

#shutdown(timeout = nil) ⇒ Object



44
45
46
# File 'lib/dat-worker-pool.rb', line 44

# Asks the runner to shut down, passing along the caller's backtrace.
#
# @param timeout [Object, nil] forwarded unchanged to the runner
#   (presumably a number of seconds; confirm against `DatWorkerPool::Runner`)
# @return [Object] whatever the runner's `shutdown` returns
def shutdown(timeout = nil)
  # Capture the stack of whoever invoked this method so the runner receives
  # it alongside the timeout.
  invoked_from = caller
  @runner.shutdown(timeout, invoked_from)
end

#start ⇒ Object



40
41
42
# File 'lib/dat-worker-pool.rb', line 40

# Starts the pool by delegating to the runner's `start`.
#
# @return [Object] whatever the runner's `start` returns
def start
  @runner.start
end

#work_items ⇒ Object



54
55
56
# File 'lib/dat-worker-pool.rb', line 54

# Delegates to the queue's `work_items`.
#
# @return [Object] whatever the queue reports (presumably the collection of
#   pending work items; confirm against the queue implementation in use)
def work_items
  @queue.work_items
end

#worker_available? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/dat-worker-pool.rb', line 62

# Delegates to the runner's `worker_available?`.
#
# @return [Boolean] the runner's answer
def worker_available?
  @runner.worker_available?
end