Class: Qless::Workers::BaseWorker
- Inherits:
-
Object
- Object
- Qless::Workers::BaseWorker
- Includes:
- SupportsMiddlewareModules
- Defined in:
- lib/qless/worker/base.rb
Direct Known Subclasses
Defined Under Namespace
Modules: SupportsMiddlewareModules
Instance Attribute Summary collapse
-
#interval ⇒ Object
Returns the value of attribute interval.
-
#options ⇒ Object
Returns the value of attribute options.
-
#output ⇒ Object
Returns the value of attribute output.
-
#paused ⇒ Object
Returns the value of attribute paused.
-
#reserver ⇒ Object
Returns the value of attribute reserver.
-
#sighup_handler ⇒ Object
Returns the value of attribute sighup_handler.
Instance Method Summary collapse
- #deregister ⇒ Object
- #fail_job(job, error, worker_backtrace) ⇒ Object
-
#initialize(reserver, options = {}) ⇒ BaseWorker
constructor
A new instance of BaseWorker.
-
#jobs ⇒ Object
Return an enumerator to each of the jobs provided by the reserver.
- #listen_for_lost_lock ⇒ Object
- #log_level ⇒ Object
- #on_current_job_lock_lost(&block) ⇒ Object
-
#pause(in_signal_handler = true) ⇒ Object
Pause the worker – take no more new jobs.
-
#perform(job) ⇒ Object
Actually perform the job.
-
#procline(value, in_signal_handler = true) ⇒ Object
Set the procline.
-
#register_signal_handlers ⇒ Object
The meaning of these signals is meant to closely mirror resque.
- #safe_trap(signal_name, &cblock) ⇒ Object
-
#shutdown(in_signal_handler = true) ⇒ Object
(also: #stop!)
Stop processing after this job.
-
#try_complete(job) ⇒ Object
Complete the job unless the worker has already put it into another state by completing / failing / etc.
- #uniq_clients ⇒ Object
-
#unpause(in_signal_handler = true) ⇒ Object
Continue taking new jobs.
Methods included from SupportsMiddlewareModules
Constructor Details
#initialize(reserver, options = {}) ⇒ BaseWorker
Returns a new instance of BaseWorker.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/qless/worker/base.rb', line 20 def initialize(reserver, = {}) # Our job reserver and options @reserver = reserver @options = # SIGHUP handler @sighup_handler = .fetch(:sighup_handler) { lambda { } } # Our logger @log = .fetch(:logger) do @output = .fetch(:output, $stdout) Logger.new(output).tap do |logger| logger.level = .fetch(:log_level, Logger::WARN) logger.formatter = .fetch(:log_formatter) do Proc.new { |severity, datetime, progname, msg| "#{datetime}: #{msg}\n" } end end end # The interval for checking for new jobs @interval = .fetch(:interval, 5.0) @current_job_mutex = Mutex.new @current_job = nil # Default behavior when a lock is lost: stop after the current job. on_current_job_lock_lost { shutdown(in_signal_handler=false) } end |
Instance Attribute Details
#interval ⇒ Object
Returns the value of attribute interval.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def interval @interval end |
#options ⇒ Object
Returns the value of attribute options.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def @options end |
#output ⇒ Object
Returns the value of attribute output.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def output @output end |
#paused ⇒ Object
Returns the value of attribute paused.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def paused @paused end |
#reserver ⇒ Object
Returns the value of attribute reserver.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def reserver @reserver end |
#sighup_handler ⇒ Object
Returns the value of attribute sighup_handler.
17 18 19 |
# File 'lib/qless/worker/base.rb', line 17 def sighup_handler @sighup_handler end |
Instance Method Details
#deregister ⇒ Object
188 189 190 191 192 |
# File 'lib/qless/worker/base.rb', line 188 def deregister uniq_clients.each do |client| client.deregister_workers(client.worker_name) end end |
#fail_job(job, error, worker_backtrace) ⇒ Object
178 179 180 181 182 183 184 185 186 |
# File 'lib/qless/worker/base.rb', line 178 def fail_job(job, error, worker_backtrace) failure = Qless.failure_formatter.format(job, error, worker_backtrace) log(:error, "Got #{failure.group} failure from #{job.inspect}\n#{failure.}" ) job.fail(*failure) rescue Job::CantFailError => e # There's not much we can do here. Another worker may have cancelled it, # or we might not own the job, etc. Logging is the best we can do. log(:error, "Failed to fail #{job.inspect}: #{e.}") end |
#jobs ⇒ Object
Return an enumerator to each of the jobs provided by the reserver
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/qless/worker/base.rb', line 84 def jobs return Enumerator.new do |enum| loop do begin job = reserver.reserve rescue Exception => error # We want workers to durably stay up, so we don't want errors # during job reserving (e.g. network timeouts, etc) to kill the # worker. log(:error, "Error reserving job: #{error.class}: #{error.}") end # If we ended up getting a job, yield it. Otherwise, we wait if job.nil? no_job_available else self.current_job = job enum.yield(job) self.current_job = nil end break if @shutdown end end end |
#listen_for_lost_lock ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/qless/worker/base.rb', line 202 def listen_for_lost_lock subscribers = uniq_clients.map do |client| Subscriber.start(client, "ql:w:#{client.worker_name}", log: @log) do |_, | if ['event'] == 'lock_lost' with_current_job do |job| if job && ['jid'] == job.jid @on_current_job_lock_lost.call(job) end end end end end yield ensure subscribers.each(&:stop) end |
#log_level ⇒ Object
48 49 50 |
# File 'lib/qless/worker/base.rb', line 48 def log_level @log.level end |
#on_current_job_lock_lost(&block) ⇒ Object
198 199 200 |
# File 'lib/qless/worker/base.rb', line 198 def on_current_job_lock_lost(&block) @on_current_job_lock_lost = block end |
#pause(in_signal_handler = true) ⇒ Object
Pause the worker – take no more new jobs
147 148 149 150 |
# File 'lib/qless/worker/base.rb', line 147 def pause(in_signal_handler=true) @paused = true procline("Paused -- #{reserver.description}", in_signal_handler=in_signal_handler) end |
#perform(job) ⇒ Object
Actually perform the job
112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/qless/worker/base.rb', line 112 def perform(job) start_time = Time.now.to_f around_perform(job) rescue JobLockLost log(:warn, "Lost lock for job #{job.jid}") rescue Exception => error fail_job(job, error, caller) else try_complete(job) ensure elapsed_time = Time.now.to_f - start_time log(:info, "Job #{job.description} took #{elapsed_time} seconds") end |
#procline(value, in_signal_handler = true) ⇒ Object
Set the procline. Not supported on all systems
158 159 160 161 |
# File 'lib/qless/worker/base.rb', line 158 def procline(value, in_signal_handler=true) $0 = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}" log(:debug, $PROGRAM_NAME) unless in_signal_handler end |
#register_signal_handlers ⇒ Object
The meaning of these signals is meant to closely mirror resque
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked children immediately, continue processing jobs. USR2: Pause after this job CONT: Start processing jobs again after a USR2
HUP: Print current stack to log and continue
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/qless/worker/base.rb', line 69 def register_signal_handlers # Otherwise, we want to take the appropriate action trap('TERM') { exit! } trap('INT') { exit! } safe_trap('HUP') { sighup_handler.call } safe_trap('QUIT') { shutdown(in_signal_handler=true) } begin trap('CONT') { unpause(in_signal_handler=true) } trap('USR2') { pause(in_signal_handler=true) } rescue ArgumentError warn 'Signals USR2, and/or CONT not supported.' end end |
#safe_trap(signal_name, &cblock) ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/qless/worker/base.rb', line 52 def safe_trap(signal_name, &cblock) begin trap(signal_name, cblock) rescue ArgumentError warn "Signal #{signal_name} not supported." end end |
#shutdown(in_signal_handler = true) ⇒ Object Also known as: stop!
Stop processing after this job
141 142 143 |
# File 'lib/qless/worker/base.rb', line 141 def shutdown(in_signal_handler=true) @shutdown = true end |
#try_complete(job) ⇒ Object
Complete the job unless the worker has already put it into another state by completing / failing / etc. the job
165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/qless/worker/base.rb', line 165 def try_complete(job) job.complete unless job.state_changed? rescue Job::CantCompleteError => e # There's not much we can do here. Complete fails in a few cases: # - The job is already failed (i.e. by another worker) # - The job is being worked on by another worker # - The job has been cancelled # # We don't want to (or are able to) fail the job with this error in # any of these cases, so the best we can do is log the failure. log(:warn, "Failed to complete #{job.inspect}: #{e.}") end |
#uniq_clients ⇒ Object
194 195 196 |
# File 'lib/qless/worker/base.rb', line 194 def uniq_clients @uniq_clients ||= reserver.queues.map(&:client).uniq end |
#unpause(in_signal_handler = true) ⇒ Object
Continue taking new jobs
153 154 155 |
# File 'lib/qless/worker/base.rb', line 153 def unpause(in_signal_handler=true) @paused = false end |