Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/delayed/worker.rb,
lib/delayed/worker/health_check.rb,
lib/delayed/worker/process_helper.rb,
lib/delayed/worker/null_health_check.rb,
lib/delayed/worker/consul_health_check.rb

Defined Under Namespace

Modules: ProcessHelper Classes: ConsulHealthCheck, HealthCheck, NullHealthCheck

Constant Summary collapse

SIGNALS =
%i{INT TERM QUIT}
LINUX_PAGE_SIZE =

linux w/ proc fs

(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/delayed/worker.rb', line 45

def initialize(options = {})
  @exit = false
  @parent_pid = options[:parent_pid]
  @queue_name = options[:queue] ||= Settings.queue
  @min_priority = options[:min_priority]
  @max_priority = options[:max_priority]
  @max_job_count = options[:worker_max_job_count].to_i
  @max_memory_usage = options[:worker_max_memory_usage].to_i
  @work_queue = options.delete(:work_queue) || WorkQueue::InProcess.new
  @health_check_type = Settings.worker_health_check_type
  @health_check_config = Settings.worker_health_check_config
  @config = options
  @job_count = 0

  @self_pipe = IO.pipe
  @signal_queue = []

  app = Rails.application
  if app && !app.config.cache_classes
    Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
      reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any?

      if reload
        if defined?(ActiveSupport::Reloader)
          Rails.application.reloader.reload!
        else
          ActionDispatch::Reloader.prepare!
        end
      end

      begin
        block.call(worker, job)
      ensure
        ActionDispatch::Reloader.cleanup! if reload && !defined?(ActiveSupport::Reloader)
      end
    end
  end

  plugins.each { |plugin| plugin.inject! }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/delayed/worker.rb', line 12

def config
  @config
end

#max_priorityObject (readonly)

Returns the value of attribute max_priority.



12
13
14
# File 'lib/delayed/worker.rb', line 12

def max_priority
  @max_priority
end

#min_priorityObject (readonly)

Returns the value of attribute min_priority.



12
13
14
# File 'lib/delayed/worker.rb', line 12

def min_priority
  @min_priority
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



12
13
14
# File 'lib/delayed/worker.rb', line 12

def queue_name
  @queue_name
end

#work_queueObject (readonly)

Returns the value of attribute work_queue.



12
13
14
# File 'lib/delayed/worker.rb', line 12

def work_queue
  @work_queue
end

Class Method Details

.current_jobObject



34
35
36
# File 'lib/delayed/worker.rb', line 34

def self.current_job
  Thread.current[:running_delayed_job]
end

.lifecycleObject



30
31
32
# File 'lib/delayed/worker.rb', line 30

def self.lifecycle
  @lifecycle ||= Delayed::Lifecycle.new
end

.on_max_failures=(block) ⇒ Object

Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.

This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.

The block is called with args(job, last_exception)



22
23
24
# File 'lib/delayed/worker.rb', line 22

def self.on_max_failures=(block)
  @@on_max_failures = block
end

.running_job(job) ⇒ Object



38
39
40
41
42
43
# File 'lib/delayed/worker.rb', line 38

def self.running_job(job)
  Thread.current[:running_delayed_job] = job
  yield
ensure
  Thread.current[:running_delayed_job] = nil
end

Instance Method Details

#configure_for_job(job) ⇒ Object

set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/delayed/worker.rb', line 250

def configure_for_job(job)
  previous_tmpdir = ENV['TMPDIR']

  self.class.running_job(job) do
    dir = Dir.mktmpdir("job-#{job.id}-#{self.name.gsub(/[^\w\.]/, '.')}-")
    begin
      ENV['TMPDIR'] = dir
      yield
    ensure
      FileUtils.remove_entry(dir, true)
    end
  end
ensure
  ENV['TMPDIR'] = previous_tmpdir
end

#exit?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/delayed/worker.rb', line 94

def exit?
  @exit
end

#handle_failed_job(job, error) ⇒ Object



229
230
231
232
233
# File 'lib/delayed/worker.rb', line 229

def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  logger.error("Failed with #{error.class} [#{error.message}] (#{job.attempts} attempts)")
  job.reschedule(error)
end

#health_checkObject



266
267
268
269
270
271
272
# File 'lib/delayed/worker.rb', line 266

def health_check
  @health_check ||= HealthCheck.build(
    type: @health_check_type,
    worker_name: name,
    config: @health_check_config
  )
end

#idObject



235
236
237
# File 'lib/delayed/worker.rb', line 235

def id
  Process.pid
end

#log_job(job, format = :short) ⇒ Object



239
240
241
242
243
244
245
246
# File 'lib/delayed/worker.rb', line 239

def log_job(job, format = :short)
  case format
  when :long
    "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }"
  else
    job.full_name
  end
end

#nameObject



86
87
88
# File 'lib/delayed/worker.rb', line 86

def name
  @name ||= "#{Socket.gethostname rescue "X"}:#{self.id}"
end

#perform(job) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/delayed/worker.rb', line 184

def perform(job)
  count = 1
  raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired?
  self.class.lifecycle.run_callbacks(:perform, self, job) do
    set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}")
    logger.info("Processing #{log_job(job, :long)}")
    runtime = Benchmark.realtime do
      if job.batch?
        # each job in the batch will have perform called on it, so we don't
        # need a timeout around this
        count = perform_batch(job)
      else
        job.invoke_job
      end
      job.destroy
    end
    logger.info("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}")
  end
  count
rescue SystemExit => se
  # There wasn't really a failure here so no callbacks and whatnot needed,
  # still reschedule the job though.
  job.reschedule(se)
  count
rescue Exception => e
  self.class.lifecycle.run_callbacks(:error, self, job, e) do
    handle_failed_job(job, e)
  end
  count
end

#perform_batch(parent_job) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/delayed/worker.rb', line 215

def perform_batch(parent_job)
  batch = parent_job.payload_object
  if batch.mode == :serial
    batch.jobs.each do |job|
      job.source = parent_job.source
      job.create_and_lock!(name)
      configure_for_job(job) do
        perform(job)
      end
    end
    batch.items.size
  end
end

#runObject



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/delayed/worker.rb', line 150

def run
  return if exit?
  self.class.lifecycle.run_callbacks(:loop, self) do
    set_process_name("pop:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")
    job = self.class.lifecycle.run_callbacks(:pop, self) do
      work_queue.get_and_lock_next_available(name, config)
    end

    if job
      configure_for_job(job) do
        @job_count += perform(job)

        if @max_job_count > 0 && @job_count >= @max_job_count
          logger.debug "Max job count of #{@max_job_count} exceeded, dying"
          @exit = true
        end

        if @max_memory_usage > 0
          memory = sample_memory
          if memory > @max_memory_usage
            logger.debug "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying"
            @exit = true
          else
            logger.debug "Memory usage: #{memory}"
          end
        end
      end
    else
      set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")
      sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger)) unless exit?
    end
  end
end

#sample_memoryObject

generic unix solution



284
285
286
287
# File 'lib/delayed/worker.rb', line 284

def sample_memory
  s = File.read("/proc/#{Process.pid}/statm").to_i rescue 0
  s * LINUX_PAGE_SIZE / 1024
end

#set_process_name(new_name) ⇒ Object



90
91
92
# File 'lib/delayed/worker.rb', line 90

def set_process_name(new_name)
  $0 = "delayed:#{new_name}"
end

#startObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/delayed/worker.rb', line 103

def start
  logger.info "Starting worker"
  set_process_name("start:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")

  work_thread = Thread.current
  SIGNALS.each do |sig|
    trap(sig) { @signal_queue << sig; wake_up }
  end

  health_check.start

  signal_processor = Thread.new do
    loop do
      @self_pipe[0].read(1)
      case @signal_queue.pop
      when :INT, :TERM
        @exit = true # get the main thread to bail early if it's waiting for a job
        work_thread.raise(SystemExit) # Force the main thread to bail out of the current job
        break
      when :QUIT
        @exit = true
      else
        logger.error "Unknown signal '#{sig}' received"
      end
    end
  end

  self.class.lifecycle.run_callbacks(:execute, self) do
    until exit? do
      run
    end
  end

  logger.info "Stopping worker"
rescue => e
  Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil
  self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { }
ensure
  health_check.stop
  work_queue.close
  if signal_processor
    signal_processor.kill
    signal_processor.join
  end
  Delayed::Job.clear_locks!(name)
end

#wake_upObject



98
99
100
101
# File 'lib/delayed/worker.rb', line 98

def wake_up
  @self_pipe[1].write_nonblock('.', exception: false)
  work_queue.wake_up
end