Class: Delayed::Worker
- Inherits:
-
Object
- Object
- Delayed::Worker
- Defined in:
- lib/delayed/worker.rb
Constant Summary collapse
- LINUX_PAGE_SIZE =
linux w/ proc fs
(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#max_priority ⇒ Object
readonly
Returns the value of attribute max_priority.
-
#min_priority ⇒ Object
readonly
Returns the value of attribute min_priority.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Class Method Summary collapse
- .current_job ⇒ Object
- .lifecycle ⇒ Object
-
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times.
Instance Method Summary collapse
-
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
- #exit? ⇒ Boolean
- #handle_failed_job(job, error) ⇒ Object
- #id ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #log_job(job, format = :short) ⇒ Object
- #name ⇒ Object
- #name=(name) ⇒ Object
- #parent_exited? ⇒ Boolean
- #perform(job) ⇒ Object
- #perform_batch(parent_job) ⇒ Object
- #run ⇒ Object
-
#sample_memory ⇒ Object
generic unix solution.
- #say(msg, level = :debug) ⇒ Object
- #set_process_name(new_name) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/delayed/worker.rb', line 27 def initialize( = {}) @exit = false @config = @parent_pid = [:parent_pid] @queue = [:queue] || Settings.queue @min_priority = [:min_priority] @max_priority = [:max_priority] @max_job_count = [:worker_max_job_count].to_i @max_memory_usage = [:worker_max_memory_usage].to_i @job_count = 0 app = Rails.application if app && !app.config.cache_classes Delayed::Worker.lifecycle.around(:perform) do |&block| reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any? ActionDispatch::Reloader.prepare! if reload begin block.call ensure ActionDispatch::Reloader.cleanup! if reload end end end end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
8 9 10 |
# File 'lib/delayed/worker.rb', line 8 def config @config end |
#max_priority ⇒ Object (readonly)
Returns the value of attribute max_priority.
8 9 10 |
# File 'lib/delayed/worker.rb', line 8 def max_priority @max_priority end |
#min_priority ⇒ Object (readonly)
Returns the value of attribute min_priority.
8 9 10 |
# File 'lib/delayed/worker.rb', line 8 def min_priority @min_priority end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
8 9 10 |
# File 'lib/delayed/worker.rb', line 8 def queue @queue end |
Class Method Details
.current_job ⇒ Object
203 204 205 |
# File 'lib/delayed/worker.rb', line 203 def self.current_job Thread.current[:running_delayed_job] end |
.lifecycle ⇒ Object
23 24 25 |
# File 'lib/delayed/worker.rb', line 23 def self.lifecycle @lifecycle ||= Delayed::Lifecycle.new end |
.on_max_failures=(block) ⇒ Object
Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.
This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.
The block is called with args(job, last_exception)
18 19 20 |
# File 'lib/delayed/worker.rb', line 18 def self.on_max_failures=(block) @@on_max_failures = block end |
Instance Method Details
#configure_for_job(job) ⇒ Object
set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/delayed/worker.rb', line 187 def configure_for_job(job) previous_tmpdir = ENV['TMPDIR'] Thread.current[:running_delayed_job] = job dir = Dir.mktmpdir("job-#{job.id}-#{self.name.gsub(/[^\w\.]/, '.')}-") begin ENV['TMPDIR'] = dir yield ensure FileUtils.remove_entry(dir, true) end ensure ENV['TMPDIR'] = previous_tmpdir Thread.current[:running_delayed_job] = nil end |
#exit? ⇒ Boolean
64 65 66 |
# File 'lib/delayed/worker.rb', line 64 def exit? @exit || parent_exited? end |
#handle_failed_job(job, error) ⇒ Object
162 163 164 165 166 |
# File 'lib/delayed/worker.rb', line 162 def handle_failed_job(job, error) job.last_error = "#{error.}\n#{error.backtrace.join("\n")}" say("Failed with #{error.class} [#{error.}] (#{job.attempts} attempts)", :error) job.reschedule(error) end |
#id ⇒ Object
168 169 170 |
# File 'lib/delayed/worker.rb', line 168 def id Process.pid end |
#log_job(job, format = :short) ⇒ Object
176 177 178 179 180 181 182 183 |
# File 'lib/delayed/worker.rb', line 176 def log_job(job, format = :short) case format when :long "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }" else job.full_name end end |
#name ⇒ Object
56 57 58 |
# File 'lib/delayed/worker.rb', line 56 def name @name ||= "#{Socket.gethostname rescue "X"}:#{self.id}" end |
#name=(name) ⇒ Object
52 53 54 |
# File 'lib/delayed/worker.rb', line 52 def name=(name) @name = name end |
#parent_exited? ⇒ Boolean
68 69 70 |
# File 'lib/delayed/worker.rb', line 68 def parent_exited? @parent_pid && @parent_pid != Process.ppid end |
#perform(job) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/delayed/worker.rb', line 125 def perform(job) count = 1 self.class.lifecycle.run_callbacks(:perform, self, job) do set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}") say("Processing #{log_job(job, :long)}", :info) runtime = Benchmark.realtime do if job.batch? # each job in the batch will have perform called on it, so we don't # need a timeout around this count = perform_batch(job) else job.invoke_job end job.destroy end say("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}", :info) end count rescue Exception => e handle_failed_job(job, e) count end |
#perform_batch(parent_job) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/delayed/worker.rb', line 148 def perform_batch(parent_job) batch = parent_job.payload_object if batch.mode == :serial batch.jobs.each do |job| job.source = parent_job.source job.create_and_lock!(name) configure_for_job(job) do perform(job) end end batch.items.size end end |
#run ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/delayed/worker.rb', line 90 def run job = self.class.lifecycle.run_callbacks(:pop, self) do Delayed::Job.get_and_lock_next_available( name, queue, min_priority, max_priority) end if job configure_for_job(job) do @job_count += perform(job) if @max_job_count > 0 && @job_count >= @max_job_count say "Max job count of #{@max_job_count} exceeded, dying" @exit = true end if @max_memory_usage > 0 memory = sample_memory if memory > @max_memory_usage say "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying" @exit = true else say "Memory usage: #{memory}" end end end else set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue}:#{min_priority || 0}:#{max_priority || 'max'}") sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)) end end |
#sample_memory ⇒ Object
generic unix solution
217 218 219 220 |
# File 'lib/delayed/worker.rb', line 217 def sample_memory s = File.read("/proc/#{Process.pid}/statm").to_i rescue 0 s * LINUX_PAGE_SIZE / 1024 end |
#say(msg, level = :debug) ⇒ Object
172 173 174 |
# File 'lib/delayed/worker.rb', line 172 def say(msg, level = :debug) Rails.logger.send(level, msg) end |
#set_process_name(new_name) ⇒ Object
60 61 62 |
# File 'lib/delayed/worker.rb', line 60 def set_process_name(new_name) $0 = "delayed:#{new_name}" end |
#start ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/delayed/worker.rb', line 72 def start say "Starting worker", :info trap('INT') { say 'Exiting'; @exit = true } loop do run break if exit? end say "Stopping worker", :info rescue => e Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { } ensure Delayed::Job.clear_locks!(name) end |