Class: Reqless::Workers::BaseWorker
- Inherits:
-
Object
- Object
- Reqless::Workers::BaseWorker
- Includes:
- SupportsMiddlewareModules
- Defined in:
- lib/reqless/worker/base.rb
Direct Known Subclasses
Defined Under Namespace
Modules: SupportsMiddlewareModules
Instance Attribute Summary collapse
-
#interval ⇒ Object
Returns the value of attribute interval.
-
#options ⇒ Object
Returns the value of attribute options.
-
#output ⇒ Object
Returns the value of attribute output.
-
#paused ⇒ Object
Returns the value of attribute paused.
-
#reserver ⇒ Object
Returns the value of attribute reserver.
-
#sighup_handler ⇒ Object
Returns the value of attribute sighup_handler.
Instance Method Summary collapse
- #deregister ⇒ Object
- #fail_job(job, error, worker_backtrace) ⇒ Object
-
#initialize(reserver, options = {}) ⇒ BaseWorker
constructor
A new instance of BaseWorker.
-
#jobs ⇒ Object
Return an enumerator to each of the jobs provided by the reserver.
- #listen_for_lost_lock(job) ⇒ Object
- #log_level ⇒ Object
- #on_current_job_lock_lost(&block) ⇒ Object
-
#pause ⇒ Object
Pause the worker – take no more new jobs.
-
#perform(job) ⇒ Object
Actually perform the job.
-
#procline(value) ⇒ Object
Set the proceline.
-
#register_signal_handlers ⇒ Object
The meaning of these signals is meant to closely mirror resque.
- #safe_trap(signal_name, &cblock) ⇒ Object
-
#shutdown ⇒ Object
(also: #stop!)
Stop processing after this job.
-
#try_complete(job) ⇒ Object
Complete the job unless the worker has already put it into another state by completing / failing / etc.
- #uniq_clients ⇒ Object
-
#unpause ⇒ Object
Continue taking new jobs.
Methods included from SupportsMiddlewareModules
Constructor Details
#initialize(reserver, options = {}) ⇒ BaseWorker
Returns a new instance of BaseWorker.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/reqless/worker/base.rb', line 19 def initialize(reserver, = {}) # Our job reserver and options @reserver = reserver @options = # SIGHUP handler @sighup_handler = .fetch(:sighup_handler) { lambda { } } # Our logger @log = .fetch(:logger) do @output = .fetch(:output, $stdout) Logger.new(output).tap do |logger| logger.level = .fetch(:log_level, Logger::WARN) logger.formatter = .fetch(:log_formatter) do Proc.new { |severity, datetime, progname, msg| "#{datetime}: #{msg}\n" } end end end # The interval for checking for new jobs @interval = .fetch(:interval, 5.0) @current_job_mutex = Mutex.new @current_job = nil # Default behavior when a lock is lost: stop after the current job. on_current_job_lock_lost { shutdown } end |
Instance Attribute Details
#interval ⇒ Object
Returns the value of attribute interval.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def interval @interval end |
#options ⇒ Object
Returns the value of attribute options.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def @options end |
#output ⇒ Object
Returns the value of attribute output.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def output @output end |
#paused ⇒ Object
Returns the value of attribute paused.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def paused @paused end |
#reserver ⇒ Object
Returns the value of attribute reserver.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def reserver @reserver end |
#sighup_handler ⇒ Object
Returns the value of attribute sighup_handler.
16 17 18 |
# File 'lib/reqless/worker/base.rb', line 16 def sighup_handler @sighup_handler end |
Instance Method Details
#deregister ⇒ Object
183 184 185 186 187 |
# File 'lib/reqless/worker/base.rb', line 183 def deregister uniq_clients.each do |client| client.deregister_workers(client.worker_name) end end |
#fail_job(job, error, worker_backtrace) ⇒ Object
173 174 175 176 177 178 179 180 181 |
# File 'lib/reqless/worker/base.rb', line 173 def fail_job(job, error, worker_backtrace) failure = Reqless.failure_formatter.format(job, error, worker_backtrace) log(:error, "Got #{failure.group} failure from #{job.inspect}\n#{failure.}" ) job.fail(*failure) rescue Job::CantFailError => e # There's not much we can do here. Another worker may have cancelled it, # or we might not own the job, etc. Logging is the best we can do. log(:error, "Failed to fail #{job.inspect}: #{e.}") end |
#jobs ⇒ Object
Return an enumerator to each of the jobs provided by the reserver
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/reqless/worker/base.rb', line 83 def jobs return Enumerator.new do |enum| loop do begin job = reserver.reserve rescue Exception => error # We want workers to durably stay up, so we don't want errors # during job reserving (e.g. network timeouts, etc) to kill the # worker. log(:error, "Error reserving job: #{error.class}: #{error.}") end # If we ended up getting a job, yield it. Otherwise, we wait if job.nil? no_job_available else self.current_job = job enum.yield(job) self.current_job = nil end break if @shutdown end end end |
#listen_for_lost_lock(job) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/reqless/worker/base.rb', line 197 def listen_for_lost_lock(job) # Ensure subscribers always has a value subscriber = Subscriber.start(job.client, "ql:w:#{job.client.worker_name}", log: @log) do |_, | if ['event'] == 'lock_lost' && ['jid'] == job.jid @on_current_job_lock_lost.call(job) end end yield ensure subscriber && subscriber.stop end |
#log_level ⇒ Object
47 48 49 |
# File 'lib/reqless/worker/base.rb', line 47 def log_level @log.level end |
#on_current_job_lock_lost(&block) ⇒ Object
193 194 195 |
# File 'lib/reqless/worker/base.rb', line 193 def on_current_job_lock_lost(&block) @on_current_job_lock_lost = block end |
#pause ⇒ Object
Pause the worker – take no more new jobs
142 143 144 145 |
# File 'lib/reqless/worker/base.rb', line 142 def pause @paused = true procline "Paused -- #{reserver.description}" end |
#perform(job) ⇒ Object
Actually perform the job
111 112 113 114 115 116 117 118 119 |
# File 'lib/reqless/worker/base.rb', line 111 def perform(job) around_perform(job) rescue JobLockLost log(:warn, "Lost lock for job #{job.jid}") rescue Exception => error fail_job(job, error, caller) else try_complete(job) end |
#procline(value) ⇒ Object
Set the proceline. Not supported on all systems
153 154 155 156 |
# File 'lib/reqless/worker/base.rb', line 153 def procline(value) $0 = "reQless-#{Reqless::VERSION}: #{value} at #{Time.now.iso8601}" log(:debug, $PROGRAM_NAME) end |
#register_signal_handlers ⇒ Object
The meaning of these signals is meant to closely mirror resque
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked children immediately, continue processing jobs. USR2: Pause after this job CONT: Start processing jobs again after a USR2
HUP: Print current stack to log and continue
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/reqless/worker/base.rb', line 68 def register_signal_handlers # Otherwise, we want to take the appropriate action trap('TERM') { exit! } trap('INT') { exit! } safe_trap('HUP') { sighup_handler.call } safe_trap('QUIT') { shutdown } begin trap('CONT') { unpause } trap('USR2') { pause } rescue ArgumentError warn 'Signals USR2, and/or CONT not supported.' end end |
#safe_trap(signal_name, &cblock) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/reqless/worker/base.rb', line 51 def safe_trap(signal_name, &cblock) begin trap(signal_name, cblock) rescue ArgumentError warn "Signal #{signal_name} not supported." end end |
#shutdown ⇒ Object Also known as: stop!
Stop processing after this job
136 137 138 |
# File 'lib/reqless/worker/base.rb', line 136 def shutdown @shutdown = true end |
#try_complete(job) ⇒ Object
Complete the job unless the worker has already put it into another state by completing / failing / etc. the job
160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/reqless/worker/base.rb', line 160 def try_complete(job) job.complete unless job.state_changed? rescue Job::CantCompleteError => e # There's not much we can do here. Complete fails in a few cases: # - The job is already failed (i.e. by another worker) # - The job is being worked on by another worker # - The job has been cancelled # # We don't want to (or are able to) fail the job with this error in # any of these cases, so the best we can do is log the failure. log(:error, "Failed to complete #{job.inspect}: #{e.}") end |
#uniq_clients ⇒ Object
189 190 191 |
# File 'lib/reqless/worker/base.rb', line 189 def uniq_clients @uniq_clients ||= reserver.queues.map(&:client).uniq end |
#unpause ⇒ Object
Continue taking new jobs
148 149 150 |
# File 'lib/reqless/worker/base.rb', line 148 def unpause @paused = false end |