Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker.rb

Constant Summary collapse

LINUX_PAGE_SIZE =

linux w/ proc fs

(size = `getconf PAGESIZE`.to_i; size > 0 ? size : 4096)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/delayed/worker.rb', line 42

def initialize(options = {})
  @exit = false
  @parent_pid = options[:parent_pid]
  @queue_name = options[:queue] ||= Settings.queue
  @min_priority = options[:min_priority]
  @max_priority = options[:max_priority]
  @max_job_count = options[:worker_max_job_count].to_i
  @max_memory_usage = options[:worker_max_memory_usage].to_i
  @work_queue = options.delete(:work_queue) || WorkQueue::InProcess.new
  @config = options
  @job_count = 0

  app = Rails.application
  if app && !app.config.cache_classes
    Delayed::Worker.lifecycle.around(:perform) do |&block|
      reload = app.config.reload_classes_only_on_change != true || app.reloaders.map(&:updated?).any?
      ActionDispatch::Reloader.prepare! if reload
      begin
        block.call
      ensure
        ActionDispatch::Reloader.cleanup! if reload
      end
    end
  end

  plugins.each { |plugin| plugin.inject! }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



9
10
11
# File 'lib/delayed/worker.rb', line 9

def config
  @config
end

#max_priorityObject (readonly)

Returns the value of attribute max_priority.



9
10
11
# File 'lib/delayed/worker.rb', line 9

def max_priority
  @max_priority
end

#min_priorityObject (readonly)

Returns the value of attribute min_priority.



9
10
11
# File 'lib/delayed/worker.rb', line 9

def min_priority
  @min_priority
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



9
10
11
# File 'lib/delayed/worker.rb', line 9

def queue_name
  @queue_name
end

#work_queueObject (readonly)

Returns the value of attribute work_queue.



9
10
11
# File 'lib/delayed/worker.rb', line 9

def work_queue
  @work_queue
end

Class Method Details

.current_jobObject



31
32
33
# File 'lib/delayed/worker.rb', line 31

def self.current_job
  Thread.current[:running_delayed_job]
end

.lifecycleObject



27
28
29
# File 'lib/delayed/worker.rb', line 27

def self.lifecycle
  @lifecycle ||= Delayed::Lifecycle.new
end

.on_max_failures=(block) ⇒ Object

Callback to fire when a delayed job fails max_attempts times. If this callback is defined, then the value of destroy_failed_jobs is ignored, and the job is destroyed if this block returns true.

This allows for destroying “uninteresting” failures, while keeping around interesting failures to be investigated later.

The block is called with args(job, last_exception)



19
20
21
# File 'lib/delayed/worker.rb', line 19

def self.on_max_failures=(block)
  @@on_max_failures = block
end

.running_job(job) ⇒ Object



35
36
37
38
39
40
# File 'lib/delayed/worker.rb', line 35

def self.running_job(job)
  Thread.current[:running_delayed_job] = job
  yield
ensure
  Thread.current[:running_delayed_job] = nil
end

Instance Method Details

#configure_for_job(job) ⇒ Object

set up the session context information, so that it gets logged with the job log lines also set up a unique tmpdir, which will get removed at the end of the job.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/delayed/worker.rb', line 205

def configure_for_job(job)
  previous_tmpdir = ENV['TMPDIR']

  self.class.running_job(job) do
    dir = Dir.mktmpdir("job-#{job.id}-#{self.name.gsub(/[^\w\.]/, '.')}-")
    begin
      ENV['TMPDIR'] = dir
      yield
    ensure
      FileUtils.remove_entry(dir, true)
    end
  end
ensure
  ENV['TMPDIR'] = previous_tmpdir
end

#exit?Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/delayed/worker.rb', line 78

def exit?
  @exit || parent_exited?
end

#handle_failed_job(job, error) ⇒ Object



180
181
182
183
184
# File 'lib/delayed/worker.rb', line 180

def handle_failed_job(job, error)
  job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
  say("Failed with #{error.class} [#{error.message}] (#{job.attempts} attempts)", :error)
  job.reschedule(error)
end

#idObject



186
187
188
# File 'lib/delayed/worker.rb', line 186

def id
  Process.pid
end

#log_job(job, format = :short) ⇒ Object



194
195
196
197
198
199
200
201
# File 'lib/delayed/worker.rb', line 194

def log_job(job, format = :short)
  case format
  when :long
    "#{job.full_name} #{ job.to_json(:include_root => false, :only => %w(tag strand priority attempts created_at max_attempts source)) }"
  else
    job.full_name
  end
end

#nameObject



70
71
72
# File 'lib/delayed/worker.rb', line 70

def name
  @name ||= "#{Socket.gethostname rescue "X"}:#{self.id}"
end

#parent_exited?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/delayed/worker.rb', line 82

def parent_exited?
  @parent_pid && @parent_pid != Process.ppid
end

#perform(job) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/delayed/worker.rb', line 140

def perform(job)
  count = 1
  raise Delayed::Backend::JobExpired, "job expired at #{job.expires_at}" if job.expired?
  self.class.lifecycle.run_callbacks(:perform, self, job) do
    set_process_name("run:#{Settings.worker_procname_prefix}#{job.id}:#{job.name}")
    say("Processing #{log_job(job, :long)}", :info)
    runtime = Benchmark.realtime do
      if job.batch?
        # each job in the batch will have perform called on it, so we don't
        # need a timeout around this
        count = perform_batch(job)
      else
        job.invoke_job
      end
      job.destroy
    end
    say("Completed #{log_job(job)} #{"%.0fms" % (runtime * 1000)}", :info)
  end
  count
rescue Exception => e
  self.class.lifecycle.run_callbacks(:error, self, job, e) do
    handle_failed_job(job, e)
  end
  count
end

#perform_batch(parent_job) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/delayed/worker.rb', line 166

def perform_batch(parent_job)
  batch = parent_job.payload_object
  if batch.mode == :serial
    batch.jobs.each do |job|
      job.source = parent_job.source
      job.create_and_lock!(name)
      configure_for_job(job) do
        perform(job)
      end
    end
    batch.items.size
  end
end

#runObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/delayed/worker.rb', line 107

def run
  self.class.lifecycle.run_callbacks(:loop, self) do
    set_process_name("pop:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")
    job = self.class.lifecycle.run_callbacks(:pop, self) do
      work_queue.get_and_lock_next_available(name, config)
    end

    if job
      configure_for_job(job) do
        @job_count += perform(job)

        if @max_job_count > 0 && @job_count >= @max_job_count
          say "Max job count of #{@max_job_count} exceeded, dying"
          @exit = true
        end

        if @max_memory_usage > 0
          memory = sample_memory
          if memory > @max_memory_usage
            say "Memory usage of #{memory} exceeds max of #{@max_memory_usage}, dying"
            @exit = true
          else
            say "Memory usage: #{memory}"
          end
        end
      end
    else
      set_process_name("wait:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")
      sleep(Settings.sleep_delay + (rand * Settings.sleep_delay_stagger))
    end
  end
end

#sample_memoryObject

generic unix solution



231
232
233
234
# File 'lib/delayed/worker.rb', line 231

def sample_memory
  s = File.read("/proc/#{Process.pid}/statm").to_i rescue 0
  s * LINUX_PAGE_SIZE / 1024
end

#say(msg, level = :debug) ⇒ Object



190
191
192
# File 'lib/delayed/worker.rb', line 190

def say(msg, level = :debug)
  Rails.logger.send(level, msg)
end

#set_process_name(new_name) ⇒ Object



74
75
76
# File 'lib/delayed/worker.rb', line 74

def set_process_name(new_name)
  $0 = "delayed:#{new_name}"
end

#startObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/delayed/worker.rb', line 86

def start
  say "Starting worker", :info
  set_process_name("start:#{Settings.worker_procname_prefix}#{@queue_name}:#{min_priority || 0}:#{max_priority || 'max'}")

  trap('INT') { say 'Exiting'; @exit = true }

  self.class.lifecycle.run_callbacks(:execute, self) do
    loop do
      run
      break if exit?
    end
  end

  say "Stopping worker", :info
rescue => e
  Rails.logger.fatal("Child process died: #{e.inspect}") rescue nil
  self.class.lifecycle.run_callbacks(:exceptional_exit, self, e) { }
ensure
  Delayed::Job.clear_locks!(name)
end