Class: Backburner::Worker Abstract
- Inherits: Object
  - Object
  - Backburner::Worker
- Defined in: lib/backburner/worker.rb
Overview
Subclass and override #process_tube_names, #prepare and #start to implement a custom Worker class.
Direct Known Subclasses
Backburner::Workers::Forking, Backburner::Workers::Simple, Backburner::Workers::Threading, Backburner::Workers::ThreadsOnFork
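The subclassing contract described in the overview can be sketched as follows. This is a minimal illustration, not the library's code: `SketchWorker` is a hypothetical stand-in that mirrors only the abstract interface shown on this page (so the example runs without the gem), and `OneShotWorker` is a made-up subclass that overrides the three hooks.

```ruby
# Stand-in base class mirroring the abstract interface documented here.
# It is NOT Backburner::Worker; it omits connections and signal handling.
class SketchWorker
  attr_reader :tube_names

  def self.start(tube_names = nil)
    new(tube_names).start
  end

  def initialize(tube_names = nil)
    # The constructor delegates to the overridable hook.
    @tube_names = process_tube_names(tube_names)
  end

  def process_tube_names(tube_names)
    Array(tube_names).compact
  end

  def prepare
    raise NotImplementedError
  end

  def start
    raise NotImplementedError
  end
end

# Hypothetical subclass overriding the three hooks named in the overview.
class OneShotWorker < SketchWorker
  attr_reader :log

  def process_tube_names(tube_names)
    super.map { |name| name.to_s.downcase }
  end

  def prepare
    @log = ["watching #{tube_names.join(', ')}"]
  end

  def start
    prepare
    # A real worker would loop indefinitely; this one visits each tube once.
    tube_names.each { |tube| @log << "worked #{tube}" }
    self
  end
end

worker = OneShotWorker.start(['Mail', :reports])
puts worker.log
```

Calling `start` on the stand-in base class directly raises `NotImplementedError`, which is the same signal the abstract `#prepare` and `#start` methods give in the listings on this page.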
Class Attribute Summary
Instance Attribute Summary
-
#connection ⇒ Object
Connection this worker uses to reserve jobs; set in the constructor.
-
#tube_names ⇒ Object
List of tube names to be watched and processed.
Class Method Summary
-
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker.
-
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
Instance Method Summary
-
#initialize(tube_names = nil) ⇒ Worker
constructor
Constructs a new worker for processing jobs within specified tubes.
-
#prepare ⇒ Object
abstract
Used to prepare the job queues before job processing is initiated.
-
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array.
-
#shutdown ⇒ Object
Triggers this worker to shutdown.
-
#start ⇒ Object
Starts processing ready jobs indefinitely.
-
#work_one_job(conn = connection, tube_name = nil) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it.
Methods included from Logger
included, #job_started_at, #log_error, #log_info, #log_job_begin, #log_job_end, #logger
Methods included from Helpers
#classify, #constantize, #dasherize, #exception_message, #expand_tube_name, included, #queue_config, #resolve_max_job_retries, #resolve_priority, #resolve_respond_timeout, #resolve_retry_delay, #resolve_retry_delay_proc
Constructor Details
#initialize(tube_names = nil) ⇒ Worker
Constructs a new worker for processing jobs within specified tubes.
# File 'lib/backburner/worker.rb', line 86
def initialize(tube_names = nil)
  @connection = new_connection
  @tube_names = process_tube_names(tube_names)
  register_signal_handlers!
end
Class Attribute Details
.known_queue_classes ⇒ Object
# File 'lib/backburner/worker.rb', line 17
def known_queue_classes
  @known_queue_classes ||= []
end
Instance Attribute Details
#connection ⇒ Object
Connection this worker uses to reserve jobs; set in the constructor.
# File 'lib/backburner/worker.rb', line 80
def connection
  @connection
end
#tube_names ⇒ Object
List of tube names to be watched and processed.
# File 'lib/backburner/worker.rb', line 80
def tube_names
  @tube_names
end
Class Method Details
.enqueue(job_class, args = [], opts = {}) ⇒ Object
Enqueues a job to be processed later by a worker. Options: `pri` (priority), `delay` (delay in seconds), `ttr` (time to respond), `queue` (queue name).
# File 'lib/backburner/worker.rb', line 29
def self.enqueue(job_class, args = [], opts = {})
  # Invoke Procs if they are sent
  opts.each_key do |k|
    opts[k] = opts[k].call job_class, args if opts[k].instance_of?(Proc)
  end

  opts[:shard_key] = opts[:shard_key].nil? ? 'X' : opts[:shard_key].to_s
  pri = resolve_priority(opts[:pri] || job_class)
  delay = [0, opts[:delay].to_i].max
  ttr = resolve_respond_timeout(opts[:ttr] || job_class)
  res = Backburner::Hooks.invoke_hook_events(job_class, :before_enqueue, *args)

  return nil unless res # stop if hook is false

  data = { class: job_class.name, args: args, ttr: ttr }
  queue = opts[:queue] && (opts[:queue].is_a?(Proc) ? opts[:queue].call(job_class) : opts[:queue])

  begin
    response = nil
    connection = Backburner::Connection.new(Backburner.configuration.allq_url)
    connection.retryable do
      tube_name = (queue || job_class)
      serialized_data = Backburner.configuration.job_serializer_proc.call(data)
      send_data = {
        pri: pri,
        delay: delay,
        ttr: ttr
      }
      opts.merge!(send_data)
      response = connection.put(tube_name, serialized_data, opts)
    end
    return nil unless Backburner::Hooks.invoke_hook_events(job_class, :after_enqueue, *args)
  ensure
    connection.close if connection
  end
  response
end
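The option handling at the top of `.enqueue` (Proc values are called with the job class and arguments, `shard_key` falls back to `'X'`, and `delay` is clamped to a non-negative integer) can be tried in isolation. The sketch below assumes a hypothetical helper named `normalize_enqueue_opts`; it is not part of Backburner, it works on a copy instead of mutating the caller's hash, and it leaves out priority and TTR resolution because those depend on helpers not shown here.

```ruby
# Hypothetical helper reproducing only the option normalization steps
# visible in the .enqueue listing. Returns a new hash.
def normalize_enqueue_opts(job_class, args, opts)
  # Call any Proc-valued option with (job_class, args), as the listing does.
  resolved = opts.transform_values do |value|
    value.instance_of?(Proc) ? value.call(job_class, args) : value
  end

  # Missing shard keys default to 'X'; present ones are stringified.
  resolved[:shard_key] = resolved[:shard_key].nil? ? 'X' : resolved[:shard_key].to_s

  # Negative or missing delays collapse to 0 seconds.
  resolved[:delay] = [0, resolved[:delay].to_i].max

  resolved
end

opts = normalize_enqueue_opts(
  String,
  ['a', 'b'],
  { delay: -5, pri: ->(_job_class, args) { 100 + args.size } }
)
p opts
```

Copying the hash is a deliberate difference from the listing, which rewrites `opts` in place; a caller that reuses one options hash across several enqueues would see its Procs replaced by their results after the first call.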
.start(tube_names = nil) ⇒ Object
Starts processing jobs with the specified tube_names.
# File 'lib/backburner/worker.rb', line 73
def self.start(tube_names = nil)
  new(tube_names).start
rescue SystemExit
  # do nothing
end
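`.start` rescues `SystemExit` because `Kernel.exit`, which `#shutdown` calls, raises that exception instead of ending the process immediately. A minimal sketch of the interaction, using a hypothetical `run_until_exit` helper in place of the real worker:

```ruby
# Hypothetical helper: runs the block and reports how it ended.
# Mirrors the rescue clause in .start, but returns a symbol for clarity.
def run_until_exit
  yield
  :finished
rescue SystemExit
  # Kernel.exit raised SystemExit; swallowing it lets the caller continue.
  :exited
end

result = run_until_exit { Kernel.exit }
puts result
```

Because the exception is swallowed, control returns to whatever called `.start` and the process keeps running until that caller finishes.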
Instance Method Details
#prepare ⇒ Object
Used to prepare the job queues before job processing is initiated.
Define this in your worker subclass to be run once before processing. Recommended to watch tubes or print a message to the logs with `log_info`.
# File 'lib/backburner/worker.rb', line 112
def prepare
  raise NotImplementedError
end
#process_tube_names(tube_names) ⇒ Object
Processes tube_names given tube_names array. Should return normalized tube_names as an array of strings.
This method can be overridden in inherited workers to add more complex tube name processing.
# File 'lib/backburner/worker.rb', line 133
def process_tube_names(tube_names)
  compact_tube_names(tube_names)
end
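The body of `compact_tube_names` is not shown on this page, so the following is only one plausible reading of "normalized tube_names as an array of strings". The helper name `normalize_tube_names` and its exact rules (flattening, dropping blanks, de-duplicating) are assumptions made for illustration; an override of `#process_tube_names` could apply logic along these lines.

```ruby
# Hypothetical normalization: accepts nil, a single name, or a nested
# array, and returns a flat array of unique, non-empty strings.
def normalize_tube_names(tube_names)
  Array(tube_names)
    .flatten
    .compact
    .map { |name| name.to_s.strip }
    .reject(&:empty?)
    .uniq
end

p normalize_tube_names(nil)
p normalize_tube_names('mail')
p normalize_tube_names([:mail, nil, [' reports ', 'mail'], ''])
```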
#shutdown ⇒ Object
Triggers this worker to shutdown
# File 'lib/backburner/worker.rb', line 117
def shutdown
  Thread.new do
    log_info 'Worker exiting...'
  end
  Kernel.exit
end
#start ⇒ Object
Starts processing ready jobs indefinitely. Primary way to consume and process jobs in specified tubes.
# File 'lib/backburner/worker.rb', line 98
def start
  raise NotImplementedError
end
#work_one_job(conn = connection, tube_name = nil) ⇒ Object
Performs a job by reserving a job from beanstalk and processing it
# File 'lib/backburner/worker.rb', line 142
def work_one_job(conn = connection, tube_name = nil)
  if tube_name.nil?
    log_error 'Sampling tube, this is bad practice for Allq'
    tube_name = @tube_names.sample
  end

  begin
    job = reserve_job(conn, tube_name)
  rescue Exception => e
    log_error "Exception: #{e.}"
    sleep(rand * 3)
    return
  end

  if job && job.body
    begin
      log_job_begin(job.name, job.args)
      job.process
      log_job_end(job.name)
    rescue Backburner::Job::JobFormatInvalid => e
      log_error (e)
    rescue StandardError => e # Error occurred processing job
      log_error (e) unless e.is_a?(Backburner::Job::RetryJob)
      unless job
        log_error 'Error occurred before we were able to assign a job. Giving up without retrying!'
        return
      end
      handle_error(e, job.name, job.args, job)
    end
  else
    sleep(rand * 3)
  end
  job
end
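The control flow of `#work_one_job` (reserve, process if a job with a body came back, otherwise pause for a random interval of up to three seconds) can be exercised without a queue server. The sketch below is a simplified model under stated assumptions: `SketchJob` and `work_once` are hypothetical names, an in-memory array stands in for the reserve call, and the pause is injected so the example does not actually sleep. It does not reproduce the library's retry or error-hook handling.

```ruby
# Hypothetical stand-in for a reserved job; not Backburner::Job.
SketchJob = Struct.new(:name, :body) do
  def process
    body.call
  end
end

# One pass of a reserve/process cycle. Returns a symbol describing the outcome.
# `pause` defaults to a real sleep but can be replaced for testing.
def work_once(queue, pause: ->(seconds) { sleep(seconds) })
  job = queue.shift # stand-in for reserving a job from the server

  if job && job.body
    begin
      job.process
      :processed
    rescue StandardError => e
      # The real worker logs and hands off to its error handling here.
      warn "job #{job.name} failed: #{e.message}"
      :failed
    end
  else
    # Nothing usable was reserved: back off for a random interval under 3s.
    pause.call(rand * 3)
    :idle
  end
end

recorded_pauses = []
fake_pause = ->(seconds) { recorded_pauses << seconds }
queue = [SketchJob.new('greet', -> { puts 'hello' })]

puts work_once(queue, pause: fake_pause)
puts work_once(queue, pause: fake_pause)
```

The randomized pause spreads out polling when several workers find their tubes empty at the same moment, so they do not all retry in lockstep.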