Class: Ayl::Beanstalk::Worker
- Inherits:
-
Worker
- Object
- Worker
- Ayl::Beanstalk::Worker
- Includes:
- Pool, Logging
- Defined in:
- lib/ayl-beanstalk/worker.rb
Instance Method Summary collapse
- #decay_job(job) ⇒ Object
- #delete_job(job) ⇒ Object
- #handle_job_decay(job, ex) ⇒ Object
-
#initialize(host = 'localhost', port = 11300) ⇒ Worker
constructor
A new instance of Worker.
- #process_messages ⇒ Object
Methods included from Pool
Constructor Details
#initialize(host = 'localhost', port = 11300) ⇒ Worker
Returns a new instance of Worker.
9 10 11 12 13 |
# File 'lib/ayl-beanstalk/worker.rb', line 9 def initialize(host='localhost', port=11300) logger.debug "#{self.class.name}.initialize(#{host.inspect}, #{port})" @host = host @port = port end |
Instance Method Details
#decay_job(job) ⇒ Object
77 78 79 80 81 82 |
# File 'lib/ayl-beanstalk/worker.rb', line 77 def decay_job(job) job.decay rescue RuntimeError => ex logger.error "#{self.class.name} Error decaying job: #{ex}\n#{ex.backtrace.join("\n")}" Ayl::Mailer.instance.("#{self.class.name} Error decaying job", ex) end |
#delete_job(job) ⇒ Object
70 71 72 73 74 75 |
# File 'lib/ayl-beanstalk/worker.rb', line 70 def delete_job(job) job.delete rescue RuntimeError => ex logger.error "#{self.class.name} Error deleting job: #{ex}\n#{ex.backtrace.join("\n")}" Ayl::Mailer.instance.("#{self.class.name} Error deleting job", ex) end |
#handle_job_decay(job, ex) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/ayl-beanstalk/worker.rb', line 58 def handle_job_decay(job, ex) logger.debug "Age of job: #{job.age}" if job.age > 60 Ayl::Mailer.instance.("#{self.class.name} Deleting decayed job; it just took too long.", ex) logger.debug "Deleting job" delete_job(job) else logger.debug "Decaying job" decay_job(job) end end |
#process_messages ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/ayl-beanstalk/worker.rb', line 15 def logger.debug "#{self.class.name} entering process_messages loop watching: #{Ayl::MessageOptions.default_queue_name}" # Set the queue that we will be watching pool.watch(Ayl::MessageOptions.default_queue_name) while true break if @stop job = nil begin job = pool.reserve rescue Exception => ex logger.error "#{self.class.name} Exception in process_messages: #{ex}\n#{ex.backtrace.join("\n")}" Ayl::Mailer.instance.("#{self.class.name} Exception in process_messages.", ex) delete_job(job) unless job.nil? next end break if job.nil? msg = nil begin msg = Ayl::Message.from_hash(job.ybody) (msg) delete_job(job) rescue Ayl::UnrecoverableMessageException => ex logger.error "#{self.class.name} Unrecoverable exception in process_messages: #{ex}" Ayl::Mailer.instance.("#{self.class.name} Unrecoverable exception in process_messages", ex) delete_job(job) rescue SystemExit # This exception is raised when 'Kernel.exit' is called. In this case # we want to make sure the job is deleted, then we simply re-raise # the exception and we go bye-bye. delete_job(job) raise rescue Exception => ex logger.error "#{self.class.name} Exception in process_messages: #{ex}\n#{ex.backtrace.join("\n")}" if msg..decay_failed_job handle_job_decay(job, ex) else delete_job(job) Ayl::Mailer.instance.("#{self.class.name} Exception in process_messages.", ex) end end end end |