Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/delayed/worker.rb,
lib/delayed/worker/health_check.rb,
lib/delayed/worker/process_helper.rb,
lib/delayed/worker/null_health_check.rb,
lib/delayed/worker/consul_health_check.rb

Defined Under Namespace

Modules: ProcessHelper Classes: ConsulHealthCheck, HealthCheck, NullHealthCheck

Constant Summary collapse

# Signals trapped by #start. INT and TERM stop the worker immediately,
# interrupting the current job; QUIT only sets the exit flag so the current
# iteration finishes first.
SIGNALS =
  %i[INT TERM QUIT].freeze
# linux w/ proc fs: system page size in bytes as reported by
# `getconf PAGESIZE`, falling back to 4096 when it reports nothing positive
# or the command cannot be run at all (e.g. getconf is not installed).
LINUX_PAGE_SIZE =
  begin
    size = `getconf PAGESIZE`.to_i
    size.positive? ? size : 4096
  rescue SystemCallError
    4096
  end

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/delayed/worker.rb', line 59

# Builds a worker from an options hash.
#
# The caller's hash is copied before :queue is defaulted and :work_queue is
# removed. The same options can therefore be reused to build several workers
# without one constructor call altering what the next one sees.
#
# @param options [Hash] keys read here: :parent_pid, :queue, :min_priority,
#   :max_priority, :worker_max_job_count, :worker_max_memory_usage,
#   :worker_memory_high_water and :work_queue. The copy, without :work_queue
#   and with :queue defaulted, becomes #config.
def initialize(options = {})
  options = options.dup
  @exit = false
  @parent_pid = options[:parent_pid]
  # default :queue in the hash as well: #config is handed to the work queue
  @queue_name = options[:queue] ||= Settings.queue
  @min_priority = options[:min_priority]
  @max_priority = options[:max_priority]
  @max_job_count = options[:worker_max_job_count].to_i
  @max_memory_usage = options[:worker_max_memory_usage].to_i
  # an explicit high-water mark wins; otherwise use half of the max memory usage
  @memory_high_water = options[:worker_memory_high_water]&.to_i || (@max_memory_usage / 2)
  # the work queue object itself is not part of the config passed around
  @work_queue = options.delete(:work_queue) || WorkQueue::InProcess.new
  @health_check_type = Settings.worker_health_check_type
  @health_check_config = Settings.worker_health_check_config
  @config = options
  @job_count = 0

  # filled by the trap handlers installed in #start, drained by its signal thread
  @signal_queue = []

  # register the reloader plugin once, no matter how many workers are built
  plugins << Delayed::RailsReloaderPlugin unless plugins.include?(Delayed::RailsReloaderPlugin)
  plugins.each(&:inject!)
end

Class Attribute Details

.on_max_failuresObject

Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.

This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.

The block is called with the arguments (job, last_exception).



38
39
40
# File 'lib/delayed/worker.rb', line 38

# Reader for the class-level on_max_failures callback, which is called with
# (job, last_exception). Returns the stored callback, or nil if
# @on_max_failures was never assigned.
def on_max_failures
  @on_max_failures
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



27
28
29
# File 'lib/delayed/worker.rb', line 27

# The options hash the worker was built with (without :work_queue, with
# :queue defaulted). It is passed to the work queue when fetching jobs.
def config
  @config
end

#max_priorityObject (readonly)

Returns the value of attribute max_priority.



27
28
29
# File 'lib/delayed/worker.rb', line 27

# Upper priority bound from options[:max_priority]; nil when not given
# (shown as "max" in the process title).
def max_priority
  @max_priority
end

#min_priorityObject (readonly)

Returns the value of attribute min_priority.



27
28
29
# File 'lib/delayed/worker.rb', line 27

# Lower priority bound from options[:min_priority]; nil when not given
# (shown as 0 in the process title).
def min_priority
  @min_priority
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



27
28
29
# File 'lib/delayed/worker.rb', line 27

# Queue this worker pulls from: options[:queue], or Settings.queue when not given.
def queue_name
  @queue_name
end

#work_queueObject (readonly)

Returns the value of attribute work_queue.



27
28
29
# File 'lib/delayed/worker.rb', line 27

# Source of jobs for this worker: options[:work_queue], or a new
# WorkQueue::InProcess when none was given.
def work_queue
  @work_queue
end

Class Method Details

.current_jobObject



48
49
50
# File 'lib/delayed/worker.rb', line 48

# The job currently running on this thread, as recorded by .running_job;
# nil when no job is running.
def self.current_job
  Thread.current.fetch(:running_delayed_job, nil)
end

.lifecycleObject



44
45
46
# File 'lib/delayed/worker.rb', line 44

# The Delayed::Lifecycle whose callbacks the worker runs (:loop, :pop,
# :perform, :error, ...). Created on first use and reused afterwards.
def self.lifecycle
  return @lifecycle if @lifecycle

  @lifecycle = Delayed::Lifecycle.new
end

.running_job(job) ⇒ Object



52
53
54
55
56
57
# File 'lib/delayed/worker.rb', line 52

# Marks +job+ as the job running on the current thread for the duration of
# the block (see .current_job).
#
# Once the block finishes or raises, whatever was marked before is restored.
# Restoring instead of clearing keeps the outer job visible when calls nest,
# e.g. a batch job whose sub-jobs are each run through this method.
#
# @param job the job to expose via .current_job
# @return the block's return value
def self.running_job(job)
  previous_job = Thread.current[:running_delayed_job]
  Thread.current[:running_delayed_job] = job
  yield
ensure
  Thread.current[:running_delayed_job] = previous_job
end

Instance Method Details

#cleaned?Boolean

Returns:

  • (Boolean)


170
171
172
# File 'lib/delayed/worker.rb', line 170

# Whether #cleanup! has already completed for this worker.
#
# @return [Boolean] true once cleanup has run, false before (never nil)
def cleaned?
  !!@cleaned
end

#cleanup!Object



160
161
162
163
164
165
166
167
168
# File 'lib/delayed/worker.rb', line 160

# Releases this worker's resources once. It stops the health check, closes
# the work queue, then clears the job locks held under this worker's #name.
# Later calls do nothing.
#
# NOTE(review): #start can reach this from both its signal-processing thread
# and its ensure clause; the cleaned? check is not synchronized.
def cleanup!
  unless cleaned?
    health_check.stop
    work_queue.close
    Delayed::Job.clear_locks!(name)
    @cleaned = true
  end
end

#configure_for_job(job) ⇒ Object

Sets up the session context information so that it is logged with the job's log lines. Also sets up a unique tmpdir, which is removed at the end of the job.



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/delayed/worker.rb', line 289

# Runs the given block as +job+.
#
# The job is marked as the current job (via .running_job), and
# ENV["TMPDIR"] points at a fresh temporary directory that is removed
# afterwards. TMPDIR is restored to its previous value, or unset if it was
# unset, whether or not the block raises.
#
# @param job the job being run; its id is used in the tmpdir name
# @yield the work to perform for the job
def configure_for_job(job)
  previous_tmpdir = ENV.fetch("TMPDIR", nil)

  self.class.running_job(job) do
    # prefix: job id plus the worker name, with every character other than
    # word characters and dots replaced by "."
    dir = Dir.mktmpdir("job-#{job.id}-#{name.gsub(/[^\w.]/, ".")}-")
    begin
      ENV["TMPDIR"] = dir
      yield
    ensure
      # second argument is force: errors while removing are ignored
      FileUtils.remove_entry(dir, true)
    end
  end
ensure
  # assigning nil deletes the variable, so an originally-unset TMPDIR stays unset
  ENV["TMPDIR"] = previous_tmpdir
end

#exit?Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/delayed/worker.rb', line 88

# Whether the worker should stop. This is true when the @exit flag is set
# (by signals or by the job-count/memory limits) or when #parent_exited?
# reports that the parent is gone. When the flag is not set, the result of
# #parent_exited? is returned as-is.
def exit?
  return true if @exit

  parent_exited?
end

#handle_failed_job(job, error) ⇒ Object



268
269
270
271
272
# File 'lib/delayed/worker.rb', line 268

# Records +error+ on +job+ and hands the job back for rescheduling.
#
# The message and backtrace are stored in job.last_error and a log line is
# written. job.reschedule(error) then decides what happens next.
#
# @param job the failed job
# @param error [Exception] the failure. Its backtrace is nil if the exception
#   was never raised; only the message is recorded in that case.
def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{Array(error.backtrace).join("\n")}"
  logger.error("Failed with #{error.class} [#{error.message}] (#{job.attempts} attempts)")
  job.reschedule(error)
end

#health_checkObject



305
306
307
308
309
310
311
# File 'lib/delayed/worker.rb', line 305

# This worker's health check, built on first use from the configured type
# and config and registered under the worker's #name.
def health_check
  return @health_check if @health_check

  @health_check = HealthCheck.build(
    type: @health_check_type,
    worker_name: name,
    config: @health_check_config
  )
end

#idObject



274
275
276
# File 'lib/delayed/worker.rb', line 274

# Worker id: the pid of the current process (also used in #name).
def id
  Process.pid
end

#log_job(job, format = :short) ⇒ Object



278
279
280
281
282
283
284
285
# File 'lib/delayed/worker.rb', line 278

# Formats +job+ for log lines as its full name followed by configured details.
#
# :long appends Settings.job_detailed_log_format (not stripped). Any other
# format appends Settings.job_short_log_format and strips surrounding
# whitespace, so an empty short format yields just the name.
def log_job(job, format = :short)
  label = job.full_name
  if format == :long
    "#{label} #{Settings.job_detailed_log_format.call(job)}"
  else
    "#{label} #{Settings.job_short_log_format.call(job)}".strip
  end
end

#nameObject



80
81
82
# File 'lib/delayed/worker.rb', line 80

# Identifies this worker as "<hostname>:<id>", memoized after the first
# call. "X" stands in for the hostname if looking it up raises.
def name
  return @name if @name

  hostname =
    begin
      Socket.gethostname
    rescue StandardError
      "X"
    end
  @name = "#{hostname}:#{id}"
end

#parent_exited?Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/delayed/worker.rb', line 92

# Whether this worker was given a parent pid (options[:parent_pid]) and its
# current parent process no longer matches it. Presumably this means the
# launching process has gone away and the worker was re-parented.
#
# @return [Boolean] always true or false; false when no parent pid was given
def parent_exited?
  @parent_pid ? @parent_pid != Process.ppid : false
end

#perform(job) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/delayed/worker.rb', line 211

# Runs a single job, or a batch job, inside the :perform lifecycle callbacks
# and destroys it on success.
#
# Any exception raised while running the job is rescued here:
# - Delayed::RetriableError runs the :retry callbacks when another attempt is
#   allowed (:error otherwise) and records the failure via #handle_failed_job.
# - SystemExit (e.g. raised into this thread by the signal thread in #start)
#   only reschedules the job.
# - Anything else runs the :error callbacks and #handle_failed_job. A
#   NoMemoryError additionally flags the worker to exit.
#
# @param job the job to run
# @return the number of jobs this counts as: 1 for a regular job, or the
#   value of #perform_batch for a batch.
#   NOTE(review): #run adds this to @job_count, so it must be an Integer.
def perform(job)
  begin
    # jobs this call counts as; a batch replaces it with its own count
    count = 1
    # expired jobs are never run; JobExpired is handled by the rescue clauses below
    raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired?

    self.class.lifecycle.run_callbacks(:perform, self, job) do
      self.process_name = "run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}"
      logger.info("Processing #{log_job(job, :long)}")
      # the measured runtime includes destroying the job
      runtime = Benchmark.realtime do
        if job.batch?
          # each job in the batch will have perform called on it, so we don't
          # need a timeout around this
          count = perform_batch(job)
        else
          job.invoke_job
        end
        job.destroy
      end
      logger.info("Completed #{log_job(job, :short)} #{format("%.0fms", (runtime * 1000))}")
    end
  rescue ::Delayed::RetriableError => e
    # NOTE(review): the +1 presumably counts the attempt that just failed;
    # confirm whether job.attempts already includes it
    can_retry = job.attempts + 1 < job.inferred_max_attempts
    callback_type = can_retry ? :retry : :error
    self.class.lifecycle.run_callbacks(callback_type, self, job, e) do
      handle_failed_job(job, e)
    end
  rescue SystemExit => e
    # There wasn't really a failure here so no callbacks and whatnot needed,
    # still reschedule the job though.
    job.reschedule(e)
  rescue Exception => e # rubocop:disable Lint/RescueException
    # deliberately broad: any other failure, including non-StandardError
    # exceptions, is recorded on the job instead of propagating
    if e.is_a?(NoMemoryError)
      GC.start # try and free up some memory before reporting the error
      logger.debug "Could not allocate memory (max is #{@max_memory_usage}), dying"
      @exit = true
    end
    self.class.lifecycle.run_callbacks(:error, self, job, e) do
      handle_failed_job(job, e)
    end
  end
  count
end

#perform_batch(parent_job) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/delayed/worker.rb', line 254

# Runs each job of a serial batch in turn, on behalf of +parent_job+.
#
# Each sub-job inherits the parent's source, is created and locked under
# this worker's #name, and is run through #configure_for_job and #perform.
#
# @param parent_job the batch job; its payload_object is the batch
# @return [Integer] number of batch items performed: batch.items.size for a
#   serial batch, 0 for any other mode (nothing is run). It is always an
#   Integer because #perform returns it to #run, which adds it to the job count.
def perform_batch(parent_job)
  batch = parent_job.payload_object
  # only serial batches are executed here; 0 rather than nil keeps the
  # job-count arithmetic in #run working
  return 0 unless batch.mode == :serial

  batch.jobs.each do |job|
    job.source = parent_job.source
    job.create_and_lock!(name)
    configure_for_job(job) do
      perform(job)
    end
  end
  batch.items.size
end

#process_name=(new_name) ⇒ Object



84
85
86
# File 'lib/delayed/worker.rb', line 84

# Sets $PROGRAM_NAME ($0) to "delayed:<new_name>". On most platforms this
# also changes the process title shown by ps.
def process_name=(new_name)
  $PROGRAM_NAME = format("delayed:%s", new_name)
end

#runObject



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/delayed/worker.rb', line 174

# One iteration of the work loop.
#
# Inside the :loop and :pop lifecycle callbacks it takes the next available
# job and runs it; if there is none, it sleeps. After a job runs, the worker
# flags itself to exit when it has reached @max_job_count or its sampled
# memory exceeds @memory_high_water. Each check is skipped when its limit is
# not positive. Does nothing if #exit? is already true.
def run
  return if exit?

  self.class.lifecycle.run_callbacks(:loop, self) do
    self.process_name =
      "pop:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || "max"}"
    job = self.class.lifecycle.run_callbacks(:pop, self) do
      work_queue.get_and_lock_next_available(name, config)
    end

    if job
      configure_for_job(job) do
        # perform returns how many jobs this counted as (a batch reports its item count)
        @job_count += perform(job)

        if @max_job_count.positive? && @job_count >= @max_job_count
          logger.debug "Max job count of #{@max_job_count} exceeded, dying"
          @exit = true
        end

        if @memory_high_water.positive?
          # NOTE(review): sample_memory reports kilobytes, but the default
          # @memory_high_water is half of worker_max_memory_usage, which
          # #start passes to setrlimit (bytes). Confirm the units agree.
          memory = sample_memory
          if memory > @memory_high_water
            logger.debug "Memory usage of #{memory} exceeds high water of #{@memory_high_water}, dying"
            @exit = true
          else
            logger.debug "Memory usage: #{memory}"
          end
        end
      end
    else
      self.process_name =
        "wait:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || "max"}"
      # sleep_delay plus a random fraction of sleep_delay_stagger
      sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)) unless exit?
    end
  end
end

#sample_memoryObject

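Samples this process's memory usage in kilobytes from /proc/<pid>/statm (Linux only; returns 0 if the file cannot be read).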



324
325
326
327
# File 'lib/delayed/worker.rb', line 324

# Memory footprint of this process in kilobytes.
#
# Reads the first field of /proc/<pid>/statm (total program size, in pages)
# and multiplies it by LINUX_PAGE_SIZE. Returns 0 when the file cannot be
# read, e.g. when there is no /proc.
def sample_memory
  pages =
    begin
      File.read("/proc/#{Process.pid}/statm").to_i
    rescue StandardError
      0
    end
  pages * LINUX_PAGE_SIZE / 1024
end

#startObject



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/delayed/worker.rb', line 101

# Main entry point of a worker process.
#
# Startup: applies the memory rlimit, installs the INT/TERM/QUIT trap
# handlers and starts the health check. It then calls #run until #exit? is
# true. On the way out it always runs #cleanup!, stops the signal-processing
# thread and closes the self-pipe.
#
# Signal handling uses a self-pipe: trap handlers only enqueue the signal and
# write a byte (#wake_up). A dedicated thread reads the pipe and acts on the
# queued signals in arrival order.
#
# StandardError exceptions are logged as fatal and reported through the
# :exceptional_exit lifecycle callbacks instead of being re-raised.
def start
  logger.info "Starting worker"
  begin
    Process.setrlimit(:DATA, @max_memory_usage, @max_memory_usage * 2) if @max_memory_usage.positive?
  rescue Errno::EINVAL
    # couldn't set an OS-level limit
  end
  self.process_name =
    "start:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || "max"}"
  @self_pipe = IO.pipe
  work_queue.init

  work_thread = Thread.current
  SIGNALS.each do |sig|
    trap(sig) do
      @signal_queue << sig
      wake_up
    end
  end

  raise "Could not register health_check" unless health_check.start

  signal_processor = Thread.new do
    loop do
      @self_pipe[0].read(1)
      # shift, not pop: handle signals first-in, first-out
      received_signal = @signal_queue.shift
      case received_signal
      when :INT, :TERM
        @exit = true # get the main thread to bail early if it's waiting for a job
        work_thread.raise(SystemExit) # Force the main thread to bail out of the current job
        cleanup! # we're going to get SIGKILL'd in a moment, so clean up asap
        break
      when :QUIT
        @exit = true
      else
        # also reached when a byte arrives without a queued signal; log and keep going
        logger.error "Unknown signal '#{received_signal}' received"
      end
    end
  end

  self.class.lifecycle.run_callbacks(:execute, self) do
    run until exit?
  end

  logger.info "Stopping worker"
rescue => e
  Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil
  self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { nil }
ensure
  cleanup!

  if signal_processor
    signal_processor.kill
    signal_processor.join
  end

  @self_pipe&.each(&:close)
  @self_pipe = nil
end

#wake_upObject



96
97
98
99
# File 'lib/delayed/worker.rb', line 96

# Wakes the worker. It writes a byte to the self-pipe (read by the
# signal-processing thread started in #start) and wakes the work queue.
#
# Safe to call when there is no self-pipe, either before #start creates it or
# after #start tears it down. The INT/TERM/QUIT trap handlers stay installed
# and call this method. In that case only the work queue is woken.
def wake_up
  # exception: false returns instead of raising when the pipe is full
  @self_pipe&.last&.write_nonblock(".", exception: false)
  work_queue.wake_up
end