Class: DatWorkerPool::Runner
- Inherits:
-
Object
- Object
- DatWorkerPool::Runner
show all
- Defined in:
- lib/dat-worker-pool/runner.rb
Defined Under Namespace
Classes: LoggerProxy, NullLoggerProxy
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(args) ⇒ Runner
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/dat-worker-pool/runner.rb', line 13
def initialize(args)
@num_workers = args[:num_workers]
@queue = args[:queue]
@worker_class = args[:worker_class]
@worker_params = args[:worker_params]
@logger_proxy = if args[:logger]
LoggerProxy.new(args[:logger])
else
NullLoggerProxy.new
end
@workers = LockedArray.new
@available_workers = LockedSet.new
end
|
Instance Attribute Details
#logger_proxy ⇒ Object
Returns the value of attribute logger_proxy.
11
12
13
|
# File 'lib/dat-worker-pool/runner.rb', line 11
def logger_proxy
@logger_proxy
end
|
#num_workers ⇒ Object
Returns the value of attribute num_workers.
10
11
12
|
# File 'lib/dat-worker-pool/runner.rb', line 10
def num_workers
@num_workers
end
|
#queue ⇒ Object
Returns the value of attribute queue.
11
12
13
|
# File 'lib/dat-worker-pool/runner.rb', line 11
def queue
@queue
end
|
#worker_class ⇒ Object
Returns the value of attribute worker_class.
10
11
12
|
# File 'lib/dat-worker-pool/runner.rb', line 10
def worker_class
@worker_class
end
|
#worker_params ⇒ Object
Returns the value of attribute worker_params.
10
11
12
|
# File 'lib/dat-worker-pool/runner.rb', line 10
def worker_params
@worker_params
end
|
Instance Method Details
#available_worker_count ⇒ Object
76
77
78
|
# File 'lib/dat-worker-pool/runner.rb', line 76
def available_worker_count
@available_workers.size
end
|
#log(&message_block) ⇒ Object
92
93
94
|
# File 'lib/dat-worker-pool/runner.rb', line 92
def log(&message_block)
@logger_proxy.runner_log(&message_block)
end
|
#make_worker_available(worker) ⇒ Object
84
85
86
|
# File 'lib/dat-worker-pool/runner.rb', line 84
def make_worker_available(worker)
@available_workers.add(worker.object_id)
end
|
#make_worker_unavailable(worker) ⇒ Object
88
89
90
|
# File 'lib/dat-worker-pool/runner.rb', line 88
def make_worker_unavailable(worker)
@available_workers.remove(worker.object_id)
end
|
#shutdown(timeout = nil) ⇒ Object
the workers should be told to shutdown before the queue because the queue shutdown will wake them up; a worker popping on a shutdown queue will always get nil back and will loop as fast as allowed until its shutdown flag is flipped, so shutting down the workers then the queue keeps them from looping as fast as possible; if any kind of standard error or the expected timeout error (assuming the workers take too long to shutdown) is raised, force a shutdown; this ensures we shutdown as best as possible instead of letting ruby kill the threads when the process exits; non-timeout errors will be re-raised so they can be caught and handled (or shown when the process exits)
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/dat-worker-pool/runner.rb', line 49
def shutdown(timeout = nil)
log do
timeout_message = timeout ? "#{timeout} second(s)" : "none"
"Shutting down worker pool (timeout: #{timeout_message})"
end
begin
@workers.with_lock{ |m, ws| ws.each(&:dwp_signal_shutdown) }
@queue.dwp_signal_shutdown
MuchTimeout.just_optional_timeout(timeout, {
:do => proc{
@queue.dwp_shutdown
wait_for_workers_to_shutdown
},
:on_timeout => proc{
e = ShutdownError.new("Timed out shutting down (#{timeout} seconds).")
force_workers_to_shutdown(e, timeout)
}
}) do
end
rescue StandardError => err
e = ShutdownError.new("Errored while shutting down: #{err.inspect}")
force_workers_to_shutdown(e, timeout)
raise err
end
log{ "Finished shutting down" }
end
|
#start ⇒ Object
33
34
35
36
37
|
# File 'lib/dat-worker-pool/runner.rb', line 33
def start
log{ "Starting worker pool with #{@num_workers} worker(s)" }
@queue.dwp_start
@num_workers.times.each{ |n| build_worker(n + 1) }
end
|
#worker_available? ⇒ Boolean
80
81
82
|
# File 'lib/dat-worker-pool/runner.rb', line 80
def worker_available?
self.available_worker_count > 0
end
|
#worker_log(worker, &message_block) ⇒ Object
96
97
98
|
# File 'lib/dat-worker-pool/runner.rb', line 96
def worker_log(worker, &message_block)
@logger_proxy.worker_log(worker, &message_block)
end
|
#workers ⇒ Object
29
30
31
|
# File 'lib/dat-worker-pool/runner.rb', line 29
def workers
@workers.values
end
|