Class: Beanstalker::Worker
- Inherits:
-
Object
- Object
- Beanstalker::Worker
- Defined in:
- lib/beanstalker/worker.rb
Constant Summary collapse
- SLEEP_TIME =
Rails loads this file twice.
60
Class Attribute Summary collapse
-
.before_filter ⇒ Object
Returns the value of attribute before_filter.
-
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
-
.custom_timeout_handler ⇒ Object
Returns the value of attribute custom_timeout_handler.
-
.finish ⇒ Object
Returns the value of attribute finish.
Class Method Summary collapse
- .before_reserve(&block) ⇒ Object
- .before_reserves ⇒ Object
- .default_handle_error(job, ex) ⇒ Object
- .default_handle_timeout(job) ⇒ Object
- .error_handler(&block) ⇒ Object
- .run_before_reserve ⇒ Object
- .timeout_handler(&block) ⇒ Object
Instance Method Summary collapse
- #brief?(t1, t2) ⇒ Boolean
- #class_error_handler(klass) ⇒ Object
- #dispatch(job) ⇒ Object
- #do_all_work ⇒ Object
- #get_job ⇒ Object
- #get_job_body(job) ⇒ Object
- #handle_error(job, ex) ⇒ Object
- #handle_timeout(job) ⇒ Object
-
#initialize(top_binding, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #logger ⇒ Object
- #main_loop ⇒ Object
- #mapped_job?(job) ⇒ Boolean
- #q_hint ⇒ Object
- #rails_job?(job) ⇒ Boolean
-
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving.
- #run ⇒ Object
- #run_ao_job(job) ⇒ Object
- #run_code(job_id, code) ⇒ Object
- #run_mapped_job(job) ⇒ Object
- #run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object
- #safe_dispatch(job) ⇒ Object
- #shutdown ⇒ Object
- #startup ⇒ Object
Constructor Details
#initialize(top_binding, options = {}) ⇒ Worker
Returns a new instance of Worker.
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/beanstalker/worker.rb', line 54
#
# Builds a worker.
#
# top_binding - stored in @top_binding; run_code evaluates job code against it.
# options     - Hash of settings. When it has a :servers entry, the shared
#               Beanstalker::Queue.queue is replaced by a Beanstalk::Pool built
#               from those servers. nil is treated as an empty Hash.
def initialize(top_binding, options = {})
  mapper_file = "#{RAILS_ROOT}/config/beanstalker_mapper.rb"
  # The mapper is optional: it is only created when the mapping file exists.
  @mapper = Beanstalker::Mapper.new(mapper_file) if File.exist?(mapper_file)
  @top_binding = top_binding
  @stop = false
  # Normalise nil to {} so later `@options[...]` lookups cannot hit nil.
  @options = options || {}
  if @options[:servers]
    Beanstalker::Queue.queue = Beanstalk::Pool.new(@options[:servers])
  end
end
Class Attribute Details
.before_filter ⇒ Object
Returns the value of attribute before_filter.
31 32 33 |
# File 'lib/beanstalker/worker.rb', line 31
#
# Reader for the class-level before_filter attribute.
# Returns whatever is stored in @before_filter (nil when nothing was set).
def before_filter
  @before_filter
end
.custom_error_handler ⇒ Object
Returns the value of attribute custom_error_handler.
29 30 31 |
# File 'lib/beanstalker/worker.rb', line 29
#
# Reader for the class-level custom_error_handler attribute.
# Returns whatever is stored in @custom_error_handler (nil when nothing was set).
def custom_error_handler
  @custom_error_handler
end
.custom_timeout_handler ⇒ Object
Returns the value of attribute custom_timeout_handler.
30 31 32 |
# File 'lib/beanstalker/worker.rb', line 30
#
# Reader for the class-level custom_timeout_handler attribute.
# Returns whatever is stored in @custom_timeout_handler (nil when nothing was set).
def custom_timeout_handler
  @custom_timeout_handler
end
.finish ⇒ Object
Returns the value of attribute finish.
28 29 30 |
# File 'lib/beanstalker/worker.rb', line 28
#
# Reader for the class-level finish attribute.
# Returns whatever is stored in @finish (nil when nothing was set).
def finish
  @finish
end
Class Method Details
.before_reserve(&block) ⇒ Object
45 46 47 |
# File 'lib/beanstalker/worker.rb', line 45
#
# Registers a hook by appending the given block to the before_reserves list.
# Returns that list.
def before_reserve(&block)
  before_reserves.push(block)
end
.before_reserves ⇒ Object
41 42 43 |
# File 'lib/beanstalker/worker.rb', line 41
#
# Returns the list of registered before-reserve hooks, creating an empty
# list on first use.
def before_reserves
  @before_reserves = [] unless @before_reserves
  @before_reserves
end
.default_handle_error(job, ex) ⇒ Object
237 238 239 240 241 242 243 |
# File 'lib/beanstalker/worker.rb', line 237
#
# Default reaction to a failed job: log the failure and the exception, then
# call job.decay.
#
# job - the failed job; must respond to server, id and decay.
# ex  - the exception that caused the failure.
#
# A Beanstalk::UnexpectedResponse raised along the way is logged, not re-raised.
def self.default_handle_error(job, ex)
  Daemonizer.logger.info "Job failed: #{job.server}/#{job.id}"
  # Exception#backtrace is nil for an exception object that was never raised;
  # Array() keeps the log line from failing before job.decay is reached.
  Daemonizer.logger.info("#{ex.class}: #{ex}\n" + Array(ex.backtrace).join("\n"))
  job.decay
rescue Beanstalk::UnexpectedResponse => e
  Daemonizer.logger.info "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{e.inspect}"
end
.default_handle_timeout(job) ⇒ Object
245 246 247 248 249 250 |
# File 'lib/beanstalker/worker.rb', line 245
#
# Default reaction to a timed-out job: log the timeout, then call job.decay.
# A Beanstalk::UnexpectedResponse raised along the way is logged, not re-raised.
def self.default_handle_timeout(job)
  begin
    Daemonizer.logger.info "Job timeout: #{job.server}/#{job.id}"
    job.decay
  rescue Beanstalk::UnexpectedResponse => beanstalk_error
    Daemonizer.logger.info "Unexpected Beanstalkd error: #{job.server}/#{job.id}. #{beanstalk_error.inspect}"
  end
end
.error_handler(&block) ⇒ Object
33 34 35 |
# File 'lib/beanstalker/worker.rb', line 33
#
# Installs the given block as the class-wide custom error handler by
# assigning it to custom_error_handler.
def error_handler(&block)
  self.custom_error_handler = block
end
.run_before_reserve ⇒ Object
49 50 51 |
# File 'lib/beanstalker/worker.rb', line 49
#
# Invokes every registered before-reserve hook, in registration order,
# with no arguments. Returns the hook list.
def run_before_reserve
  before_reserves.each(&:call)
end
.timeout_handler(&block) ⇒ Object
37 38 39 |
# File 'lib/beanstalker/worker.rb', line 37
#
# Installs the given block as the class-wide custom timeout handler by
# assigning it to custom_timeout_handler.
def timeout_handler(&block)
  self.custom_timeout_handler = block
end
Instance Method Details
#brief?(t1, t2) ⇒ Boolean
119 120 121 |
# File 'lib/beanstalker/worker.rb', line 119
#
# True when t1 and t2 are less than a tenth of a unit apart, in either
# direction. The difference is scaled by 100 and truncated to an integer
# before the comparison, so for Time values (whose difference is in seconds)
# that is about 100 ms.
def brief?(t1, t2)
  hundredths_apart = ((t2 - t1) * 100).to_i
  hundredths_apart.abs < 10
end
#class_error_handler(klass) ⇒ Object
176 177 178 179 180 181 182 183 184 185 |
# File 'lib/beanstalker/worker.rb', line 176
#
# Looks up a per-class error handler.
#
# Returns klass.async_error_handler when klass responds to that method and
# the value is a Proc; returns false in every other case (including nil klass).
def class_error_handler(klass)
  return false unless klass.respond_to?(:async_error_handler)

  handler = klass.async_error_handler
  handler.is_a?(Proc) ? handler : false
end
#dispatch(job) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/beanstalker/worker.rb', line 148
#
# Routes a reserved job to the right runner.
#
# Rails jobs (rails_job?) go to run_ao_job, mapped jobs (mapped_job?) go to
# run_mapped_job. Anything else is logged as an error and deleted from the
# queue. Returns the result of whichever branch ran.
def dispatch(job)
  # Runs before each job is handled.
  # NOTE(review): presumably refreshes stale database connections - confirm.
  ActiveRecord::Base.verify_active_connections!
  logger.info "Got job: #{get_job_body(job).inspect}"
  if rails_job?(job)
    run_ao_job(job)
  elsif mapped_job?(job)
    run_mapped_job(job)
  else
    logger.error "Job #{job.inspect} cannot be processed... deleting"
    job.delete
  end
end
#do_all_work ⇒ Object
320 321 322 323 324 |
# File 'lib/beanstalker/worker.rb', line 320
#
# Logs that running jobs are being finished, then calls the class-level
# finish hook if one is set. Returns the hook's result, or nil without a hook.
def do_all_work
  logger.info 'finishing all running jobs'
  finish_hook = self.class.finish
  return unless finish_hook

  finish_hook.call
end
#get_job ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/beanstalker/worker.rb', line 123
#
# Keeps trying until a job has been reserved, and returns it.
#
# Each attempt connects the queue, runs the class-level before-reserve hooks
# and then reserves via reserve_and_set_hint. Interrupts and other signal
# exceptions propagate to the caller. Any other failure is logged, the
# connection hint is cleared, and the loop sleeps SLEEP_TIME seconds before
# the next attempt.
def get_job
  loop do
    begin
      Beanstalker::Queue.queue.connect
      self.class.run_before_reserve
      return reserve_and_set_hint
    rescue Interrupt, SignalException
      # Never swallow signals: hand them straight back to the caller.
      raise
    rescue Beanstalk::DeadlineSoonError
      # Do nothing; immediately try again, giving the user a chance to
      # clean up in the before_reserve hook.
      Daemonizer.logger.info 'Job deadline soon; you should clean up.'
    rescue Exception => failure
      @q_hint = nil # in case there's something wrong with this conn
      Daemonizer.logger.info(
        "#{failure.class}: #{failure}\n" + failure.backtrace.join("\n"))
      Daemonizer.logger.info 'something is wrong. We failed to get a job.'
      Daemonizer.logger.info "sleeping for #{SLEEP_TIME}s..."
      sleep(SLEEP_TIME)
    end
  end
end
#get_job_body(job) ⇒ Object
286 287 288 |
# File 'lib/beanstalker/worker.rb', line 286
#
# Returns the job's ybody converted with with_indifferent_access.
# NOTE(review): ybody is presumably the YAML-decoded job payload - confirm.
def get_job_body(job)
  payload = job.ybody
  payload.with_indifferent_access
end
#handle_error(job, ex) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/beanstalker/worker.rb', line 187
#
# Reacts to an exception raised while a job was being processed.
#
# For Rails jobs a per-class handler (see class_error_handler) gets the first
# chance. If there is none, or it leaves the job reserved, the common handler
# runs: the class-wide custom_error_handler when set, otherwise
# default_handle_error.
#
# job - the job that failed.
# ex  - the exception raised while processing it.
#
# Returns nil when the per-class handler dealt with the job, otherwise the
# common handler's result.
def handle_error(job, ex)
  # Array() guards against a nil backtrace (exception that was never raised).
  Daemonizer.logger.warn "Handling exception: #{Array(ex.backtrace).join("\n")}, job = #{job.id}"
  return if run_class_error_handler(job, ex)

  Daemonizer.logger.info "Running common handler. job = #{job.id}"
  if self.class.custom_error_handler
    self.class.custom_error_handler.call(job, ex)
  else
    self.class.default_handle_error(job, ex)
  end
end

# Internal helper for handle_error: runs the job class's own error handler,
# if any. Returns true only when that handler ran and the job is no longer
# in the 'reserved' state afterwards; false in every other case.
def run_class_error_handler(job, ex)
  return false unless rails_job?(job)

  # A malformed body may lack :data. Treat that as "no class" so the failure
  # still reaches the common handler instead of raising from here.
  job_data = get_job_body(job)[:data]
  class_name = job_data && job_data[:class]
  return false unless class_name

  klass = begin
    class_name.constantize
  rescue StandardError, ScriptError
    # Unknown or unloadable class: behave as if it has no handler.
    nil
  end
  error_handler = class_error_handler(klass)
  return false unless error_handler.is_a?(Proc)

  Daemonizer.logger.info "Running custom error handler for class #{class_name}, job = #{job.id}"
  error_handler.call(job, ex)

  # The handler is expected to release/delete/bury the job itself. A job
  # that can no longer be found counts as dealt with.
  job_reserved = begin
    job.stats['state'] == 'reserved'
  rescue Beanstalk::NotFoundError
    false
  end

  if job_reserved
    Daemonizer.logger.info "Custom error handler for class #{class_name} didn't release job. job = #{job.id}"
    false
  else
    Daemonizer.logger.info "Custom error handler for class #{class_name} released job. job = #{job.id}"
    true
  end
end
private :run_class_error_handler
#handle_timeout(job) ⇒ Object
229 230 231 232 233 234 235 |
# File 'lib/beanstalker/worker.rb', line 229
#
# Reacts to a job that exceeded its time limit: delegates to the class-wide
# custom_timeout_handler when one is set, otherwise to default_handle_timeout.
def handle_timeout(job)
  return self.class.default_handle_timeout(job) unless self.class.custom_timeout_handler

  self.class.custom_timeout_handler.call(job)
end
#logger ⇒ Object
103 104 105 |
# File 'lib/beanstalker/worker.rb', line 103
#
# Picks the logger to use: Daemonizer.logger when the Daemonizer constant is
# defined and its logger is set, otherwise RAILS_DEFAULT_LOGGER.
def logger
  daemon_logger = Daemonizer.logger if defined?(Daemonizer)
  daemon_logger || RAILS_DEFAULT_LOGGER
end
#main_loop ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/beanstalker/worker.rb', line 65
#
# The worker's processing loop: fetch a job, dispatch it, repeat.
#
# A TERM signal only sets the @stop flag. The flag is checked before each
# fetch, so the job in progress is finished before the loop ends.
def main_loop
  Signal.trap('TERM') { @stop = true }
  loop do
    break if @stop

    next_job = get_job
    safe_dispatch(next_job)
  end
end
#mapped_job?(job) ⇒ Boolean
316 317 318 |
# File 'lib/beanstalker/worker.rb', line 316
#
# Whether the configured mapper can handle this job's 'kind'.
# Returns the falsy @mapper itself when no mapper is configured, otherwise
# the mapper's can_handle_kind? answer.
def mapped_job?(job)
  return @mapper unless @mapper

  kind = get_job_body(job)['kind']
  @mapper.can_handle_kind?(kind)
end
#q_hint ⇒ Object
99 100 101 |
# File 'lib/beanstalker/worker.rb', line 99
#
# The object to reserve the next job from: the remembered @q_hint when one
# is set, otherwise the shared Beanstalker::Queue.queue.
def q_hint
  return @q_hint if @q_hint

  Beanstalker::Queue.queue
end
#rails_job?(job) ⇒ Boolean
312 313 314 |
# File 'lib/beanstalker/worker.rb', line 312
#
# True when the job body's 'kind', converted to a String, is
# 'rails_beanstalker'.
def rails_job?(job)
  kind = get_job_body(job)['kind']
  kind.to_s == 'rails_beanstalker'
end
#reserve_and_set_hint ⇒ Object
This heuristic is to help prevent one queue from starving. The idea is that if the connection returns a job right away, it probably has more available. But if it takes time, then it’s probably empty. So reuse the same connection as long as it stays fast. Otherwise, have no preference.
111 112 113 114 115 116 117 |
# File 'lib/beanstalker/worker.rb', line 111
#
# Reserves a job from q_hint and returns it, updating the connection hint on
# the way out, whether or not the reserve succeeded.
#
# The hint becomes the job's connection only when a job was obtained and the
# reserve was brief (see brief?). Otherwise it is cleared, so the next
# reserve has no preferred connection.
def reserve_and_set_hint
  reserved = nil
  started_at = Time.now.utc
  begin
    reserved = q_hint.reserve
  ensure
    finished_at = Time.now.utc
    @q_hint = (brief?(started_at, finished_at) && reserved) ? reserved.conn : nil
  end
  reserved
end
#run ⇒ Object
92 93 94 95 96 97 |
# File 'lib/beanstalker/worker.rb', line 92
#
# Worker entry point: runs startup, then main_loop. An Interrupt raised by
# either is turned into a call to shutdown.
def run
  begin
    startup
    main_loop
  rescue Interrupt
    shutdown
  end
end
#run_ao_job(job) ⇒ Object
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/beanstalker/worker.rb', line 290
#
# Runs a Rails job, whose body carries Ruby source under data/code.
#
# The class-level before_filter, when set, is called with the job and decides
# whether the code runs. When it does, the code is evaluated via run_code and
# the job is deleted. When it does not, the job is left untouched and only a
# log line is written.
def run_ao_job(job)
  job_data = get_job_body(job)['data']
  code = job_data['code']
  run_with_ruby_timeout_if_set(code, job) do
    t1 = Time.now
    f = self.class.before_filter
    # Stats are copied before the job is run and deleted; the log lines below
    # read the copy instead of querying the job again.
    statistics = job.stats.dup
    can_run = f ? f.call(job) : true
    if can_run
      run_code(job.id, code)
      job.delete
      logger.info "Finished. Job id=#{statistics['id']}. Code '#{code}'. Time taken: #{(Time.now - t1).to_f} sec"
    else
      logger.info "Not running due to :before_filter restriction. Job id=#{statistics['id']}. Code '#{code}'."
    end
  end
end
#run_code(job_id, code) ⇒ Object
308 309 310 |
# File 'lib/beanstalker/worker.rb', line 308
#
# Evaluates a job's Ruby source against the worker's top-level binding and
# returns the result. The job id is embedded in the pseudo file name so
# backtraces identify the job.
#
# NOTE(review): this eval()s code taken from the job body. Anything able to
# put jobs on the queue can execute arbitrary Ruby in the worker - confirm
# that the queue is only reachable by trusted producers.
def run_code(job_id, code)
  source_label = "(beanstalk job #{job_id})"
  eval(code, @top_binding, source_label, 1)
end
#run_mapped_job(job) ⇒ Object
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/beanstalker/worker.rb', line 265
#
# Runs a job through the mapper.
#
# The job's kind and data/method select a callable via @mapper.method_for;
# it is invoked with data/body (an empty Hash when absent). When no mapping
# exists an error is logged instead. The job is deleted in both cases.
def run_mapped_job(job)
  body = get_job_body(job)
  kind = body['kind']
  data = body['data']
  method_name = data['method']
  job_desc = [kind, method_name].join('/')

  run_with_ruby_timeout_if_set(job_desc, job) do
    started_at = Time.now
    @map_job = @mapper && @mapper.method_for(kind, method_name)
    if @map_job
      payload = data['body'] || {}
      @map_job.call(payload)
      logger.info "Finished. Job id=#{job.stats['id']}. Mapped from '#{job_desc}'. Time taken: #{(Time.now - started_at).to_f} sec"
    else
      logger.error "Job id=#{job.stats['id']}. Mapping not found: '#{job_desc}'. Releases #{job.stats['releases']}. Deleting"
    end
    job.delete
  end
end
#run_with_ruby_timeout_if_set(job_desc, job, &block) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/beanstalker/worker.rb', line 252
#
# Runs the block for a job, guarded by a Ruby-level timeout when the
# :ruby_timeout option is set. Returns the block's result.
#
# job_desc - text identifying the job in the log line.
# job      - the job being run; only its stats are read here.
#
# The timeout is 80% of the job's 'ttr' stat. Timeout::Error from an expired
# timeout propagates to the caller.
def run_with_ruby_timeout_if_set(job_desc, job, &block)
  # Read the stats once and reuse them for every field in the log line,
  # instead of calling job.stats again for each field.
  stats = job.stats
  summary = "Job id=#{stats['id']}. Running '#{job_desc}'. Age #{stats['age']}, Releases #{stats['releases']}, TTR #{stats['ttr']}"
  if @options[:ruby_timeout]
    timeout = (stats['ttr'].to_f * 0.8)
    logger.info "TO=#{timeout} sec. #{summary}"
    Timeout.timeout(timeout) do
      block.call
    end
  else
    logger.info summary
    block.call
  end
end
#safe_dispatch(job) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/beanstalker/worker.rb', line 161
#
# Dispatches a job and converts failures into handler calls.
#
# Timeout::Error goes to handle_timeout. Interrupt makes a best-effort
# attempt to release the job and is then re-raised. Every other exception
# goes to handle_error. The Daemonizer log is flushed in all cases.
def safe_dispatch(job)
  dispatch(job)
rescue Timeout::Error
  handle_timeout(job)
rescue Interrupt => interrupt
  begin
    job.release
  rescue StandardError
    # Best effort only: a failed release must not mask the interrupt.
    :ok
  end
  raise interrupt
rescue Exception => failure
  handle_error(job, failure)
ensure
  Daemonizer.flush_logger
end
#shutdown ⇒ Object
88 89 90 |
# File 'lib/beanstalker/worker.rb', line 88
#
# Shutdown hook: delegates to do_all_work and returns its result.
def shutdown
  do_all_work
end
#startup ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/beanstalker/worker.rb', line 73
#
# Aligns the queue's watched tubes with the :tube option ("default" when the
# option is absent): tubes that are wanted but not yet watched are watched,
# and watched tubes that are not wanted are ignored. The resulting tube list
# is logged and the log is flushed.
def startup
  wanted = Array.wrap(@options[:tube] || "default").map { |name| name.to_s } # e.g. ["default"]
  watched = Beanstalker::Queue.queue.list_tubes_watched.values.flatten # e.g. ["default"]

  missing = wanted - watched
  surplus = watched - wanted
  missing.each { |tube| Beanstalker::Queue.queue.watch(tube) }
  surplus.each { |tube| Beanstalker::Queue.queue.ignore(tube) }

  Daemonizer.logger.info "Using tubes: #{Beanstalker::Queue.queue.list_tubes_watched.values.flatten.join(',')}"
  Daemonizer.flush_logger
end