Class: Delayed::Worker

Inherits:
Object show all
Defined in:
lib/delayed/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



74
75
76
77
78
# File 'lib/delayed/worker.rb', line 74

# Builds a worker from an options hash.
#
# Recognised keys:
#   :quiet        - stored on the instance; #say skips console output when truthy
#   :min_priority - when the key is present, assigned to the class-level setting
#   :max_priority - when the key is present, assigned to the class-level setting
#
# The priority settings are written on the class, so they are shared by
# every worker instance of that class.
def initialize(options = {})
  @quiet = options[:quiet]

  # Presence of the key (not truthiness of the value) decides whether the
  # class-level setting is overwritten, so an explicit nil is applied too.
  if options.key?(:min_priority)
    self.class.min_priority = options[:min_priority]
  end
  if options.key?(:max_priority)
    self.class.max_priority = options[:max_priority]
  end
end

Instance Attribute Details

#name_prefix ⇒ Object

name_prefix is ignored if name is set directly



50
51
52
# File 'lib/delayed/worker.rb', line 50

# Returns the prefix stored in @name_prefix, or nil when none has been set.
# #name prepends it to the generated default name only; it plays no part
# when a name has been assigned directly.
def name_prefix
  instance_variable_get(:@name_prefix)
end

Class Method Details

.backend=(backend) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/delayed/worker.rb', line 54

# Selects the job backend.
#
# When +backend+ is a Symbol, "delayed/backend/<symbol>" is required and the
# symbol is turned into the constant Delayed::Backend::<Name>::Job. Any
# other value is used as the job class as given.
#
# The chosen class is stored in @@backend and also published as
# Delayed::Job (warnings about redefining the constant are silenced).
def self.backend=(backend)
  job_class = backend

  if backend.is_a?(Symbol)
    require "delayed/backend/#{backend}"
    namespace = backend.to_s.to_const_string
    job_class = "Delayed::Backend::#{namespace}::Job".constantize
  end

  @@backend = job_class
  silence_warnings { ::Delayed.const_set(:Job, job_class) }
end

.guess_backend ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/delayed/worker.rb', line 63

# Picks a backend when none has been configured yet.
#
# An already-set backend is returned untouched. Otherwise the choice is
# :active_record if ActiveRecord is defined, :mongo_mapper if MongoMapper is
# defined, and :active_record (with a logged warning) when neither is.
# The choice is assigned through backend= and returned.
def self.guess_backend
  configured = self.backend
  return configured if configured

  guess =
    if defined?(ActiveRecord) then :active_record
    elsif defined?(MongoMapper) then :mongo_mapper
    else
      logger.warn "Could not decide on a backend, defaulting to active_record"
      :active_record
    end

  self.backend = guess
end

Instance Method Details

#name ⇒ Object

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker restarts: workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



84
85
86
87
# File 'lib/delayed/worker.rb', line 84

# Returns the worker's name.
#
# A name assigned through name= wins whenever it is not nil. Otherwise the
# name is built as "<name_prefix>host:<hostname> pid:<pid>"; if building
# that string raises a StandardError (for example the hostname lookup
# fails), it falls back to "<name_prefix>pid:<pid>".
def name
  return @name unless @name.nil?

  begin
    "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "#{@name_prefix}pid:#{Process.pid}"
  end
end

#name=(val) ⇒ Object

Sets the name of the worker. Setting the name to nil will reset the default worker name



91
92
93
# File 'lib/delayed/worker.rb', line 91

# Stores +val+ in @name. Assigning nil makes #name fall back to the
# generated default again.
def name=(val)
  instance_variable_set(:@name, val)
end

#reschedule(job, time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Unless an explicit time is given, the delay grows with the fourth power of the number of failed attempts (attempts ** 4 + 5).



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/delayed/worker.rb', line 165

# Records one more failed attempt on +job+ and either schedules a retry or
# gives up on it.
#
# While the incremented attempt count is below the class-level max_attempts,
# the job's run_at is set to +time+ (or, when +time+ is not given, to
# Job.db_time_now + attempts ** 4 + 5), then the job is unlocked and saved.
#
# Otherwise the removal is logged, the payload's on_permanent_failure hook is
# run if the payload responds to it, and the job is destroyed or stamped with
# failed_at, depending on the class-level destroy_failed_jobs setting.
def reschedule(job, time = nil)
  attempts_so_far = (job.attempts += 1)

  if attempts_so_far < self.class.max_attempts
    job.run_at = time || (Job.db_time_now + (job.attempts ** 4) + 5)
    job.unlock
    return job.save!
  end

  say "* [JOB] PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

  payload = job.payload_object
  if payload.respond_to?(:on_permanent_failure)
    say "* [JOB] Running on_permanent_failure hook"
    job.payload_object.on_permanent_failure
  end

  if self.class.destroy_failed_jobs
    job.destroy
  else
    job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end

#run(job) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/delayed/worker.rb', line 150

# Runs a single job and reports whether it succeeded.
#
# The job's invoke_job is executed under a timeout of the class-level
# max_run_time (converted with to_i); on success the job is destroyed and a
# completion message with the elapsed time is logged through #say.
#
# @param job [Object] must respond to +invoke_job+ and +destroy+
# @return [Boolean] true when the job ran and was destroyed; false when
#   anything raised, in which case the job and the exception are passed to
#   +handle_failed_job+
def run(job)
  runtime = Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  # TODO: warn if runtime > max_run_time ?
  # The worker name is passed as a format argument instead of being
  # interpolated into the format string: a name containing '%' would
  # otherwise raise while formatting, after the job was already destroyed,
  # and the rescue below would then report the finished job as failed.
  say format("* [JOB] %s completed after %.4f", name, runtime)
  return true  # did work
rescue Exception => e
  # Exception (not only StandardError) is rescued, so every error raised
  # while running the job reaches handle_failed_job.
  # NOTE(review): this also intercepts SignalException/SystemExit raised
  # inside the block -- confirm that is intended.
  handle_failed_job(job, e)
  return false  # work failed
end

#say(text, level = Logger::INFO) ⇒ Object



183
184
185
186
# File 'lib/delayed/worker.rb', line 183

# Emits a message to the console and to the logger.
#
# +text+ is printed with puts unless the worker was created with :quiet.
# When a logger is available, the text is also added to it at +level+,
# prefixed with the current time formatted as '%FT%T%z'.
def say(text, level = Logger::INFO)
  puts text unless @quiet
  return unless logger

  stamp = Time.now.strftime('%FT%T%z')
  logger.add(level, "#{stamp}: #{text}")
end

#start ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/delayed/worker.rb', line 95

# Main worker loop.
#
# Installs signal handlers (TERM and INT request an exit through the global
# $exit flag; QUIT logs the current stack trace and resets the flag), then
# repeatedly calls work_off. When a batch processed nothing the worker
# sleeps for @@sleep_delay; otherwise it logs the throughput. The loop ends
# once $exit is set, and locks held under this worker's name are always
# cleared on the way out.
def start
  say "*** Starting job worker #{name}"

  # TERM and INT share the same handler body.
  %w(TERM INT).each do |signal|
    trap(signal) { say 'Exiting...'; $exit = true }
  end
  trap('QUIT') do
    say "Delayed::Worker stack trace:"
    say caller.join("\t\n")
    $exit = false
  end

  loop do
    result = nil
    realtime = Benchmark.realtime { result = work_off }
    count = result.sum

    # Checked before and after the sleep/report step so that an exit
    # request is honoured without waiting for another batch.
    break if $exit

    if count.nonzero?
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
    else
      sleep(@@sleep_delay)
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end

#work_off(num = 100) ⇒ Object

Do num jobs and return stats on success/failure. Exit early if interrupted.



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/delayed/worker.rb', line 132

# Runs up to +num+ jobs and tallies the outcomes.
#
# Each iteration calls reserve_and_run_one_job: true counts as a success,
# false as a failure, and any other value (no work available) ends the
# batch. The batch also ends early once the global $exit flag is set.
#
# Returns a two-element array: [successes, failures].
def work_off(num = 100)
  counts = { :success => 0, :failure => 0 }

  num.times do
    case reserve_and_run_one_job
    when true  then counts[:success] += 1
    when false then counts[:failure] += 1
    else break # nothing left to work on
    end

    break if $exit # stop as soon as an exit was requested
  end

  counts.values_at(:success, :failure)
end