Class: Backburner::Worker Abstract
Inherits: Object
- Object
  - Backburner::Worker
Defined in: lib/backburner/worker.rb
Overview
Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.
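As a rough illustration of the subclassing contract described above, the sketch below overrides the three methods on a stand-in base class. `WorkerBase` and `EchoWorker` are hypothetical names invented here so the example runs without the gem or a beanstalkd server; in a real application the parent would be Backburner::Worker, and #start would typically consume jobs rather than record a fake one.

```ruby
# Stand-in for Backburner::Worker (hypothetical; mirrors only the documented
# constructor and the three overridable methods, with no beanstalkd connection).
class WorkerBase
  attr_reader :tube_names

  def initialize(tube_names = nil)
    @tube_names = process_tube_names(tube_names)
  end

  def process_tube_names(tube_names)
    Array(tube_names).compact
  end

  def prepare
    raise NotImplementedError
  end

  def start
    raise NotImplementedError
  end
end

# Hypothetical custom worker overriding all three methods.
class EchoWorker < WorkerBase
  attr_reader :events

  # Normalize to an array of unique, stripped strings.
  def process_tube_names(tube_names)
    Array(tube_names).flatten.compact.map { |name| name.to_s.strip }.uniq
  end

  # Run once before processing; a real worker would watch its tubes here.
  def prepare
    @events = ["watching #{tube_names.join(', ')}"]
  end

  # A real worker would loop indefinitely; this sketch records one fake job.
  def start
    prepare
    @events << "processed one job"
    self
  end
end

worker = EchoWorker.new([" emails ", :emails, nil, "reports"]).start
```

Because the constructor calls #process_tube_names, the override takes effect before #prepare or #start ever run, which is why normalization belongs there rather than in #prepare.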
Direct Known Subclasses
Backburner::Workers::Forking, Backburner::Workers::Simple, Backburner::Workers::Threading, Backburner::Workers::ThreadsOnFork
Class Attribute Summary
-
.known_queue_classes ⇒ Object
Instance Attribute Summary
-
#connection ⇒ Object
Connection used by this worker, assigned from new_connection in the constructor.
-
#tube_names ⇒ Object
List of tube names to be watched and processed.
Class Method Summary
-
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker.
-
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
Instance Method Summary
-
#initialize(tube_names = nil) ⇒ Worker
constructor
Constructs a new worker for processing jobs within specified tubes.
-
#prepare ⇒ Object
abstract
Used to prepare the job queues before job processing is initiated.
-
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array.
-
#shutdown ⇒ Object
Triggers this worker to shut down.
-
#start ⇒ Object
Starts processing ready jobs indefinitely.
-
#work_one_job(conn = connection) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it.
Methods included from Logger
included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger
Methods included from Helpers
#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_priority, #resolve_respond_timeout
Constructor Details
#initialize(tube_names = nil) ⇒ Worker
Constructs a new worker for processing jobs within specified tubes.
    # File 'lib/backburner/worker.rb', line 73
    def initialize(tube_names=nil)
      @connection = new_connection
      @tube_names = self.process_tube_names(tube_names)
      register_signal_handlers!
    end
Class Attribute Details
.known_queue_classes ⇒ Object
    # File 'lib/backburner/worker.rb', line 16
    def known_queue_classes; @known_queue_classes ||= []; end
Instance Attribute Details
#connection ⇒ Object
Connection used by this worker, assigned from new_connection in the constructor.
    # File 'lib/backburner/worker.rb', line 67
    def connection
      @connection
    end
#tube_names ⇒ Object
List of tube names to be watched and processed
    # File 'lib/backburner/worker.rb', line 67
    def tube_names
      @tube_names
    end
Class Method Details
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in seconds), `ttr` (time to respond), `queue` (queue name).
    # File 'lib/backburner/worker.rb', line 26
    def self.enqueue(job_class, args=[], opts={})
      pri   = resolve_priority(opts[:pri] || job_class)
      delay = [0, opts[:delay].to_i].max
      ttr   = resolve_respond_timeout(opts[:ttr] || job_class)
      res   = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

      return nil unless res # stop if hook is false

      data = { :class => job_class.name, :args => args }
      queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])

      begin
        response = nil
        connection = Backburner::Connection.new(Backburner.configuration.beanstalk_url)
        connection.retryable do
          tube = connection.tubes[expand_tube_name(queue || job_class)]
          serialized_data = Backburner.configuration.job_serializer_proc.call(data)
          response = tube.put(serialized_data, :pri => pri, :delay => delay, :ttr => ttr)
        end
        return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
      ensure
        connection.close if connection
      end

      response
    end
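The option handling at the top of .enqueue can be tried in isolation. The sketch below copies only the `delay` and `queue` expressions from the listing into a hypothetical helper, `resolve_enqueue_options`, which is not part of Backburner. Priority and TTR resolution are left out because they depend on Backburner's own helpers, and `NewsletterJob` is a placeholder class.

```ruby
# Hypothetical helper (not part of Backburner) isolating two expressions
# from .enqueue: the delay clamp and the queue-name resolution.
def resolve_enqueue_options(job_class, opts = {})
  # nil.to_i is 0 and negative values are clamped, so delay is never below 0.
  delay = [0, opts[:delay].to_i].max
  # :queue may be a plain name or a Proc that receives the job class.
  queue = opts[:queue] && (Proc === opts[:queue] ? opts[:queue].call(job_class) : opts[:queue])
  { :delay => delay, :queue => queue }
end

class NewsletterJob; end # placeholder job class for the example

plain   = resolve_enqueue_options(NewsletterJob, :delay => -5, :queue => "mailers")
dynamic = resolve_enqueue_options(NewsletterJob, :queue => lambda { |klass| klass.name.downcase })
default = resolve_enqueue_options(NewsletterJob)
```

When no `:queue` option is given the resolved queue is nil, and .enqueue falls back to the job class itself when choosing the tube (`queue || job_class`).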
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
    # File 'lib/backburner/worker.rb', line 58
    def self.start(tube_names=nil)
      begin
        self.new(tube_names).start
      rescue SystemExit
        # do nothing
      end
    end
Instance Method Details
#prepare ⇒ Object
Used to prepare the job queues before job processing is initiated.
This method is abstract: define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with `log_info`.
    # File 'lib/backburner/worker.rb', line 99
    def prepare
      raise NotImplementedError
    end
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array. Should return normalized tube_names as an array of strings.
This method can be overridden in inherited workers to add more complex tube name processing.
    # File 'lib/backburner/worker.rb', line 120
    def process_tube_names(tube_names)
      compact_tube_names(tube_names)
    end
#shutdown ⇒ Object
Triggers this worker to shut down.
    # File 'lib/backburner/worker.rb', line 104
    def shutdown
      Thread.new do
        log_info 'Worker exiting...'
      end
      Kernel.exit
    end
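#shutdown ends the process with Kernel.exit, which raises SystemExit, and .start rescues that exception so a shutdown does not surface as an error. A minimal sketch of that interaction follows; `run_worker` is a hypothetical wrapper standing in for .start, and the counting loop stands in for a worker's job loop.

```ruby
# Hypothetical wrapper mirroring the begin/rescue in .start.
def run_worker
  begin
    yield
    :finished
  rescue SystemExit
    # .start ignores the exception; the sketch returns a marker
    # so the outcome is visible.
    :shut_down
  end
end

jobs_done = 0
result = run_worker do
  loop do
    jobs_done += 1
    Kernel.exit if jobs_done == 3 # stands in for #shutdown
  end
end
```

Kernel#loop only swallows StopIteration, so the SystemExit propagates out of the loop and reaches the rescue clause.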
#start ⇒ Object
Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.
    # File 'lib/backburner/worker.rb', line 85
    def start
      raise NotImplementedError
    end
#work_one_job(conn = connection) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it.
    # File 'lib/backburner/worker.rb', line 129
    def work_one_job(conn = connection)
      begin
        job = reserve_job(conn)
      rescue Beaneater::TimedOutError => e
        return
      end

      self.log_job_begin(job.name, job.args)
      job.process
      self.log_job_end(job.name)

    rescue Backburner::Job::JobFormatInvalid => e
      self.log_error self.exception_message(e)
    rescue => e # Error occurred processing job
      self.log_error self.exception_message(e) unless e.is_a?(Backburner::Job::RetryJob)

      unless job
        self.log_error "Error occurred before we were able to assign a job. Giving up without retrying!"
        return
      end

      # NB: There's a slight chance here that the connection to beanstalkd has
      # gone down between the time we reserved / processed the job and here.
      num_retries = job.stats.releases
      retry_status = "failed: attempt #{num_retries+1} of #{queue_config.max_job_retries+1}"
      if num_retries < queue_config.max_job_retries # retry again
        delay = queue_config.retry_delay_proc.call(queue_config.retry_delay, num_retries) rescue queue_config.retry_delay
        job.retry(num_retries + 1, delay)
        self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
      else # retries failed, bury
        job.bury
        self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at
      end

      handle_error(e, job.name, job.args, job)
    end
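The failure branch of #work_one_job reduces to a small decision: compare the job's release count with `max_job_retries`, compute a delay via `retry_delay_proc` (falling back to `retry_delay` if the proc raises), then retry or bury. The sketch below restates that logic as a pure function. `QueueConfig` and `retry_or_bury` are hypothetical stand-ins, and the example proc is an arbitrary choice for illustration, not necessarily Backburner's default.

```ruby
# Hypothetical stand-in for the three queue_config settings used by #work_one_job.
QueueConfig = Struct.new(:max_job_retries, :retry_delay, :retry_delay_proc)

# Returns [:retry, delay, status] or [:bury, nil, status] for a failed job.
def retry_or_bury(num_retries, config)
  status = "failed: attempt #{num_retries + 1} of #{config.max_job_retries + 1}"
  if num_retries < config.max_job_retries
    delay = begin
      config.retry_delay_proc.call(config.retry_delay, num_retries)
    rescue StandardError
      config.retry_delay # fall back to the fixed delay if the proc raises
    end
    [:retry, delay, "#{status}, retrying in #{delay}s"]
  else
    [:bury, nil, "#{status}, burying"]
  end
end

# Example proc (an assumption for illustration): the delay grows cubically.
config = QueueConfig.new(2, 5, lambda { |min_delay, n| min_delay + n**3 })

first  = retry_or_bury(0, config)
third  = retry_or_bury(2, config)
broken = retry_or_bury(1, QueueConfig.new(2, 5, lambda { |_d, _n| raise "boom" }))
```

With `max_job_retries` set to 2, a job gets three attempts in total: the original run plus two retries, after which it is buried.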