Class: Delayed::Worker
- Inherits:
-
Object
- Object
- Delayed::Worker
- Defined in:
- lib/delayed/worker.rb
Constant Summary collapse
- LINUX_PAGE_SIZE =
linux w/ proc fs
(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#max_priority ⇒ Object
readonly
Returns the value of attribute max_priority.
-
#min_priority ⇒ Object
readonly
Returns the value of attribute min_priority.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#work_queue ⇒ Object
readonly
Returns the value of attribute work_queue.
Class Method Summary collapse
- .current_job ⇒ Object
- .lifecycle ⇒ Object
-
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times.
Instance Method Summary collapse
-
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
- #exit? ⇒ Boolean
- #handle_failed_job(job, error) ⇒ Object
- #id ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #log_job(job, format = :short) ⇒ Object
- #name ⇒ Object
- #parent_exited? ⇒ Boolean
- #perform(job) ⇒ Object
- #perform_batch(parent_job) ⇒ Object
- #run ⇒ Object
-
#sample_memory ⇒ Object
generic unix solution.
- #say(msg, level = :debug) ⇒ Object
- #set_process_name(new_name) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/delayed/worker.rb', line 31 def initialize( = {}) @exit = false @config = @parent_pid = [:parent_pid] @queue_name = [:queue] || Settings.queue @min_priority = [:min_priority] @max_priority = [:max_priority] @max_job_count = [:worker_max_job_count].to_i @max_memory_usage = [:worker_max_memory_usage].to_i @work_queue = [:work_queue] || WorkQueue::InProcess.new @job_count = 0 app = Rails.application if app && !app.config.cache_classes Delayed::Worker.lifecycle.around(:perform) do |&block| reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any? ActionDispatch::Reloader.prepare! if reload begin block.call ensure ActionDispatch::Reloader.cleanup! if reload end end end plugins.each { |plugin| plugin.inject! } end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def config @config end |
#max_priority ⇒ Object (readonly)
Returns the value of attribute max_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def max_priority @max_priority end |
#min_priority ⇒ Object (readonly)
Returns the value of attribute min_priority.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def min_priority @min_priority end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def queue_name @queue_name end |
#work_queue ⇒ Object (readonly)
Returns the value of attribute work_queue.
9 10 11 |
# File 'lib/delayed/worker.rb', line 9 def work_queue @work_queue end |
Class Method Details
.current_job ⇒ Object
208 209 210 |
# File 'lib/delayed/worker.rb', line 208 def self.current_job Thread.current[:running_delayed_job] end |
.lifecycle ⇒ Object
27 28 29 |
# File 'lib/delayed/worker.rb', line 27 def self.lifecycle @lifecycle ||= Delayed::Lifecycle.new end |
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.
This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.
The block is called with args(job, last_exception)
19 20 21 |
# File 'lib/delayed/worker.rb', line 19 def self.on_max_failures=(block) @@on_max_failures = block end |
Instance Method Details
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/delayed/worker.rb', line 192 def configure_for_job(job) previous_tmpdir = ENV['TMPDIR'] Thread.current[:running_delayed_job] = job dir = Dir.mktmpdir("job-#{job.id}-#{self.name.gsub(/[^\w\.]/, '.')}-") begin ENV['TMPDIR'] = dir yield ensure FileUtils.remove_entry(dir, true) end ensure ENV['TMPDIR'] = previous_tmpdir Thread.current[:running_delayed_job] = nil end |
#exit? ⇒ Boolean
67 68 69 |
# File 'lib/delayed/worker.rb', line 67 def exit? @exit || parent_exited? end |
#handle_failed_job(job, error) ⇒ Object
167 168 169 170 171 |
# File 'lib/delayed/worker.rb', line 167 def handle_failed_job(job, error) job.last_error = "#{error.}\n#{error.backtrace.join("\n")}" say("Failed with #{error.class} [#{error.}] (#{job.attempts} attempts)", :error) job.reschedule(error) end |
#id ⇒ Object
173 174 175 |
# File 'lib/delayed/worker.rb', line 173 def id Process.pid end |
#log_job(job, format = :short) ⇒ Object
181 182 183 184 185 186 187 188 |
# File 'lib/delayed/worker.rb', line 181 def log_job(job, format = :short) case format when :long "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }" else job.full_name end end |
#name ⇒ Object
59 60 61 |
# File 'lib/delayed/worker.rb', line 59 def name @name ||= "#{Socket.gethostname rescue "X"}:#{self.id}" end |
#parent_exited? ⇒ Boolean
71 72 73 |
# File 'lib/delayed/worker.rb', line 71 def parent_exited? @parent_pid && @parent_pid != Process.ppid end |
#perform(job) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/delayed/worker.rb', line 127 def perform(job) count = 1 raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired? self.class.lifecycle.run_callbacks(:perform, self, job) do set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}") say("Processing #{log_job(job, :long)}", :info) runtime = Benchmark.realtime do if job.batch? # each job in the batch will have perform called on it, so we don't # need a timeout around this count = perform_batch(job) else job.invoke_job end job.destroy end say("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}", :info) end count rescue Exception => e self.class.lifecycle.run_callbacks(:error, self, job, e) do handle_failed_job(job, e) end count end |
#perform_batch(parent_job) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/delayed/worker.rb', line 153 def perform_batch(parent_job) batch = parent_job.payload_object if batch.mode == :serial batch.jobs.each do |job| job.source = parent_job.source job.create_and_lock!(name) configure_for_job(job) do perform(job) end end batch.items.size end end |
#run ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/delayed/worker.rb', line 95 def run self.class.lifecycle.run_callbacks(:loop, self) do job = self.class.lifecycle.run_callbacks(:pop, self) do work_queue.get_and_lock_next_available(name, queue_name, min_priority, max_priority) end if job configure_for_job(job) do @job_count += perform(job) if @max_job_count > 0 && @job_count >= @max_job_count say "Max job count of #{@max_job_count} exceeded, dying" @exit = true end if @max_memory_usage > 0 memory = sample_memory if memory > @max_memory_usage say "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying" @exit = true else say "Memory usage: #{memory}" end end end else set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}") sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)) end end end |
#sample_memory ⇒ Object
generic unix solution
222 223 224 225 |
# File 'lib/delayed/worker.rb', line 222 def sample_memory s = File.read("/proc/#{Process.pid}/statm").to_i rescue 0 s * LINUX_PAGE_SIZE / 1024 end |
#say(msg, level = :debug) ⇒ Object
177 178 179 |
# File 'lib/delayed/worker.rb', line 177 def say(msg, level = :debug) Rails.logger.send(level, msg) end |
#set_process_name(new_name) ⇒ Object
63 64 65 |
# File 'lib/delayed/worker.rb', line 63 def set_process_name(new_name) $0 = "delayed:#{new_name}" end |
#start ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/delayed/worker.rb', line 75 def start say "Starting worker", :info trap('INT') { say 'Exiting'; @exit = true } self.class.lifecycle.run_callbacks(:execute, self) do loop do run break if exit? end end say "Stopping worker", :info rescue => e Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { } ensure Delayed::Job.clear_locks!(name) end |