Class: Beanstalker::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalker/worker.rb

Constant Summary collapse

SLEEP_TIME =

rails loads this file twice

60

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(top_binding, options = {}) ⇒ Worker

Returns a new instance of Worker.



54
55
56
57
58
59
60
61
62
63
# File 'lib/beanstalker/worker.rb', line 54

# Builds a worker bound to +top_binding+.
#
# Loads the optional job mapper from config/beanstalker_mapper.rb under
# RAILS_ROOT when that file exists, and replaces the shared queue with a
# Beanstalk::Pool when options[:servers] is given.
#
# @param top_binding binding handed to eval by run_code
# @param options [Hash, nil] worker options; the keys read in this class are
#   :servers, :tube and :ruby_timeout. nil is treated as an empty hash.
def initialize(top_binding, options = {})
  mapper_file = "#{RAILS_ROOT}/config/beanstalker_mapper.rb"
  @mapper = Beanstalker::Mapper.new(mapper_file) if File.exist?(mapper_file)
  @top_binding = top_binding
  @stop = false
  # Normalise nil so later unguarded reads such as @options[:tube] cannot raise.
  @options = options || {}
  servers = @options[:servers]
  Beanstalker::Queue.queue = Beanstalk::Pool.new(servers) if servers
end

Class Attribute Details

.before_filter ⇒ Object

Returns the value of attribute before_filter.



31
32
33
# File 'lib/beanstalker/worker.rb', line 31

# Class-level reader for the before-filter callable.
# run_ao_job calls it with the job and skips the job's code when it returns
# a falsy value.
#
# @return the stored callable, or nil when none has been set
def before_filter
  @before_filter
end

.custom_error_handler ⇒ Object

Returns the value of attribute custom_error_handler.



29
30
31
# File 'lib/beanstalker/worker.rb', line 29

# Class-level reader for the handler stored by error_handler.
# handle_error calls it with (job, ex) in place of default_handle_error.
#
# @return the stored handler, or nil when none has been set
def custom_error_handler
  @custom_error_handler
end

.custom_timeout_handler ⇒ Object

Returns the value of attribute custom_timeout_handler.



30
31
32
# File 'lib/beanstalker/worker.rb', line 30

# Class-level reader for the handler stored by timeout_handler.
# handle_timeout calls it with the job in place of default_handle_timeout.
#
# @return the stored handler, or nil when none has been set
def custom_timeout_handler
  @custom_timeout_handler
end

.finish ⇒ Object

Returns the value of attribute finish.



28
29
30
# File 'lib/beanstalker/worker.rb', line 28

# Class-level reader for the finish callback.
# do_all_work calls it with no arguments when it is set.
#
# @return the stored callback, or nil when none has been set
def finish
  @finish
end

Class Method Details

.before_reserve(&block) ⇒ Object



45
46
47
# File 'lib/beanstalker/worker.rb', line 45

# Registers a hook that run_before_reserve invokes before each reserve
# attempt.
#
# @yield the hook body; it is called with no arguments
# @return [Array] the list of registered hooks
def before_reserve(&hook)
  before_reserves.push(hook)
end

.before_reserves ⇒ Object



41
42
43
# File 'lib/beanstalker/worker.rb', line 41

# List of hooks registered through before_reserve.
#
# @return [Array] the same array on every call; created empty on first use
def before_reserves
  @before_reserves ||= []
end

.default_handle_error(job, ex) ⇒ Object



237
238
239
240
241
242
243
# File 'lib/beanstalker/worker.rb', line 237

# Default failure policy: logs the failure and the exception, then calls
# job.decay.
#
# @param job the job whose processing raised (must respond to server, id, decay)
# @param ex [Exception] the error raised while processing the job
# @return the result of job.decay, or of the final logger call when
#   Beanstalk::UnexpectedResponse was raised along the way
def self.default_handle_error(job, ex)
  Daemonizer.logger.info "Job failed: #{job.server}/#{job.id}"
  # An exception that was built but never raised has a nil backtrace.
  trace = Array(ex.backtrace).join("\n")
  Daemonizer.logger.info("#{ex.class}: #{ex}\n" + trace)
  job.decay
rescue Beanstalk::UnexpectedResponse => e
  Daemonizer.logger.info "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{e.inspect}"
end

.default_handle_timeout(job) ⇒ Object



245
246
247
248
249
250
# File 'lib/beanstalker/worker.rb', line 245

# Default timeout policy: logs the timeout, then calls job.decay.
#
# @param job the job that timed out (must respond to server, id, decay)
# @return the result of job.decay, or of the logger call when
#   Beanstalk::UnexpectedResponse was raised along the way
def self.default_handle_timeout(job)
  begin
    Daemonizer.logger.info("Job timeout: #{job.server}/#{job.id}")
    job.decay
  rescue Beanstalk::UnexpectedResponse => error
    Daemonizer.logger.info("Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{error.inspect}")
  end
end

.error_handler(&block) ⇒ Object



33
34
35
# File 'lib/beanstalker/worker.rb', line 33

# Stores the given block as the class-wide custom error handler.
# handle_error's common path calls the stored handler with (job, ex)
# instead of default_handle_error.
#
# @yieldparam job the failed job
# @yieldparam ex the exception that was raised
# @return the stored block (nil when called without a block)
def error_handler(&block)
  self.custom_error_handler = block
end

.run_before_reserve ⇒ Object



49
50
51
# File 'lib/beanstalker/worker.rb', line 49

# Invokes every hook registered with before_reserve, in registration order.
#
# @return [Array] the list of hooks
def run_before_reserve
  before_reserves.each(&:call)
end

.timeout_handler(&block) ⇒ Object



37
38
39
# File 'lib/beanstalker/worker.rb', line 37

# Stores the given block as the class-wide custom timeout handler.
# handle_timeout calls the stored handler with the job instead of
# default_handle_timeout.
#
# @yieldparam job the job that timed out
# @return the stored block (nil when called without a block)
def timeout_handler(&block)
  self.custom_timeout_handler = block
end

Instance Method Details

#brief?(t1, t2) ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/beanstalker/worker.rb', line 119

# Tells whether two instants are less than a tenth of a second apart.
# The difference is truncated to whole hundredths before comparing, and the
# order of the arguments does not matter.
#
# @param t1 start instant (anything whose difference with t2 is numeric)
# @param t2 end instant
# @return [Boolean]
def brief?(t1, t2)
  hundredths = ((t2 - t1) * 100).to_i
  hundredths.abs < 10
end

#class_error_handler(klass) ⇒ Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/beanstalker/worker.rb', line 176

# Looks up a per-class error handler.
#
# @param klass any object; only those responding to async_error_handler are
#   consulted (nil is accepted)
# @return [Proc, false] klass.async_error_handler when it is a Proc,
#   otherwise false
def class_error_handler(klass)
  return false unless klass.respond_to?(:async_error_handler)

  handler = klass.async_error_handler
  handler.is_a?(Proc) ? handler : false
end

#dispatch(job) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/beanstalker/worker.rb', line 148

# Routes a reserved job to the runner matching its kind.
#
# Verifies the ActiveRecord connections and logs the job body first. Rails
# jobs go to run_ao_job, mapped jobs to run_mapped_job; anything else is
# logged as an error and deleted.
#
# @param job the reserved job
# @return the value of whichever runner handled the job, or of job.delete
def dispatch(job)
  ActiveRecord::Base.verify_active_connections!
  logger.info "Got job: #{get_job_body(job).inspect}"

  return run_ao_job(job) if rails_job?(job)
  return run_mapped_job(job) if mapped_job?(job)

  logger.error "Job #{job.inspect} cannot be processed... deleteing"
  job.delete
end

#do_all_work ⇒ Object



320
321
322
323
324
# File 'lib/beanstalker/worker.rb', line 320

# Logs that running jobs are being finished, then runs the class-level
# finish callback when one is set.
#
# @return the callback's result, or nil when no callback is set
def do_all_work
  logger.info 'finishing all running jobs'
  callback = self.class.finish
  return unless callback

  callback.call
end

#get_job ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/beanstalker/worker.rb', line 123

# Blocks until a job has been reserved and returns it.
#
# Each attempt connects the queue, runs the before-reserve hooks and then
# reserves. Signals (including Interrupt) propagate to the caller. A
# DeadlineSoonError is logged and retried immediately. Any other exception
# clears the connection hint, is logged, and is retried after SLEEP_TIME
# seconds.
#
# @return the reserved job
def get_job
  loop do
    begin
      Beanstalker::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue SignalException
      # Interrupt is a SignalException as well; let both reach the caller.
      raise
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      Daemonizer.logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => error
      @q_hint = nil # in case there's something wrong with this conn
      log = Daemonizer.logger
      log.info("#{error.class}: #{error}\n" + error.backtrace.join("\n"))
      log.info 'something is wrong. We failed to get a job.'
      log.info "sleeping for #{SLEEP_TIME}s..."
      sleep(SLEEP_TIME)
    end
  end
end

#get_job_body(job) ⇒ Object



286
287
288
# File 'lib/beanstalker/worker.rb', line 286

# Returns the job's body for key lookups.
#
# @param job a job responding to ybody
# @return the result of job.ybody.with_indifferent_access; callers in this
#   class read its 'kind' and 'data' entries
def get_job_body(job)
  job.ybody.with_indifferent_access
end

#handle_error(job, ex) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/beanstalker/worker.rb', line 187

# Handles an exception raised while processing +job+.
#
# For Rails jobs whose body names a class, that class's async_error_handler
# (see class_error_handler) runs first. When the job is no longer reserved
# afterwards, the error counts as handled. Otherwise the common handler
# runs: the class-wide custom_error_handler when set, else
# default_handle_error.
#
# @param job the job whose processing failed
# @param ex [Exception] the error that was raised
# @return nil when the per-class handler dealt with the job, otherwise the
#   common handler's result
def handle_error(job, ex)
  # Array() guards against exceptions that carry no backtrace.
  Daemonizer.logger.warn "Handling exception: #{Array(ex.backtrace).join("\n")}, job = #{job.id}"

  handled_by_class = false
  if rails_job?(job)
    # A body without a data section must not crash the error handler itself.
    data = get_job_body(job)[:data]
    class_name = data && data[:class]
    if class_name
      klass = begin
        class_name.constantize
      rescue Exception
        # Any failure to resolve the class means there is no per-class handler.
        nil
      end
      error_handler = class_error_handler(klass)
      if error_handler.is_a?(Proc)
        Daemonizer.logger.info "Running custom error handler for class #{class_name}, job = #{job.id}"
        error_handler.call(job, ex)
        job_reserved = begin
          job.stats['state'] == 'reserved'
        rescue Beanstalk::NotFoundError
          false
        end
        if job_reserved
          Daemonizer.logger.info "Custom error handler for class #{class_name} didn't release job. job = #{job.id}"
        else
          Daemonizer.logger.info "Custom error handler for class #{class_name} released job. job = #{job.id}"
          handled_by_class = true
        end
      end
    end
  end

  return if handled_by_class

  Daemonizer.logger.info "Running common handler. job = #{job.id}"
  common_handler = self.class.custom_error_handler
  if common_handler
    common_handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end

#handle_timeout(job) ⇒ Object



229
230
231
232
233
234
235
# File 'lib/beanstalker/worker.rb', line 229

# Handles a job whose processing timed out, using the class-wide
# custom_timeout_handler when one is set and default_handle_timeout
# otherwise.
#
# @param job the job that timed out
# @return the chosen handler's result
def handle_timeout(job)
  handler = self.class.custom_timeout_handler
  return handler.call(job) if handler

  self.class.default_handle_timeout(job)
end

#logger ⇒ Object



103
104
105
# File 'lib/beanstalker/worker.rb', line 103

# Logger used by the instance methods.
#
# @return Daemonizer.logger when Daemonizer is defined and has a logger,
#   otherwise RAILS_DEFAULT_LOGGER
def logger
  daemon_logger = defined?(Daemonizer) ? Daemonizer.logger : nil
  daemon_logger || RAILS_DEFAULT_LOGGER
end

#main_loop ⇒ Object



65
66
67
68
69
70
71
# File 'lib/beanstalker/worker.rb', line 65

# Reserves and dispatches jobs until a TERM signal asks the worker to stop.
#
# The TERM handler only sets a flag, and the flag is examined between jobs,
# so a job that is already being dispatched runs to completion.
#
# @return [nil]
def main_loop
  Signal.trap('TERM') do
    @stop = true
  end

  loop do
    return if @stop

    safe_dispatch(get_job)
  end
end

#mapped_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


316
317
318
# File 'lib/beanstalker/worker.rb', line 316

# Tells whether the configured mapper can handle the job's kind.
#
# @param job the reserved job
# @return nil when no mapper is configured, otherwise the mapper's
#   can_handle_kind? answer for the body's 'kind' entry
def mapped_job?(job)
  mapper = @mapper
  return mapper unless mapper

  mapper.can_handle_kind?(get_job_body(job)['kind'])
end

#q_hint ⇒ Object



99
100
101
# File 'lib/beanstalker/worker.rb', line 99

# Object to reserve the next job from.
#
# @return the connection remembered by reserve_and_set_hint, or the shared
#   Beanstalker::Queue.queue when nothing is remembered
def q_hint
  @q_hint || Beanstalker::Queue.queue
end

#rails_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


312
313
314
# File 'lib/beanstalker/worker.rb', line 312

# Tells whether the job body's 'kind' entry, as a string, is
# 'rails_beanstalker'.
#
# @param job the reserved job
# @return [Boolean]
def rails_job?(job)
  kind = get_job_body(job)['kind']
  kind.to_s == 'rails_beanstalker'
end

#reserve_and_set_hint ⇒ Object

This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.



111
112
113
114
115
116
117
# File 'lib/beanstalker/worker.rb', line 111

# Reserves a job from q_hint and updates the connection hint.
#
# The hint becomes the job's connection when the reserve call was brief
# (see brief?) and produced a job; otherwise it is cleared. The hint is
# updated even when the reserve call raises.
#
# @return the reserved job
def reserve_and_set_hint
  started_at = Time.now.utc
  job = nil
  begin
    job = q_hint.reserve
  ensure
    quick = brief?(started_at, Time.now.utc)
    @q_hint = (quick && job) ? job.conn : nil
  end
end

#run ⇒ Object



92
93
94
95
96
97
# File 'lib/beanstalker/worker.rb', line 92

# Worker entry point: performs startup, then processes jobs in main_loop.
# An Interrupt raised from either step triggers shutdown.
#
# @return main_loop's result, or shutdown's result after an Interrupt
def run
  begin
    startup
    main_loop
  rescue Interrupt
    shutdown
  end
end

#run_ao_job(job) ⇒ Object



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/beanstalker/worker.rb', line 290

# Runs a Rails job: evaluates the Ruby source stored under the body's
# data/code entry, then deletes the job.
#
# The class-level before_filter, when set, is consulted first. A falsy
# answer skips the code and leaves the job undeleted. The job stats are
# copied before running so the job id can still be logged after the delete.
#
# @param job the reserved job
# @return the result of the final logger call
def run_ao_job(job)
  code = get_job_body(job)['data']['code']

  run_with_ruby_timeout_if_set(code, job) do
    started_at = Time.now
    filter = self.class.before_filter
    stats_snapshot = job.stats.dup
    allowed = !filter || filter.call(job)

    if allowed
      run_code(job.id, code)
      job.delete
      logger.info "Finished. Job id=#{stats_snapshot['id']}. Code '#{code}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.info "Not runnind due to :before_filter restriction. Job id=#{stats_snapshot['id']}. Code '#{code}'."
    end
  end
end

#run_code(job_id, code) ⇒ Object



308
309
310
# File 'lib/beanstalker/worker.rb', line 308

# Evaluates a job's Ruby source in the binding given to the constructor.
#
# NOTE(review): run_ao_job passes +code+ straight from the job body, so
# anything able to put jobs on the queue can run arbitrary Ruby in this
# process — confirm the queue is reachable by trusted producers only.
#
# @param job_id used only to label the evaluated code as
#   "(beanstalk job <id>)"
# @param code [String] Ruby source to evaluate
# @return the value of the evaluated code
def run_code(job_id, code)
  eval(code, @top_binding, "(beanstalk job #{job_id})", 1)
end

#run_mapped_job(job) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/beanstalker/worker.rb', line 265

# Runs a mapped job: asks the mapper for the callable registered for the
# job's kind and method, calls it with the data's 'body' entry (an empty
# hash when absent), and deletes the job.
#
# A job with no matching mapping is logged as an error and deleted as well.
#
# @param job the reserved job
# @return the result of job.delete
def run_mapped_job(job)
  body = get_job_body(job)
  kind = body['kind']
  data = body['data']
  method_name = data['method']
  job_desc = "#{kind}/#{method_name}"

  run_with_ruby_timeout_if_set(job_desc, job) do
    started_at = Time.now
    @map_job = @mapper && @mapper.method_for(kind, method_name)

    if @map_job
      @map_job.call(data['body'] || {})
      logger.info "Finished. Job id=#{job.stats['id']}. Mapped from '#{job_desc}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.error "Job id=#{job.stats['id']}. Mapping not found: '#{job_desc}'. Releases #{job.stats['releases']}. Deleting"
    end

    job.delete
  end
end

#run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/beanstalker/worker.rb', line 252

# Runs +block+ for +job+, logging the job's stats first.
#
# When the :ruby_timeout option is set, the block runs under
# Timeout.timeout with a limit of 80% of the job's TTR.
#
# @param job_desc [String] label for the job used in the log line
# @param job the reserved job; its stats are read once
# @yield the work to perform for the job
# @return the block's result
# @raise [Timeout::Error] when :ruby_timeout is set and the block runs past
#   the limit
def run_with_ruby_timeout_if_set(job_desc, job, &block)
  # Read the stats once rather than once per interpolated field.
  stats = job.stats
  summary = "Job id=#{stats['id']}. Running '#{job_desc}'. Age #{stats['age']}, Releases #{stats['releases']}, TTR #{stats['ttr']}"

  # The constructor tolerates nil options, so this method does too.
  if @options && @options[:ruby_timeout]
    timeout = (stats['ttr'].to_f * 0.8)
    logger.info "TO=#{timeout} sec. #{summary}"
    Timeout.timeout(timeout) do
      block.call
    end
  else
    logger.info summary
    block.call
  end
end

#safe_dispatch(job) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/beanstalker/worker.rb', line 161

# Dispatches +job+ and routes any failure to the matching handler.
#
# A Timeout::Error goes to handle_timeout. An Interrupt makes a best-effort
# attempt to release the job and is then re-raised. Every other exception
# goes to handle_error. The Daemonizer log is flushed in all cases.
#
# @param job the reserved job
# @return dispatch's result, or the result of the handler that ran
def safe_dispatch(job)
  dispatch(job)
rescue Timeout::Error
  handle_timeout(job)
rescue Interrupt => interrupt
  begin
    job.release
  rescue StandardError
    # Best effort only: the interrupt below is what matters.
    :ok
  end
  raise interrupt
rescue Exception => error
  handle_error(job, error)
ensure
  Daemonizer.flush_logger
end

#shutdown ⇒ Object



88
89
90
# File 'lib/beanstalker/worker.rb', line 88

# Shutdown step that #run performs after an Interrupt; delegates to
# do_all_work.
#
# @return do_all_work's result
def shutdown
  do_all_work
end

#startup ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/beanstalker/worker.rb', line 73

# Makes the queue watch exactly the tubes named by the :tube option
# ("default" when the option is absent).
#
# Tubes that are wanted but not yet watched are added first, then watched
# tubes that are not wanted are ignored. The resulting list is logged and
# the Daemonizer log is flushed.
#
# @return the result of Daemonizer.flush_logger
def startup
  queue = Beanstalker::Queue.queue
  wanted = Array.wrap(@options[:tube] || "default").map { |tube| tube.to_s }
  watched = queue.list_tubes_watched.values.flatten

  (wanted - watched).each { |tube| queue.watch(tube) }
  (watched - wanted).each { |tube| queue.ignore(tube) }

  Daemonizer.logger.info "Using tubes: #{queue.list_tubes_watched.values.flatten.join(',')}"
  Daemonizer.flush_logger
end