Class: DatWorkerPool::Runner
- Inherits: Object
  - Object
  - DatWorkerPool::Runner
- Defined in: lib/dat-worker-pool/runner.rb
Defined Under Namespace
Modules: OptionalTimeout
Classes: LoggerProxy, NullLoggerProxy
Instance Attribute Summary
- #logger_proxy ⇒ Object (readonly)
  Returns the value of attribute logger_proxy.
- #num_workers ⇒ Object (readonly)
  Returns the value of attribute num_workers.
- #queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- #worker_class ⇒ Object (readonly)
  Returns the value of attribute worker_class.
- #worker_params ⇒ Object (readonly)
  Returns the value of attribute worker_params.
Instance Method Summary
- #available_worker_count ⇒ Object
- #initialize(args) ⇒ Runner (constructor)
  A new instance of Runner.
- #log(&message_block) ⇒ Object
- #make_worker_available(worker) ⇒ Object
- #make_worker_unavailable(worker) ⇒ Object
- #shutdown(timeout = nil, backtrace = nil) ⇒ Object
  Signals the workers to shut down before the queue, then forces a shutdown if a standard error or the expected timeout error is raised; non-timeout errors are re-raised.
- #start ⇒ Object
- #worker_available? ⇒ Boolean
- #worker_log(worker, &message_block) ⇒ Object
- #workers ⇒ Object
Constructor Details
#initialize(args) ⇒ Runner
Returns a new instance of Runner.
    # File 'lib/dat-worker-pool/runner.rb', line 13

    def initialize(args)
      @num_workers   = args[:num_workers]
      @queue         = args[:queue]
      @worker_class  = args[:worker_class]
      @worker_params = args[:worker_params]

      @logger_proxy = if args[:logger]
        LoggerProxy.new(args[:logger])
      else
        NullLoggerProxy.new
      end

      @workers           = LockedArray.new
      @available_workers = LockedSet.new
    end
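The constructor picks a real or a null logger proxy depending on whether `args[:logger]` is set. The sketch below illustrates that null-object pattern and why the log methods take a block: the message is only built when something will actually log it. `SketchLoggerProxy`, `SketchNullLoggerProxy`, `build_logger_proxy` and the `[DWP]` prefix are hypothetical stand-ins for illustration, not the library's own `LoggerProxy` and `NullLoggerProxy` implementations, which may differ.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for LoggerProxy: forwards messages to a real logger.
class SketchLoggerProxy
  def initialize(logger)
    @logger = logger
  end

  # The block is called only here, so the message string is built lazily.
  def runner_log(&message_block)
    @logger.debug("[DWP] #{message_block.call}")
  end
end

# Hypothetical stand-in for NullLoggerProxy: same interface, does nothing.
class SketchNullLoggerProxy
  # The block is accepted but never called, so no message is built.
  def runner_log(&message_block); end
end

# Mirrors the selection made in the constructor above.
def build_logger_proxy(args)
  if args[:logger]
    SketchLoggerProxy.new(args[:logger])
  else
    SketchNullLoggerProxy.new
  end
end

output         = StringIO.new
with_logger    = build_logger_proxy(:logger => Logger.new(output))
without_logger = build_logger_proxy({})

with_logger.runner_log{ "Starting worker pool" }

message_built = false
without_logger.runner_log{ message_built = true; "never built" }
```

Because both proxies respond to the same method, the caller never has to check whether a logger was configured.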
Instance Attribute Details
#logger_proxy ⇒ Object (readonly)
Returns the value of attribute logger_proxy.
    # File 'lib/dat-worker-pool/runner.rb', line 11

    def logger_proxy
      @logger_proxy
    end
#num_workers ⇒ Object (readonly)
Returns the value of attribute num_workers.
    # File 'lib/dat-worker-pool/runner.rb', line 10

    def num_workers
      @num_workers
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/dat-worker-pool/runner.rb', line 11

    def queue
      @queue
    end
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.
    # File 'lib/dat-worker-pool/runner.rb', line 10

    def worker_class
      @worker_class
    end
#worker_params ⇒ Object (readonly)
Returns the value of attribute worker_params.
    # File 'lib/dat-worker-pool/runner.rb', line 10

    def worker_params
      @worker_params
    end
Instance Method Details
#available_worker_count ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 70

    def available_worker_count
      @available_workers.size
    end
#log(&message_block) ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 86

    def log(&message_block)
      @logger_proxy.runner_log(&message_block)
    end
#make_worker_available(worker) ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 78

    def make_worker_available(worker)
      @available_workers.add(worker.object_id)
    end
#make_worker_unavailable(worker) ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 82

    def make_worker_unavailable(worker)
      @available_workers.remove(worker.object_id)
    end
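Availability is tracked by adding and removing each worker's `object_id` in a locked set. As a rough illustration of that bookkeeping, the sketch below uses `SketchLockedSet`, a hypothetical mutex-guarded wrapper around Ruby's `Set`; the library's own `LockedSet` is not shown on this page and may be implemented differently.

```ruby
require "set"

# Hypothetical stand-in for LockedSet: a Set whose access is guarded by a
# Mutex so worker threads can update it concurrently.
class SketchLockedSet
  def initialize
    @mutex = Mutex.new
    @set   = Set.new
  end

  def add(item)
    @mutex.synchronize{ @set.add(item) }
  end

  def remove(item)
    @mutex.synchronize{ @set.delete(item) }
  end

  def size
    @mutex.synchronize{ @set.size }
  end
end

available = SketchLockedSet.new
worker_a  = Object.new
worker_b  = Object.new

# Workers are tracked by object_id, as in make_worker_available above.
available.add(worker_a.object_id)
available.add(worker_b.object_id)
available.add(worker_a.object_id)    # a repeated add does not double count
available.remove(worker_b.object_id)

count = available.size
```

Using a set rather than an array means marking the same worker available twice leaves the count unchanged.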
#shutdown(timeout = nil, backtrace = nil) ⇒ Object
The workers are told to shut down before the queue because shutting down the queue wakes them up. A worker popping from a shut-down queue always gets nil back and loops as fast as it can until its shutdown flag is flipped, so signaling the workers first and the queue second prevents that busy loop. If any standard error or the expected timeout error (raised when the workers take too long to shut down) occurs, a shutdown is forced. This ensures the pool shuts down as cleanly as possible instead of letting Ruby kill the threads when the process exits. Non-timeout errors are re-raised so they can be caught and handled (or shown when the process exits).
    # File 'lib/dat-worker-pool/runner.rb', line 49

    def shutdown(timeout = nil, backtrace = nil)
      log do
        timeout_message = timeout ? "#{timeout} second(s)" : "none"
        "Shutting down worker pool (timeout: #{timeout_message})"
      end
      begin
        @workers.with_lock{ |m, ws| ws.each(&:dwp_signal_shutdown) }
        @queue.dwp_signal_shutdown
        OptionalTimeout.new(timeout) do
          @queue.dwp_shutdown
          wait_for_workers_to_shutdown
        end
      rescue StandardError => exception
        force_workers_to_shutdown(exception, timeout, backtrace)
        raise exception
      rescue TimeoutInterruptError => exception
        force_workers_to_shutdown(exception, timeout, backtrace)
      end
      log{ "Finished shutting down" }
    end
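The two `rescue` clauses above only behave differently if the timeout error is not itself a `StandardError`; otherwise the first clause would catch it and re-raise. The sketch below illustrates that control flow under the assumption that the timeout error inherits from `Interrupt`. `SketchTimeoutError` and `sketch_shutdown` are hypothetical names, and how the library actually defines `TimeoutInterruptError` is not shown on this page.

```ruby
# Assumption: the timeout error sits outside the StandardError hierarchy.
# Interrupt descends from SignalException and Exception, not StandardError,
# so `rescue StandardError` does not catch it.
SketchTimeoutError = Class.new(Interrupt)

# Mirrors the rescue structure of #shutdown, recording what happened.
def sketch_shutdown(events)
  begin
    yield
  rescue StandardError => exception
    events << [:forced, exception.class]
    raise exception                      # non-timeout errors propagate
  rescue SketchTimeoutError => exception
    events << [:forced, exception.class] # timeouts are swallowed
  end
  events << [:finished]
end

events = []

# Clean shutdown: nothing is forced.
sketch_shutdown(events){ }

# Timeout: shutdown is forced, the method still finishes normally.
sketch_shutdown(events){ raise SketchTimeoutError }

# Any other error: shutdown is forced, then the error reaches the caller.
begin
  sketch_shutdown(events){ raise ArgumentError, "boom" }
rescue ArgumentError
  events << [:reraised]
end
```

In the re-raise case the final "finished" step is skipped, which matches the method above: `log{ "Finished shutting down" }` only runs after a clean or timed-out shutdown.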
#start ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 33

    def start
      log{ "Starting worker pool with #{@num_workers} worker(s)" }
      @queue.dwp_start
      @num_workers.times.each{ |n| build_worker(n + 1) }
    end
#worker_available? ⇒ Boolean
    # File 'lib/dat-worker-pool/runner.rb', line 74

    def worker_available?
      self.available_worker_count > 0
    end
#worker_log(worker, &message_block) ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 90

    def worker_log(worker, &message_block)
      @logger_proxy.worker_log(worker, &message_block)
    end
#workers ⇒ Object
    # File 'lib/dat-worker-pool/runner.rb', line 29

    def workers
      @workers.values
    end