Class: Workhorse::Worker
- Inherits: Object
- Defined in: lib/workhorse/worker.rb
Overview
Main worker class that manages job polling and execution. Workers poll the database for jobs, manage thread pools for parallel execution, and handle graceful shutdown and memory monitoring.
Constant Summary
- LOG_LEVELS = %i[fatal error warn info debug].freeze
- SHUTDOWN_SIGNALS = %w[TERM INT].freeze
- LOG_REOPEN_SIGNAL = 'HUP'.freeze
Instance Attribute Summary
- #logger ⇒ Logger? (readonly)
  Optional logger instance.
- #mutex ⇒ Mutex (readonly)
  Synchronization mutex for thread safety.
- #poller ⇒ Workhorse::Poller (readonly)
  The poller instance.
- #polling_interval ⇒ Integer (readonly)
  Polling interval in seconds.
- #pool_size ⇒ Integer (readonly)
  Number of threads in the worker pool.
- #queues ⇒ Array<Symbol> (readonly)
  The queues this worker processes.
- #state ⇒ Symbol (readonly)
  Current worker state (:initialized, :running, :shutdown).
Class Method Summary
- .shutdown_file_for(pid) ⇒ String?
  Returns the path to the shutdown file for a given process ID.
- .start_and_wait(**args) ⇒ void
  Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
Instance Method Summary
- #assert_state!(state) ⇒ void
  Asserts that the worker is in the expected state.
- #hostname ⇒ String
  Returns the hostname of the machine running this worker.
- #id ⇒ String
  Returns the unique identifier for this worker.
- #idle ⇒ Integer
  Returns the number of idle threads in the pool.
- #initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker (constructor)
  Instantiates a new worker.
- #log(text, level = :info) ⇒ void
  Logs a message with worker ID prefix.
- #perform(db_job_id) ⇒ void
  Schedules a job for execution in the thread pool.
- #pid ⇒ Integer
  Returns the process ID of this worker.
- #shutdown ⇒ void
  Shuts down worker and DB poller.
- #start ⇒ void
  Starts the worker.
- #wait ⇒ void
  Waits until the worker is shut down.
Constructor Details
#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker
Instantiates a new worker. The worker is not automatically started.
# File 'lib/workhorse/worker.rb', line 88

def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil)
  @queues = queues
  @pool_size = pool_size || (queues.size + 1)
  @polling_interval = polling_interval
  @auto_terminate = auto_terminate
  @state = :initialized
  @quiet = quiet
  @mutex = Mutex.new
  @pool = Pool.new(@pool_size)
  @poller = Workhorse::Poller.new(self, proc { check_memory })
  @logger = logger

  unless (@polling_interval / 0.1).round(2).modulo(1).zero?
    fail 'Polling interval must be a multiple of 0.1.'
  end

  if instant_repolling
    @pool.on_idle { @poller.instant_repoll! }
  end
end
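The constructor derives two values that are easy to misread: the default pool size and the polling interval check. The following is a minimal standalone sketch of both rules, copied from the expressions above. The helper names valid_polling_interval? and default_pool_size are hypothetical and are not part of Workhorse.

```ruby
# Standalone sketch; both helper names are hypothetical, not Workhorse API.

# Mirrors the constructor's check that the interval is a multiple of 0.1.
# Rounding to two decimals absorbs floating-point noise from the division.
def valid_polling_interval?(interval)
  (interval / 0.1).round(2).modulo(1).zero?
end

# Mirrors the default used when pool_size is nil: number of queues plus one.
def default_pool_size(queues)
  queues.size + 1
end

puts valid_polling_interval?(300)  # the default interval
puts valid_polling_interval?(0.5)  # a multiple of 0.1
puts valid_polling_interval?(0.25) # not a multiple of 0.1
puts default_pool_size(%i[mail reports])
```

In the real constructor a failing check raises 'Polling interval must be a multiple of 0.1.' instead of returning false.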
Instance Attribute Details
#logger ⇒ Logger? (readonly)
Returns the optional logger instance.
# File 'lib/workhorse/worker.rb', line 41

def logger
  @logger
end
#mutex ⇒ Mutex (readonly)
Returns the synchronization mutex for thread safety.
# File 'lib/workhorse/worker.rb', line 38

def mutex
  @mutex
end
#poller ⇒ Workhorse::Poller (readonly)
Returns the poller instance.
# File 'lib/workhorse/worker.rb', line 44

def poller
  @poller
end
#polling_interval ⇒ Integer (readonly)
Returns the polling interval in seconds.
# File 'lib/workhorse/worker.rb', line 35

def polling_interval
  @polling_interval
end
#pool_size ⇒ Integer (readonly)
Returns the number of threads in the worker pool.
# File 'lib/workhorse/worker.rb', line 32

def pool_size
  @pool_size
end
#queues ⇒ Array<Symbol> (readonly)
Returns the queues this worker processes.
# File 'lib/workhorse/worker.rb', line 26

def queues
  @queues
end
#state ⇒ Symbol (readonly)
Returns the current worker state (:initialized, :running, :shutdown).
# File 'lib/workhorse/worker.rb', line 29

def state
  @state
end
Class Method Details
.shutdown_file_for(pid) ⇒ String?
Returns the path to the shutdown file for a given process ID.
# File 'lib/workhorse/worker.rb', line 62

def self.shutdown_file_for(pid)
  return nil unless defined?(Rails)
  Rails.root.join('tmp', 'pids', "workhorse.#{pid}.shutdown")
end
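To illustrate the resulting path outside of a Rails application, here is a small sketch that swaps Rails.root for a plain Pathname. The helper example_shutdown_file_for and the root /srv/my_app are assumptions made for the example only.

```ruby
require 'pathname'

# Hypothetical stand-in for Worker.shutdown_file_for. The root argument
# replaces Rails.root, which is a Pathname in a Rails application.
def example_shutdown_file_for(pid, root = Pathname.new('/srv/my_app'))
  return nil unless root
  root.join('tmp', 'pids', "workhorse.#{pid}.shutdown")
end

puts example_shutdown_file_for(4242)
```

Note that in this sketch the result is a Pathname; call to_s on it where a String is required.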
.start_and_wait(**args) ⇒ void
This method returns an undefined value.
Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
# File 'lib/workhorse/worker.rb', line 51

def self.start_and_wait(**args)
  worker = new(**args)
  worker.start
  worker.wait
end
Instance Method Details
#assert_state!(state) ⇒ void
This method returns an undefined value.
Asserts that the worker is in the expected state.
# File 'lib/workhorse/worker.rb', line 169

def assert_state!(state)
  fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state
end
#hostname ⇒ String
Returns the hostname of the machine running this worker.
# File 'lib/workhorse/worker.rb', line 142

def hostname
  @hostname ||= Socket.gethostname
end
#id ⇒ String
Returns the unique identifier for this worker. Format: hostname.pid.random_hex
# File 'lib/workhorse/worker.rb', line 128

def id
  @id ||= "#{hostname}.#{pid}.#{SecureRandom.hex(3)}"
end
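As a quick illustration of the hostname.pid.random_hex format, the sketch below builds an identifier the same way using only the standard library. The helper name example_worker_id is hypothetical.

```ruby
require 'socket'
require 'securerandom'

# Hypothetical helper that reproduces the format used by Worker#id.
# SecureRandom.hex(3) yields 3 random bytes, i.e. 6 hexadecimal characters.
def example_worker_id
  "#{Socket.gethostname}.#{Process.pid}.#{SecureRandom.hex(3)}"
end

puts example_worker_id
```

Unlike this helper, Worker#id memoizes its result, so a given worker keeps the same identifier for its lifetime.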
#idle ⇒ Integer
Returns the number of idle threads in the pool.
# File 'lib/workhorse/worker.rb', line 212

def idle
  @pool.idle
end
#log(text, level = :info) ⇒ void
This method returns an undefined value.
Logs a message with worker ID prefix.
# File 'lib/workhorse/worker.rb', line 116

def log(text, level = :info)
  text = "[Job worker #{id}] #{text}"
  puts text unless @quiet
  return unless logger
  fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level)
  logger.send(level, text.strip)
end
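The logging rules can be tried in isolation with the standard library Logger. This is a sketch only: example_log is a hypothetical stand-in that takes the logger, worker ID and quiet flag as arguments instead of reading them from a worker instance.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for Worker#log with its collaborators passed in.
def example_log(logger, worker_id, text, level = :info, quiet: true)
  levels = %i[fatal error warn info debug]
  text = "[Job worker #{worker_id}] #{text}"
  puts text unless quiet
  return unless logger
  fail "Log level #{level} is not available. Available are #{levels.inspect}." unless levels.include?(level)
  logger.send(level, text.strip)
end

io = StringIO.new
logger = Logger.new(io)
example_log(logger, 'host.123.abc123', 'Starting up')
example_log(logger, 'host.123.abc123', 'Something failed', :error)
puts io.string
```

As in the original method, the level is only validated when a logger is present; without a logger the call returns early.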
#perform(db_job_id) ⇒ void
This method returns an undefined value.
Schedules a job for execution in the thread pool.
# File 'lib/workhorse/worker.rb', line 220

def perform(db_job_id)
  begin # rubocop:disable Style/RedundantBegin
    mutex.synchronize do
      assert_state! :running
      log "Posting job #{db_job_id} to thread pool"

      @pool.post do
        begin # rubocop:disable Style/RedundantBegin
          Workhorse::Performer.new(db_job_id, self).perform
        rescue Exception => e
          log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
          Workhorse.on_exception.call(e)
        end
      end
    end
  rescue Exception => e
    Workhorse.on_exception.call(e)
  end
end
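The pattern used here, rescuing inside the block that runs on another thread and handing the exception to a callback, can be sketched without Workhorse. In the example below a plain Thread replaces the worker's pool and a lambda replaces Workhorse.on_exception; run_guarded is a hypothetical name.

```ruby
# Runs the job on a separate thread and forwards any exception to the
# given handler, so a failing job is reported instead of lost.
# Hypothetical helper; a plain Thread stands in for the thread pool.
def run_guarded(on_exception, &job)
  Thread.new do
    begin
      job.call
    rescue Exception => e # mirrors the broad rescue in Worker#perform
      on_exception.call(e)
    end
  end
end

errors = Queue.new
run_guarded(->(e) { errors << e }) { raise ArgumentError, 'job failed' }.join
puts errors.pop.message
```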
#pid ⇒ Integer
Returns the process ID of this worker.
# File 'lib/workhorse/worker.rb', line 135

def pid
  @pid ||= Process.pid
end
#shutdown ⇒ void
This method returns an undefined value.
Shuts down worker and DB poller. Jobs currently being processed are properly finished before this method returns. Subsequent calls to this method are ignored.
# File 'lib/workhorse/worker.rb', line 178

def shutdown
  # This is safe to be checked outside of the mutex as 'shutdown' is the
  # final state this worker can be in.
  return if @state == :shutdown

  # TODO: There is a race-condition with this shutdown:
  #  - If the poller is currently locking a job, it may call
  #    "worker.perform", which in turn tries to synchronize the same mutex.
  mutex.synchronize do
    assert_state! :running

    log 'Shutting down'
    @state = :shutdown

    @poller.shutdown
    @pool.shutdown
    log 'Shut down'
  end
end
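The state transitions enforced by #start, #shutdown and #assert_state! can be summarized with a toy model. ToyWorker below is hypothetical: it keeps only the state handling and leaves out the poller, the thread pool, logging and signal traps.

```ruby
# Toy model of the worker lifecycle (:initialized -> :running -> :shutdown).
# ToyWorker is a hypothetical class, not part of Workhorse.
class ToyWorker
  attr_reader :state

  def initialize
    @state = :initialized
    @mutex = Mutex.new
  end

  def start
    @mutex.synchronize do
      assert_state! :initialized
      @state = :running
    end
  end

  def shutdown
    # Repeated calls are ignored because :shutdown is the final state.
    return if @state == :shutdown

    @mutex.synchronize do
      assert_state! :running
      @state = :shutdown
    end
  end

  def assert_state!(expected)
    fail "Expected worker to be in state #{expected} but current state is #{state}." unless state == expected
  end
end

worker = ToyWorker.new
worker.start
worker.shutdown
worker.shutdown # ignored
puts worker.state
```

In this model, as in the method above, calling shutdown on a worker that was never started raises, because the state is still :initialized rather than :running.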
#start ⇒ void
This method returns an undefined value.
Starts the worker. This call does not block; call #wait to block until the worker is shut down.
# File 'lib/workhorse/worker.rb', line 151

def start
  mutex.synchronize do
    assert_state! :initialized
    log 'Starting up'
    @state = :running
    @poller.start
    log 'Started up'

    trap_termination if @auto_terminate
    trap_log_reopen
  end
end
#wait ⇒ void
This method returns an undefined value.
Waits until the worker is shut down. This only happens if #shutdown gets called, either by another thread or by enabling `auto_terminate` and receiving a respective signal. Use this method to let the worker run indefinitely.
# File 'lib/workhorse/worker.rb', line 204

def wait
  @poller.wait
  @pool.wait
end