Class: Beanstalker::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalker/worker.rb

Constant Summary collapse

SLEEP_TIME =

rails loads this file twice

60

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(top_binding, options = {}) ⇒ Worker

Returns a new instance of Worker.



55
56
57
58
59
60
61
62
63
64
# File 'lib/beanstalker/worker.rb', line 55

# Builds a worker.
#
# Loads a job mapper from "#{RAILS_ROOT}/config/beanstalker_mapper.rb" when
# that file exists and, when +options[:servers]+ is given, replaces the
# shared Beanstalker::Queue.queue with a Beanstalk::Pool for those servers.
#
# @param top_binding [Binding] stored in @top_binding; #run_code passes it to eval
# @param options [Hash, nil] worker options; nil is treated as an empty hash
def initialize(top_binding, options = {})
  # Other methods index into @options unconditionally, so never keep nil here.
  @options = options || {}
  @top_binding = top_binding
  @stop = false

  mapper_file = "#{RAILS_ROOT}/config/beanstalker_mapper.rb"
  @mapper = Beanstalker::Mapper.new(mapper_file) if File.exist?(mapper_file)

  servers = @options[:servers]
  Beanstalker::Queue.queue = Beanstalk::Pool.new(servers) if servers
end

Class Attribute Details

.before_filterObject

Returns the value of attribute before_filter.



31
32
33
# File 'lib/beanstalker/worker.rb', line 31

# Reader for the +before_filter+ attribute.
#
# #run_ao_job calls the stored object with the job and skips the job when
# the result is falsy.
#
# @return [Object, nil] current value of @before_filter
def before_filter
  instance_variable_get(:@before_filter)
end

.custom_error_handlerObject

Returns the value of attribute custom_error_handler.



29
30
31
# File 'lib/beanstalker/worker.rb', line 29

# Reader for the +custom_error_handler+ attribute.
#
# When set, #handle_error calls it with (job, ex) instead of
# default_handle_error.
#
# @return [Object, nil] current value of @custom_error_handler
def custom_error_handler
  instance_variable_get(:@custom_error_handler)
end

.custom_timeout_handlerObject

Returns the value of attribute custom_timeout_handler.



30
31
32
# File 'lib/beanstalker/worker.rb', line 30

# Reader for the +custom_timeout_handler+ attribute.
#
# When set, #handle_timeout calls it with the job instead of
# default_handle_timeout.
#
# @return [Object, nil] current value of @custom_timeout_handler
def custom_timeout_handler
  instance_variable_get(:@custom_timeout_handler)
end

.finishObject

Returns the value of attribute finish.



28
29
30
# File 'lib/beanstalker/worker.rb', line 28

# Reader for the +finish+ attribute.
#
# #do_all_work calls the stored object (if any) with no arguments.
#
# @return [Object, nil] current value of @finish
def finish
  instance_variable_get(:@finish)
end

.on_job_eventObject

Returns the value of attribute on_job_event.



32
33
34
# File 'lib/beanstalker/worker.rb', line 32

# Reader for the +on_job_event+ attribute.
#
# When set, #dispatch calls it with (job, :dispatch) and #handle_error calls
# it with (job, :error).
#
# @return [Object, nil] current value of @on_job_event
def on_job_event
  instance_variable_get(:@on_job_event)
end

Class Method Details

.before_reserve(&block) ⇒ Object



46
47
48
# File 'lib/beanstalker/worker.rb', line 46

# Registers a hook to be run by run_before_reserve.
#
# @yield block appended to the list returned by before_reserves
# @return [Array] the hook list after appending
def before_reserve(&hook)
  before_reserves.push(hook)
end

.before_reservesObject



42
43
44
# File 'lib/beanstalker/worker.rb', line 42

# Lazily-created list of hooks registered through before_reserve.
#
# @return [Array] the same array instance on every call
def before_reserves
  @before_reserves = [] unless @before_reserves
  @before_reserves
end

.default_handle_error(job, ex) ⇒ Object



246
247
248
249
250
251
252
# File 'lib/beanstalker/worker.rb', line 246

# Default error handling for a failed job: logs the failure and the
# exception, then calls +job.decay+.
#
# A Beanstalk::UnexpectedResponse raised while doing so is logged and
# swallowed.
#
# @param job [#server, #id, #decay] the job that failed
# @param ex [Exception] the error raised while processing the job
# @return [Object] result of +job.decay+, or of the logger call on a Beanstalk error
def self.default_handle_error(job, ex)
  Daemonizer.logger.info "Job failed: #{job.server}/#{job.id}"
  # An exception that was never raised has a nil backtrace; Array() keeps
  # the log line from blowing up inside the error handler.
  Daemonizer.logger.info "#{ex.class}: #{ex}\n#{Array(ex.backtrace).join("\n")}"
  job.decay
rescue Beanstalk::UnexpectedResponse => e
  Daemonizer.logger.info "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{e.inspect}"
end

.default_handle_timeout(job) ⇒ Object



254
255
256
257
258
259
# File 'lib/beanstalker/worker.rb', line 254

# Default timeout handling for a job: logs the timeout, then calls
# +job.decay+. A Beanstalk::UnexpectedResponse raised while doing so is
# logged and swallowed.
#
# @param job [#server, #id, #decay] the job that timed out
# @return [Object] result of +job.decay+, or of the logger call on a Beanstalk error
def self.default_handle_timeout(job)
  begin
    Daemonizer.logger.info("Job timeout: #{job.server}/#{job.id}")
    job.decay
  rescue Beanstalk::UnexpectedResponse => beanstalk_error
    Daemonizer.logger.info(
      "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{beanstalk_error.inspect}"
    )
  end
end

.error_handler(&block) ⇒ Object



34
35
36
# File 'lib/beanstalker/worker.rb', line 34

# Stores the given block as the custom error handler.
#
# @yield block assigned through +custom_error_handler=+
# @return [Proc, nil] the block that was given
def error_handler(&handler)
  self.custom_error_handler = handler
end

.run_before_reserveObject



50
51
52
# File 'lib/beanstalker/worker.rb', line 50

# Calls every hook registered through before_reserve, in registration order.
#
# @return [Array] the hook list
def run_before_reserve
  before_reserves.each(&:call)
end

.timeout_handler(&block) ⇒ Object



38
39
40
# File 'lib/beanstalker/worker.rb', line 38

# Stores the given block as the custom timeout handler.
#
# @yield block assigned through +custom_timeout_handler=+
# @return [Proc, nil] the block that was given
def timeout_handler(&handler)
  self.custom_timeout_handler = handler
end

Instance Method Details

#brief?(t1, t2) ⇒ Boolean

Returns:

  • (Boolean)


124
125
126
# File 'lib/beanstalker/worker.rb', line 124

# Tells whether two instants are less than a tenth of a unit apart.
#
# The difference is scaled by 100 and truncated to an integer before the
# comparison, so the order of the arguments does not matter.
#
# @param t1 [Time, Numeric] start of the interval
# @param t2 [Time, Numeric] end of the interval
# @return [Boolean] true when the truncated difference is under 10 hundredths
def brief?(t1, t2)
  hundredths = ((t2 - t1) * 100).to_i
  hundredths.between?(-9, 9)
end

#class_error_handler(klass) ⇒ Object



184
185
186
187
188
189
190
191
192
193
# File 'lib/beanstalker/worker.rb', line 184

# Looks up a per-class error handler.
#
# @param klass [Object, nil] object that may respond to +async_error_handler+
# @return [Proc, false] the handler when +klass.async_error_handler+ is a Proc,
#   false otherwise
def class_error_handler(klass)
  return false unless klass.respond_to?(:async_error_handler)

  handler = klass.async_error_handler
  handler.is_a?(Proc) ? handler : false
end

#dispatch(job) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/beanstalker/worker.rb', line 153

# Routes one reserved job to the matching runner.
#
# Verifies the ActiveRecord connections, logs the job body, notifies the
# class-level +on_job_event+ listener (if any) with :dispatch, then runs the
# job as a rails job or a mapped job. A job matching neither is logged as an
# error and deleted.
#
# @param job [Object] the reserved beanstalk job
# @return [Object] result of the runner, or of +job.delete+
def dispatch(job)
  ActiveRecord::Base.verify_active_connections!

  logger.info "Got job: #{get_job_body(job).inspect}"
  listener = self.class.on_job_event
  listener.call(job, :dispatch) if listener

  return run_ao_job(job) if rails_job?(job)
  return run_mapped_job(job) if mapped_job?(job)

  logger.error "Job #{job.inspect} cannot be processed... deleteing"
  job.delete
end

#do_all_workObject



329
330
331
332
333
# File 'lib/beanstalker/worker.rb', line 329

# Logs that running jobs are being finished, then calls the class-level
# +finish+ callback when one is set.
#
# @return [Object, nil] result of the callback, or nil when none is set
def do_all_work
  logger.info 'finishing all running jobs'
  finisher = self.class.finish
  return unless finisher

  finisher.call
end

#get_jobObject



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/beanstalker/worker.rb', line 128

# Blocks until a job can be reserved and returns it.
#
# Each attempt connects the shared queue, runs the class-level
# before_reserve hooks and reserves via #reserve_and_set_hint. Signals are
# re-raised; a deadline-soon error retries immediately; any other failure
# clears @q_hint, is logged, and the loop retries after SLEEP_TIME seconds.
#
# @return [Object] the reserved job
# @raise [SignalException] when a signal (including Interrupt) arrives
def get_job
  loop do
    begin
      Beanstalker::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue SignalException => signal
      # Interrupt is a SignalException, so this single clause re-raises both.
      raise signal
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      Daemonizer.logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => failure
      @q_hint = nil # in case there's something wrong with this conn
      [
        "#{failure.class}: #{failure}\n#{failure.backtrace.join("\n")}",
        'something is wrong. We failed to get a job.',
        "sleeping for #{SLEEP_TIME}s..."
      ].each { |line| Daemonizer.logger.info(line) }
      sleep(SLEEP_TIME)
    end
  end
end

#get_job_body(job) ⇒ Object



295
296
297
# File 'lib/beanstalker/worker.rb', line 295

# Returns the job body with indifferent key access.
#
# @param job [#ybody] job whose +ybody+ responds to +with_indifferent_access+
# @return [Object] result of +with_indifferent_access+ on the job's +ybody+
def get_job_body(job)
  parsed_body = job.ybody
  parsed_body.with_indifferent_access
end

#handle_error(job, ex) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/beanstalker/worker.rb', line 195

# Handles an exception raised while processing +job+.
#
# Logs the exception, notifies the class-level +on_job_event+ listener (if
# any) with :error, then gives a class-specific handler the first chance for
# rails jobs. When that handler is absent or leaves the job reserved, the
# common handler runs: the class-level +custom_error_handler+ if set,
# otherwise default_handle_error.
#
# @param job [Object] the job that failed
# @param ex [Exception] the error raised while processing it
# @return [Object, nil] result of the common handler, or nil when the
#   class-specific handler took care of the job
def handle_error(job, ex)
  # An exception that was never raised has a nil backtrace.
  trace = Array(ex.backtrace).join("\n")
  Daemonizer.logger.warn "Handling exception: #{trace}, job = #{job.id}"
  listener = self.class.on_job_event
  listener.call(job, :error) if listener

  return if rails_job?(job) && class_handler_released_job?(job, ex)

  Daemonizer.logger.info "Running common handler. job = #{job.id}"
  common_handler = self.class.custom_error_handler
  if common_handler
    common_handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end

# Internal helper for #handle_error: runs the error handler declared by the
# job's class, if any, and reports whether the job is no longer reserved.
#
# @param job [Object] the job that failed
# @param ex [Exception] the error raised while processing it
# @return [Boolean] true only when a class handler ran and the job is not
#   in the 'reserved' state afterwards (or no longer exists)
def class_handler_released_job?(job, ex)
  # A body without :data must not crash the error path itself.
  data = get_job_body(job)[:data]
  class_name = data && data[:class]
  return false unless class_name

  klass = begin
    class_name.constantize
  rescue StandardError, ScriptError => e
    # Unknown or unloadable class: fall back to the common handler, but let
    # signals and exit requests through.
    Daemonizer.logger.info "Could not resolve class #{class_name} (#{e.class}: #{e.message}), job = #{job.id}"
    nil
  end

  handler = class_error_handler(klass)
  return false unless handler.is_a?(Proc)

  Daemonizer.logger.info "Running custom error handler for class #{class_name}, job = #{job.id}"
  handler.call(job, ex)

  still_reserved = begin
    job.stats['state'] == 'reserved'
  rescue Beanstalk::NotFoundError
    false
  end

  if still_reserved
    Daemonizer.logger.info "Custom error handler for class #{class_name} didn't release job. job = #{job.id}"
    false
  else
    Daemonizer.logger.info "Custom error handler for class #{class_name} released job. job = #{job.id}"
    true
  end
end

#handle_timeout(job) ⇒ Object



238
239
240
241
242
243
244
# File 'lib/beanstalker/worker.rb', line 238

# Handles a job that timed out, using the class-level
# +custom_timeout_handler+ when one is set and default_handle_timeout
# otherwise.
#
# @param job [Object] the job that timed out
# @return [Object] result of whichever handler ran
def handle_timeout(job)
  custom = self.class.custom_timeout_handler
  return custom.call(job) if custom

  self.class.default_handle_timeout(job)
end

#loggerObject



108
109
110
# File 'lib/beanstalker/worker.rb', line 108

# Picks the logger to write to: Daemonizer.logger when Daemonizer is defined
# and has a logger, RAILS_DEFAULT_LOGGER otherwise.
#
# @return [Object] the logger
def logger
  daemon_logger = Daemonizer.logger if defined?(Daemonizer)
  daemon_logger || RAILS_DEFAULT_LOGGER
end

#main_loopObject



66
67
68
69
70
71
72
# File 'lib/beanstalker/worker.rb', line 66

# Reserves and dispatches jobs until a TERM signal sets the stop flag.
#
# The flag is checked before each job is fetched.
#
# @return [nil]
def main_loop
  trap('TERM') { @stop = true }
  safe_dispatch(get_job) until @stop
end

#mapped_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


325
326
327
# File 'lib/beanstalker/worker.rb', line 325

# Tells whether the configured mapper can handle this job's kind.
#
# The job body is only read when a mapper is present.
#
# @param job [Object] the reserved job
# @return [Object] the falsy @mapper value when no mapper is configured,
#   otherwise the result of +@mapper.can_handle_kind?+
def mapped_job?(job)
  mapper = @mapper
  return mapper unless mapper

  kind = get_job_body(job)['kind']
  mapper.can_handle_kind?(kind)
end

#q_hintObject



104
105
106
# File 'lib/beanstalker/worker.rb', line 104

# Connection to reserve from next: the hinted one when set, the shared
# Beanstalker::Queue.queue otherwise.
#
# @return [Object] @q_hint or the shared queue
def q_hint
  return @q_hint if @q_hint

  Beanstalker::Queue.queue
end

#rails_job?(job) ⇒ Boolean

Returns:

  • (Boolean)


321
322
323
# File 'lib/beanstalker/worker.rb', line 321

# Tells whether the job body's 'kind' is 'rails_beanstalker'.
#
# @param job [Object] the reserved job
# @return [Boolean]
def rails_job?(job)
  kind = get_job_body(job)['kind']
  'rails_beanstalker' == kind.to_s
end

#reserve_and_set_hintObject

This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.



116
117
118
119
120
121
122
# File 'lib/beanstalker/worker.rb', line 116

# Reserves a job from the hinted connection and updates the hint.
#
# This heuristic is to help prevent one queue from starving: when the
# reserve was brief and produced a job, that job's connection becomes the
# next hint; otherwise the hint is cleared. The hint is updated in an
# +ensure+ clause, so it is also cleared when the reserve raises.
#
# @return [Object] the reserved job
def reserve_and_set_hint
  started_at = Time.now.utc
  reserved = q_hint.reserve
  reserved
ensure
  finished_at = Time.now.utc
  quick = brief?(started_at, finished_at)
  @q_hint = quick && reserved ? reserved.conn : nil
end

#runObject



97
98
99
100
101
102
# File 'lib/beanstalker/worker.rb', line 97

# Worker entry point: runs #startup, then #main_loop. An Interrupt raised
# by either one triggers #shutdown instead of propagating.
#
# @return [Object] result of #main_loop, or of #shutdown after an Interrupt
def run
  begin
    startup
    main_loop
  rescue Interrupt
    shutdown
  end
end

#run_ao_job(job) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/beanstalker/worker.rb', line 299

# Runs a rails job: evaluates the code carried in the job body and deletes
# the job afterwards.
#
# The class-level +before_filter+, when set, is called with the job first;
# a falsy result skips the run. In that case the job is neither executed
# nor deleted here. Job stats are copied before the filter runs so the job
# id is available for both log lines.
#
# @param job [Object] the reserved job
# @return [Object] result of #run_with_ruby_timeout_if_set
def run_ao_job(job)
  code = get_job_body(job)['data']['code']

  run_with_ruby_timeout_if_set(code, job) do
    started_at = Time.now
    filter = self.class.before_filter
    stats_snapshot = job.stats.dup

    if filter && !filter.call(job)
      logger.info "Not runnind due to :before_filter restriction. Job id=#{stats_snapshot['id']}. Code '#{code}'."
    else
      run_code(job.id, code)
      job.delete
      logger.info "Finished. Job id=#{stats_snapshot['id']}. Code '#{code}'. Time taken: #{(Time.now - started_at).to_f} sec"
    end
  end
end

#run_code(job_id, code) ⇒ Object



317
318
319
# File 'lib/beanstalker/worker.rb', line 317

# Evaluates a job's code string in the binding given to the constructor.
#
# NOTE(review): this evals whatever string the job carries, so anyone able
# to put jobs on the queue can run arbitrary code in the worker — confirm
# the queue is only reachable by trusted producers.
#
# @param job_id [Object] used only to label the evaluated source
# @param code [String] Ruby source to evaluate
# @return [Object] value of the evaluated code
def run_code(job_id, code)
  source_label = "(beanstalk job #{job_id})"
  eval(code, @top_binding, source_label, 1)
end

#run_mapped_job(job) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/beanstalker/worker.rb', line 274

# Runs a mapped job: looks up the callable for the job's kind and method in
# the mapper, calls it with the job's 'body' data (or an empty hash), and
# deletes the job. A missing mapping is logged as an error and the job is
# deleted anyway.
#
# The resolved callable is kept in @map_job.
#
# @param job [Object] the reserved job
# @return [Object] result of #run_with_ruby_timeout_if_set
def run_mapped_job(job)
  payload = get_job_body(job)
  kind = payload['kind']
  data = payload['data']
  method_name = data['method']
  description = "#{kind}/#{method_name}"

  run_with_ruby_timeout_if_set(description, job) do
    started_at = Time.now
    @map_job = @mapper && @mapper.method_for(kind, method_name)
    if @map_job
      @map_job.call(data['body'] || {})
      logger.info "Finished. Job id=#{job.stats['id']}. Mapped from '#{description}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.error "Job id=#{job.stats['id']}. Mapping not found: '#{description}'. Releases #{job.stats['releases']}. Deleting"
    end
    job.delete
  end
end

#run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/beanstalker/worker.rb', line 261

# Logs the job and runs the block, wrapping it in a Ruby-level timeout of
# 80% of the job's TTR when @options[:ruby_timeout] is set.
#
# @param job_desc [String] description of the job for the log line
# @param job [#stats] the reserved job
# @yield the work to perform
# @return [Object] the block's result
# @raise [Timeout::Error] when the timeout is enabled and the block overruns it
def run_with_ruby_timeout_if_set(job_desc, job, &block)
  # One stats lookup feeds the whole log line and the timeout, instead of a
  # separate lookup per field.
  stats = job.stats
  summary = "Job id=#{stats['id']}. Running '#{job_desc}'. " \
            "Age #{stats['age']}, Releases #{stats['releases']}, TTR #{stats['ttr']}"

  unless @options[:ruby_timeout]
    logger.info summary
    return block.call
  end

  # Leave a 20% margin so the Ruby timeout fires before the TTR expires.
  timeout = stats['ttr'].to_f * 0.8
  logger.info "TO=#{timeout} sec. #{summary}"
  Timeout.timeout(timeout) { block.call }
end

#safe_dispatch(job) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/beanstalker/worker.rb', line 169

# Dispatches a job and routes any failure to the right handler.
#
# A timeout goes to #handle_timeout. An Interrupt releases the job on a
# best-effort basis and is re-raised. Every other exception goes to
# #handle_error. The Daemonizer log is flushed in all cases.
#
# @param job [Object] the reserved job
# @return [Object] result of #dispatch, or of the handler that ran
# @raise [Interrupt] when processing was interrupted
def safe_dispatch(job)
  dispatch(job)
rescue Timeout::Error
  handle_timeout(job)
rescue Interrupt => interrupt
  begin
    job.release
  rescue StandardError
    :ok # releasing is best effort; the interrupt matters more
  end
  raise interrupt
rescue Exception => error
  handle_error(job, error)
ensure
  Daemonizer.flush_logger
end

#shutdownObject



93
94
95
# File 'lib/beanstalker/worker.rb', line 93

# Shutdown hook called by #run after an Interrupt; delegates to #do_all_work.
#
# @return [Object] result of #do_all_work
def shutdown
  do_all_work()
end

#startupObject



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/beanstalker/worker.rb', line 74

# Makes the shared queue watch exactly the configured tubes.
#
# The wanted tubes come from @options[:tube] (a single name or a list;
# "default" when unset). Wanted tubes that are not watched yet are watched,
# watched tubes that are not wanted are ignored, and the resulting tube
# list is logged.
#
# A failure to ignore a tube is logged and skipped. Only StandardError is
# rescued there, so an Interrupt during startup still reaches #run.
#
# @return [Object] result of Daemonizer.flush_logger
def startup
  queue = Beanstalker::Queue.queue
  wanted = Array.wrap(@options[:tube] || "default").map(&:to_s)
  watched = queue.list_tubes_watched.values.flatten

  (wanted - watched).each { |tube| queue.watch(tube) }

  (watched - wanted).each do |tube|
    begin
      queue.ignore(tube)
    rescue StandardError => e
      Daemonizer.logger.info "Failed to ignore tube: #{tube} (#{e.class}: #{e.message})"
    end
  end

  Daemonizer.logger.info "Using tubes: #{queue.list_tubes_watched.values.flatten.join(',')}"
  Daemonizer.flush_logger
end