Class: DatWorkerPool::Runner
- Inherits:
-
Object
- Object
- DatWorkerPool::Runner
- Defined in:
- lib/dat-worker-pool/runner.rb
Defined Under Namespace
Classes: LoggerProxy, NullLoggerProxy
Instance Attribute Summary collapse
-
#logger_proxy ⇒ Object
readonly
Returns the value of attribute logger_proxy.
-
#num_workers ⇒ Object
readonly
Returns the value of attribute num_workers.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#worker_class ⇒ Object
readonly
Returns the value of attribute worker_class.
-
#worker_params ⇒ Object
readonly
Returns the value of attribute worker_params.
Instance Method Summary collapse
- #available_worker_count ⇒ Object
-
#initialize(args) ⇒ Runner
constructor
A new instance of Runner.
- #log(&message_block) ⇒ Object
- #make_worker_available(worker) ⇒ Object
- #make_worker_unavailable(worker) ⇒ Object
-
#shutdown(timeout = nil) ⇒ Object
the workers should be told to shutdown before the queue because the queue shutdown will wake them up; a worker popping on a shutdown queue will always get
nilback and will loop as fast as allowed until its shutdown flag is flipped, so shutting down the workers then the queue keeps them from looping as fast as possible; if any kind of standard error or the expected timeout error (assuming the workers take too long to shutdown) is raised, force a shutdown; this ensures we shutdown as best as possible instead of letting ruby kill the threads when the process exits; non-timeout errors will be re-raised so they can be caught and handled (or shown when the process exits). - #start ⇒ Object
- #worker_available? ⇒ Boolean
- #worker_log(worker, &message_block) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize(args) ⇒ Runner
Returns a new instance of Runner.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/dat-worker-pool/runner.rb', line 13 def initialize(args) @num_workers = args[:num_workers] @queue = args[:queue] @worker_class = args[:worker_class] @worker_params = args[:worker_params] @logger_proxy = if args[:logger] LoggerProxy.new(args[:logger]) else NullLoggerProxy.new end @workers = LockedArray.new @available_workers = LockedSet.new end |
Instance Attribute Details
#logger_proxy ⇒ Object (readonly)
Returns the value of attribute logger_proxy.
11 12 13 |
# File 'lib/dat-worker-pool/runner.rb', line 11 def logger_proxy @logger_proxy end |
#num_workers ⇒ Object (readonly)
Returns the value of attribute num_workers.
10 11 12 |
# File 'lib/dat-worker-pool/runner.rb', line 10 def num_workers @num_workers end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
11 12 13 |
# File 'lib/dat-worker-pool/runner.rb', line 11 def queue @queue end |
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.
10 11 12 |
# File 'lib/dat-worker-pool/runner.rb', line 10 def worker_class @worker_class end |
#worker_params ⇒ Object (readonly)
Returns the value of attribute worker_params.
10 11 12 |
# File 'lib/dat-worker-pool/runner.rb', line 10 def worker_params @worker_params end |
Instance Method Details
#available_worker_count ⇒ Object
76 77 78 |
# File 'lib/dat-worker-pool/runner.rb', line 76 def available_worker_count @available_workers.size end |
#log(&message_block) ⇒ Object
92 93 94 |
# File 'lib/dat-worker-pool/runner.rb', line 92 def log(&) @logger_proxy.runner_log(&) end |
#make_worker_available(worker) ⇒ Object
84 85 86 |
# File 'lib/dat-worker-pool/runner.rb', line 84 def make_worker_available(worker) @available_workers.add(worker.object_id) end |
#make_worker_unavailable(worker) ⇒ Object
88 89 90 |
# File 'lib/dat-worker-pool/runner.rb', line 88 def make_worker_unavailable(worker) @available_workers.remove(worker.object_id) end |
#shutdown(timeout = nil) ⇒ Object
the workers should be told to shutdown before the queue because the queue shutdown will wake them up; a worker popping on a shutdown queue will always get nil back and will loop as fast as allowed until its shutdown flag is flipped, so shutting down the workers then the queue keeps them from looping as fast as possible; if any kind of standard error or the expected timeout error (assuming the workers take too long to shutdown) is raised, force a shutdown; this ensures we shutdown as best as possible instead of letting ruby kill the threads when the process exits; non-timeout errors will be re-raised so they can be caught and handled (or shown when the process exits)
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/dat-worker-pool/runner.rb', line 49 def shutdown(timeout = nil) log do = timeout ? "#{timeout} second(s)" : "none" "Shutting down worker pool (timeout: #{timeout_message})" end begin @workers.with_lock{ |m, ws| ws.each(&:dwp_signal_shutdown) } @queue.dwp_signal_shutdown MuchTimeout.just_optional_timeout(timeout, { :do => proc{ @queue.dwp_shutdown wait_for_workers_to_shutdown }, :on_timeout => proc{ e = ShutdownError.new("Timed out shutting down (#{timeout} seconds).") force_workers_to_shutdown(e, timeout) } }) do end rescue StandardError => err e = ShutdownError.new("Errored while shutting down: #{err.inspect}") force_workers_to_shutdown(e, timeout) raise err end log{ "Finished shutting down" } end |
#start ⇒ Object
33 34 35 36 37 |
# File 'lib/dat-worker-pool/runner.rb', line 33 def start log{ "Starting worker pool with #{@num_workers} worker(s)" } @queue.dwp_start @num_workers.times.each{ |n| build_worker(n + 1) } end |
#worker_available? ⇒ Boolean
80 81 82 |
# File 'lib/dat-worker-pool/runner.rb', line 80 def worker_available? self.available_worker_count > 0 end |
#worker_log(worker, &message_block) ⇒ Object
96 97 98 |
# File 'lib/dat-worker-pool/runner.rb', line 96 def worker_log(worker, &) @logger_proxy.worker_log(worker, &) end |
#workers ⇒ Object
29 30 31 |
# File 'lib/dat-worker-pool/runner.rb', line 29 def workers @workers.values end |