Class: Delayed::Worker
- Inherits:
-
Object
- Object
- Delayed::Worker
- Defined in:
- lib/delayed/worker.rb
Constant Summary collapse
- LINUX_PAGE_SIZE =
linux w/ proc fs
(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#max_priority ⇒ Object
readonly
Returns the value of attribute max_priority.
-
#min_priority ⇒ Object
readonly
Returns the value of attribute min_priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#work_queue ⇒ Object
readonly
Returns the value of attribute work_queue.
Class Method Summary collapse
- .current_job ⇒ Object
- .lifecycle ⇒ Object
-
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times.
- .running_job(job) ⇒ Object
Instance Method Summary collapse
-
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
- #exit? ⇒ Boolean
- #handle_failed_job(job, error) ⇒ Object
- #id ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #log_job(job, format = :short) ⇒ Object
- #name ⇒ Object
- #parent_exited? ⇒ Boolean
- #perform(job) ⇒ Object
- #perform_batch(parent_job) ⇒ Object
- #run ⇒ Object
-
#sample_memory ⇒ Object
generic unix solution.
- #say(msg, level = :debug) ⇒ Object
- #set_process_name(new_name) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/delayed/worker.rb', line 42 def initialize( = {}) @exit = false @parent_pid = [:parent_pid] @queue_name = [:queue] ||= Settings.queue @min_priority = [:min_priority] @max_priority = [:max_priority] @max_job_count = [:worker_max_job_count].to_i @max_memory_usage = [:worker_max_memory_usage].to_i @work_queue = .delete(:work_queue) || WorkQueue::InProcess.new @config = @job_count = 0 app = Rails.application if app && !app.config.cache_classes Delayed::Worker.lifecycle.around(:perform) do |&block| reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any? ActionDispatch::Reloader.prepare! if reload begin block.call ensure ActionDispatch::Reloader.cleanup! if reload end end end plugins.each { |plugin| plugin.inject! } end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def config @config end |
#max_priority ⇒ Object (readonly)
Returns the value of attribute max_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def max_priority @max_priority end |
#min_priority ⇒ Object (readonly)
Returns the value of attribute min_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def min_priority @min_priority end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def queue_name @queue_name end |
#work_queue ⇒ Object (readonly)
Returns the value of attribute work_queue.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def work_queue @work_queue end |
Class Method Details
.current_job ⇒ Object
31 32 33 |
# File 'lib/delayed/worker.rb', line 31 def self.current_job Thread.current[:running_delayed_job] end |
.lifecycle ⇒ Object
27 28 29 |
# File 'lib/delayed/worker.rb', line 27 def self.lifecycle @lifecycle ||= Delayed::Lifecycle.new end |
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.
This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.
The block is called with args(job, last_exception)
19 20 21 |
# File 'lib/delayed/worker.rb', line 19 def self.on_max_failures=(block) @@on_max_failures = block end |
.running_job(job) ⇒ Object
35 36 37 38 39 40 |
# File 'lib/delayed/worker.rb', line 35 def self.running_job(job) Thread.current[:running_delayed_job] = job yield ensure Thread.current[:running_delayed_job] = nil end |
Instance Method Details
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/delayed/worker.rb', line 205 def configure_for_job(job) previous_tmpdir = ENV['TMPDIR'] self.class.running_job(job) do dir = Dir.mktmpdir("job-#{job.id}-#{self.name.gsub(/[^\w\.]/, '.')}-") begin ENV['TMPDIR'] = dir yield ensure FileUtils.remove_entry(dir, true) end end ensure ENV['TMPDIR'] = previous_tmpdir end |
#exit? ⇒ Boolean
78 79 80 |
# File 'lib/delayed/worker.rb', line 78 def exit? @exit || parent_exited? end |
#handle_failed_job(job, error) ⇒ Object
180 181 182 183 184 |
# File 'lib/delayed/worker.rb', line 180 def handle_failed_job(job, error) job.last_error = "#{error.}\n#{error.backtrace.join("\n")}" say("Failed with #{error.class} [#{error.}] (#{job.attempts} attempts)", :error) job.reschedule(error) end |
#id ⇒ Object
186 187 188 |
# File 'lib/delayed/worker.rb', line 186 def id Process.pid end |
#log_job(job, format = :short) ⇒ Object
194 195 196 197 198 199 200 201 |
# File 'lib/delayed/worker.rb', line 194 def log_job(job, format = :short) case format when :long "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }" else job.full_name end end |
#name ⇒ Object
70 71 72 |
# File 'lib/delayed/worker.rb', line 70 def name @name ||= "#{Socket.gethostname rescue "X"}:#{self.id}" end |
#parent_exited? ⇒ Boolean
82 83 84 |
# File 'lib/delayed/worker.rb', line 82 def parent_exited? @parent_pid && @parent_pid != Process.ppid end |
#perform(job) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/delayed/worker.rb', line 140 def perform(job) count = 1 raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired? self.class.lifecycle.run_callbacks(:perform, self, job) do set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}") say("Processing #{log_job(job, :long)}", :info) runtime = Benchmark.realtime do if job.batch? # each job in the batch will have perform called on it, so we don't # need a timeout around this count = perform_batch(job) else job.invoke_job end job.destroy end say("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}", :info) end count rescue Exception => e self.class.lifecycle.run_callbacks(:error, self, job, e) do handle_failed_job(job, e) end count end |
#perform_batch(parent_job) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/delayed/worker.rb', line 166 def perform_batch(parent_job) batch = parent_job.payload_object if batch.mode == :serial batch.jobs.each do |job| job.source = parent_job.source job.create_and_lock!(name) configure_for_job(job) do perform(job) end end batch.items.size end end |
#run ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/delayed/worker.rb', line 107 def run self.class.lifecycle.run_callbacks(:loop, self) do set_process_name("pop:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}") job = self.class.lifecycle.run_callbacks(:pop, self) do work_queue.get_and_lock_next_available(name, config) end if job configure_for_job(job) do @job_count += perform(job) if @max_job_count > 0 && @job_count >= @max_job_count say "Max job count of #{@max_job_count} exceeded, dying" @exit = true end if @max_memory_usage > 0 memory = sample_memory if memory > @max_memory_usage say "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying" @exit = true else say "Memory usage: #{memory}" end end end else set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}") sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)) end end end |
#sample_memory ⇒ Object
generic unix solution
231 232 233 234 |
# File 'lib/delayed/worker.rb', line 231 def sample_memory s = File.read("/proc/#{Process.pid}/statm").to_i rescue 0 s * LINUX_PAGE_SIZE / 1024 end |
#say(msg, level = :debug) ⇒ Object
190 191 192 |
# File 'lib/delayed/worker.rb', line 190 def say(msg, level = :debug) Rails.logger.send(level, msg) end |
#set_process_name(new_name) ⇒ Object
74 75 76 |
# File 'lib/delayed/worker.rb', line 74 def set_process_name(new_name) $0 = "delayed:#{new_name}" end |
#start ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/delayed/worker.rb', line 86 def start say "Starting worker", :info set_process_name("start:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}") trap('INT') { say 'Exiting'; @exit = true } self.class.lifecycle.run_callbacks(:execute, self) do loop do run break if exit? end end say "Stopping worker", :info rescue => e Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { } ensure Delayed::Job.clear_locks!(name) end |