Class: Delayed::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/delayed/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



50
51
52
53
54
55
56
# File 'lib/delayed/worker.rb', line 50

def initialize(options={})
  @quiet = options[:quiet]
  self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
  self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
  self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
  self.class.queues = options[:queues] if options.has_key?(:queues)
end

Instance Attribute Details

#name_prefixObject

name_prefix is ignored if name is set directly



26
27
28
# File 'lib/delayed/worker.rb', line 26

def name_prefix
  @name_prefix
end

Class Method Details

.backend=(backend) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/delayed/worker.rb', line 30

def self.backend=(backend)
  if backend.is_a? Symbol
    require "delayed/backend/#{backend}"
    backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  end
  @@backend = backend
  silence_warnings { ::Delayed.const_set(:Job, backend) }
end

.guess_backendObject



39
40
41
42
43
44
45
46
47
48
# File 'lib/delayed/worker.rb', line 39

def self.guess_backend
  self.backend ||= if defined?(ActiveRecord)
    :active_record
  elsif defined?(MongoMapper)
    :mongo_mapper
  else
    logger.warn "Could not decide on a backend, defaulting to active_record"
    :active_record
  end
end

Instance Method Details

#max_attempts(job) ⇒ Object



164
165
166
# File 'lib/delayed/worker.rb', line 164

def max_attempts(job)
  job.max_attempts || self.class.max_attempts
end

#nameObject

Every worker has a unique name which by default is the pid of the process. There are some advantages to overriding this with something which survives worker retarts: Workers can# safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.



62
63
64
65
# File 'lib/delayed/worker.rb', line 62

def name
  return @name unless @name.nil?
  "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
end

#name=(val) ⇒ Object

Sets the name of the worker. Setting the name to nil will reset the default worker name



69
70
71
# File 'lib/delayed/worker.rb', line 69

def name=(val)
  @name = val
end

#reschedule(job, time = nil) ⇒ Object

Reschedule the job in the future (when a job fails). Uses an exponential scale depending on the number of failed attempts.



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/delayed/worker.rb', line 137

def reschedule(job, time = nil)
  if (job.attempts += 1) < max_attempts(job)
    job.run_at = time || job.reschedule_at
    job.unlock
    job.save!
  else
    say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

    if job.payload_object.respond_to? :on_permanent_failure
      say "Running on_permanent_failure hook"
      begin
        job.payload_object.on_permanent_failure
      rescue Exception => error
        say "#{job.name} on permanent failure callback failed with #{error.class.name}: #{error.message}", Logger::ERROR
      end
    end

    self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now)
  end
end

#run(job) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/delayed/worker.rb', line 123

def run(job)
  runtime =  Benchmark.realtime do
    Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
    job.destroy
  end
  say "#{job.name} completed after %.4f" % runtime
  return true  # did work
rescue Exception => e
  handle_failed_job(job, e)
  return false  # work failed
end

#say(text, level = Logger::INFO) ⇒ Object



158
159
160
161
162
# File 'lib/delayed/worker.rb', line 158

def say(text, level = Logger::INFO)
  text = "[Worker(#{name})] #{text}"
  puts text unless @quiet
  logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
end

#startObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/delayed/worker.rb', line 73

def start
  say "Starting job worker"

  trap('TERM') { say 'Exiting...'; $exit = true }
  trap('INT')  { say 'Exiting...'; $exit = true }

  loop do
    result = nil

    realtime = Benchmark.realtime do
      result = work_off
    end

    count = result.sum

    break if $exit

    if count.zero?
      sleep(self.class.sleep_delay)
    else
      say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
    end

    break if $exit
  end

ensure
  Delayed::Job.clear_locks!(name)
end

#work_off(num = 100) ⇒ Object

Do num jobs and return stats on success/failure. Exit early if interrupted.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/delayed/worker.rb', line 105

def work_off(num = 100)
  success, failure = 0, 0

  num.times do
    case reserve_and_run_one_job
    when true
        success += 1
    when false
        failure += 1
    else
      break  # leave if no work could be done
    end
    break if $exit # leave if we're exiting
  end

  return [success, failure]
end