Class: Delayed::Worker
- Inherits:
-
Object
- Object
- Delayed::Worker
- Defined in:
- lib/delayed/worker.rb
Constant Summary collapse
- LINUX_PAGE_SIZE =
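Memory page size in bytes on Linux with a proc filesystem (falls back to 4096 when `getconf PAGESIZE` yields nothing usable).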
(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#max_priority ⇒ Object
readonly
Returns the value of attribute max_priority.
-
#min_priority ⇒ Object
readonly
Returns the value of attribute min_priority.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Class Method Summary collapse
- .current_job ⇒ Object
- .lifecycle ⇒ Object
-
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times.
Instance Method Summary collapse
-
#configure_for_job(job) ⇒ Object
Set up the session context information so that it gets logged with the job log lines. Also set up a unique tmpdir, which will get removed at the end of the job.
- #exit? ⇒ Boolean
- #handle_failed_job(job, error) ⇒ Object
- #id ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #log_job(job, format = :short) ⇒ Object
- #name ⇒ Object
- #name=(name) ⇒ Object
- #parent_exited? ⇒ Boolean
- #perform(job) ⇒ Object
- #perform_batch(parent_job) ⇒ Object
- #run ⇒ Object
-
#sample_memory ⇒ Object
generic unix solution.
- #say(msg, level = :debug) ⇒ Object
- #set_process_name(new_name) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/delayed/worker.rb', line 31
#
# Builds a worker from an options hash.
#
# Options read here: :parent_pid, :queue (falls back to Settings.queue),
# :min_priority, :max_priority, :worker_max_job_count and
# :worker_max_memory_usage. The two limits go through #to_i, so a missing
# value becomes 0.
#
# @param options [Hash] worker configuration; also kept verbatim as #config
def initialize(options = {})
  @exit = false
  @config = options
  @parent_pid = options[:parent_pid]
  @queue = options[:queue] || Settings.queue
  @min_priority = options[:min_priority]
  @max_priority = options[:max_priority]
  @max_job_count = options[:worker_max_job_count].to_i
  @max_memory_usage = options[:worker_max_memory_usage].to_i
  @job_count = 0

  app = Rails.application
  if app && !app.config.cache_classes
    # Classes are not cached: wrap every :perform in the reloader hooks.
    # NOTE(review): this registers on the class-level lifecycle each time a
    # worker is constructed - confirm workers are built once per process.
    Delayed::Worker.lifecycle.around(:perform) do |&block|
      reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any?
      ActionDispatch::Reloader.prepare! if reload
      begin
        block.call
      ensure
        ActionDispatch::Reloader.cleanup! if reload
      end
    end
  end

  plugins.each { |plugin| plugin.inject! }
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9
#
# Read-only accessor for the config attribute.
#
# @return [Object] the current value of @config
def config
  @config
end
#max_priority ⇒ Object (readonly)
Returns the value of attribute max_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9
#
# Read-only accessor for the max_priority attribute.
#
# @return [Object] the current value of @max_priority
def max_priority
  @max_priority
end
#min_priority ⇒ Object (readonly)
Returns the value of attribute min_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9
#
# Read-only accessor for the min_priority attribute.
#
# @return [Object] the current value of @min_priority
def min_priority
  @min_priority
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9
#
# Read-only accessor for the queue attribute.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
Class Method Details
.current_job ⇒ Object
212 213 214 |
# File 'lib/delayed/worker.rb', line 212
#
# Returns whatever is stored under :running_delayed_job for the calling
# thread (nil when nothing is stored).
#
# @return [Object, nil]
def self.current_job
  Thread.current[:running_delayed_job]
end
.lifecycle ⇒ Object
27 28 29 |
# File 'lib/delayed/worker.rb', line 27
#
# Lazily builds and memoizes a single Delayed::Lifecycle instance; every
# later call returns that same object.
#
# @return [Delayed::Lifecycle]
def self.lifecycle
  @lifecycle ||= Delayed::Lifecycle.new
end
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.
This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.
The block is called with the arguments (job, last_exception).
19 20 21 |
# File 'lib/delayed/worker.rb', line 19
#
# Stores the given callable in the @@on_max_failures class variable.
#
# @param block [#call] callback to store
def self.on_max_failures=(block)
  @@on_max_failures = block
end
Instance Method Details
#configure_for_job(job) ⇒ Object
Set up the session context information so that it gets logged with the job log lines. Also set up a unique tmpdir, which will get removed at the end of the job.
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/delayed/worker.rb', line 196
#
# Runs the given block with per-job context in place:
# * Thread.current[:running_delayed_job] is set to +job+;
# * ENV['TMPDIR'] points at a freshly created temporary directory whose name
#   embeds the job id and the worker name (characters other than word
#   characters and dots in the name are replaced by dots).
#
# Afterwards the directory is removed, ENV['TMPDIR'] is put back to its
# previous value and the thread-local is cleared, even when the block raises.
#
# @param job [#id] the job about to run
# @yield the work to execute inside the job context
# @return [Object] the block's result
def configure_for_job(job)
  previous_tmpdir = ENV['TMPDIR']
  Thread.current[:running_delayed_job] = job

  dir = Dir.mktmpdir("job-#{job.id}-#{name.gsub(/[^\w\.]/, '.')}-")
  begin
    ENV['TMPDIR'] = dir
    yield
  ensure
    # second argument (force = true): removal errors are ignored
    FileUtils.remove_entry(dir, true)
  end
ensure
  ENV['TMPDIR'] = previous_tmpdir
  Thread.current[:running_delayed_job] = nil
end
#exit? ⇒ Boolean
70 71 72 |
# File 'lib/delayed/worker.rb', line 70
#
# True when the exit flag has been set or #parent_exited? is truthy.
#
# @return [Boolean]
def exit?
  @exit || parent_exited?
end
#handle_failed_job(job, error) ⇒ Object
171 172 173 174 175 |
# File 'lib/delayed/worker.rb', line 171
#
# Records a failure on the job, logs it at :error level and reschedules.
#
# @param job [Object] must respond to last_error=, attempts and reschedule
# @param error [Exception] the failure being recorded
# @return [Object] whatever job.reschedule returns
def handle_failed_job(job, error)
  # Exception#backtrace is nil for an exception that was never raised;
  # Array() turns that into an empty trace instead of a NoMethodError.
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  say("Failed with #{error.class} [#{error.message}] (#{job.attempts} attempts)", :error)
  job.reschedule(error)
end
#id ⇒ Object
177 178 179 |
# File 'lib/delayed/worker.rb', line 177
#
# The worker's id, which is the pid of the current process.
#
# @return [Integer]
def id
  Process.pid
end
#log_job(job, format = :short) ⇒ Object
185 186 187 188 189 190 191 192 |
# File 'lib/delayed/worker.rb', line 185
#
# Builds the text used to identify a job in log lines.
#
# @param job [Object] must respond to full_name (and to_json for :long)
# @param format [Symbol] :long appends a JSON dump of selected attributes;
#   any other value yields just the full name
# @return [String]
def log_job(job, format = :short)
  case format
  when :long
    "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }"
  else
    job.full_name
  end
end
#name ⇒ Object
62 63 64 |
# File 'lib/delayed/worker.rb', line 62
#
# The worker's name. Unless one was assigned, it is built once as
# "<hostname>:<id>" and memoized; "X" stands in for the hostname when looking
# it up raises.
#
# @return [String]
def name
  @name ||= "#{Socket.gethostname rescue "X"}:#{id}"
end
#name=(name) ⇒ Object
58 59 60 |
# File 'lib/delayed/worker.rb', line 58
#
# Assigns the worker's name, replacing any stored value.
#
# @param name [String] the new name
def name=(name)
  @name = name
end
#parent_exited? ⇒ Boolean
74 75 76 |
# File 'lib/delayed/worker.rb', line 74
#
# Truthy when a parent pid was recorded and it no longer matches the current
# parent pid of this process. Returns nil (falsy) when no parent pid was
# recorded.
#
# @return [Boolean, nil]
def parent_exited?
  @parent_pid && @parent_pid != Process.ppid
end
#perform(job) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/delayed/worker.rb', line 131
#
# Runs one job inside the :perform lifecycle callbacks, destroys it when it
# finishes and logs the elapsed time. Any failure (including the expiry
# check) is routed through the :error callbacks to #handle_failed_job.
#
# @param job [Object] the locked job to run
# @return [Object] 1 for a plain job; for a batch, whatever #perform_batch
#   returned. The count is returned on the failure path as well.
def perform(job)
  count = 1
  raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired?
  self.class.lifecycle.run_callbacks(:perform, self, job) do
    set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}")
    say("Processing #{log_job(job, :long)}", :info)
    runtime = Benchmark.realtime do
      if job.batch?
        # each job in the batch will have perform called on it, so we don't
        # need a timeout around this
        count = perform_batch(job)
      else
        job.invoke_job
      end
      job.destroy
    end
    say("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}", :info)
  end
  count
rescue Exception => e
  # NOTE(review): rescuing Exception also captures SignalException and
  # SystemExit raised while the job runs - presumably deliberate so every
  # failure is recorded on the job; confirm.
  self.class.lifecycle.run_callbacks(:error, self, job, e) do
    handle_failed_job(job, e)
  end
  count
end
#perform_batch(parent_job) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/delayed/worker.rb', line 157
#
# Runs the jobs of a batch one after another. Only batches whose mode is
# :serial are processed: each child job inherits the parent's source, is
# created and locked under this worker's name, and is performed inside its
# own #configure_for_job context.
#
# @param parent_job [Object] job whose payload_object is the batch
# @return [Integer, nil] batch.items.size for a serial batch, nil otherwise
def perform_batch(parent_job)
  batch = parent_job.payload_object
  if batch.mode == :serial
    batch.jobs.each do |job|
      job.source = parent_job.source
      job.create_and_lock!(name)
      configure_for_job(job) do
        perform(job)
      end
    end
    # NOTE(review): iterates batch.jobs but counts batch.items - presumably
    # the same collection; confirm. Also note that a non-serial batch yields
    # nil, which callers adding the result to a counter must handle.
    batch.items.size
  end
end
#run ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/delayed/worker.rb', line 96
#
# One iteration of the worker loop: pop and lock the next available job for
# this worker's queue and priority range, then either perform it or, when no
# job was returned, sleep for the configured delay plus a random stagger.
#
# After a job runs, the exit flag is set when the job-count limit is reached
# or when the sampled memory exceeds the memory limit; a limit of 0 disables
# the corresponding check.
def run
  job =
    self.class.lifecycle.run_callbacks(:pop, self) do
      Delayed::Job.get_and_lock_next_available(
        name,
        queue,
        min_priority,
        max_priority)
    end

  if job
    configure_for_job(job) do
      @job_count += perform(job)

      if @max_job_count > 0 && @job_count >= @max_job_count
        say "Max job count of #{@max_job_count} exceeded, dying"
        @exit = true
      end

      if @max_memory_usage > 0
        memory = sample_memory
        if memory > @max_memory_usage
          say "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying"
          @exit = true
        else
          say "Memory usage: #{memory}"
        end
      end
    end
  else
    set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue}:#{min_priority || 0}:#{max_priority || 'max'}")
    sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger))
  end
end
#sample_memory ⇒ Object
generic unix solution
226 227 228 229 |
# File 'lib/delayed/worker.rb', line 226
#
# Samples this process's memory from /proc/<pid>/statm. The leading integer
# of that file is multiplied by LINUX_PAGE_SIZE and divided by 1024.
# Presumably that field is the total program size in pages, making the result
# kilobytes - see proc(5) to confirm.
#
# @return [Integer] the sampled size, or 0 when the file cannot be read
def sample_memory
  pages =
    begin
      File.read("/proc/#{Process.pid}/statm").to_i
    rescue StandardError
      # best effort: no readable procfs entry counts as "no usage"
      0
    end
  pages * LINUX_PAGE_SIZE / 1024
end
#say(msg, level = :debug) ⇒ Object
181 182 183 |
# File 'lib/delayed/worker.rb', line 181
#
# Writes a message to Rails.logger at the given level.
#
# @param msg [String] text to log
# @param level [Symbol] name of the logger method to call (:debug by default)
# @return [Object] whatever the logger method returns
def say(msg, level = :debug)
  # public_send keeps a level name from reaching private logger methods
  Rails.logger.public_send(level, msg)
end
#set_process_name(new_name) ⇒ Object
66 67 68 |
# File 'lib/delayed/worker.rb', line 66
#
# Sets the program name ($0) to the given name prefixed with "delayed:".
#
# @param new_name [String] suffix describing what the worker is doing
# @return [String] the full name that was assigned
def set_process_name(new_name)
  $0 = "delayed:#{new_name}"
end
#start ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/delayed/worker.rb', line 78
#
# Main loop of the worker: installs an INT handler that sets the exit flag,
# then calls #run repeatedly until #exit? is true.
#
# A StandardError escaping the loop is logged as fatal (logging failures are
# ignored) and reported through the :exceptional_exit callbacks. In every
# case the locks held under this worker's name are cleared on the way out.
def start
  say "Starting worker", :info

  trap('INT') { say 'Exiting'; @exit = true }

  loop do
    run
    break if exit?
  end

  say "Stopping worker", :info
rescue => e
  Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil
  self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { }
ensure
  Delayed::Job.clear_locks!(name)
end