Class: QProcessor::Processor
- Inherits:
- Object
- Hierarchy: Object > QProcessor::Processor
- Defined in:
- lib/qprocessor/processor.rb
Overview
This class encapsulates much of the functionality needed to create a queue processor: it pulls jobs from a queue and passes each one to an instance of a specified class for handling.
Constant Summary
- BEANSTALK_URL_PATTERN =
A constant for a regular expression to match Beanstalk URLs.
/^beanstalk:\/\/([^:\/]+)[:]?([\d]*)[\/]?(.*)$/
- DEFAULT_PARSER_CLASS =
A constant with the name of the default job parser class.
"QProcessor::YAMLParser"
- DEFAULT_TUBE_NAME =
The default Beanstalk tube name.
"default"
- MAX_ERROR_RESTART_INTERVAL =
A constant containing the maximum permitted setting for the inter-error sleep interval.
32
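To illustrate what BEANSTALK_URL_PATTERN captures, here is a minimal sketch. The pattern is copied from the constant above; the parse_beanstalk_url helper and the example URLs are hypothetical and are not part of QProcessor. How the library itself treats a missing port or tube (for example, whether it falls back to DEFAULT_TUBE_NAME) is not shown on this page, so the sketch simply returns nil for absent parts.

```ruby
# Pattern copied from the BEANSTALK_URL_PATTERN constant documented above.
BEANSTALK_URL_PATTERN = /^beanstalk:\/\/([^:\/]+)[:]?([\d]*)[\/]?(.*)$/

# Hypothetical helper (not part of QProcessor): splits a Beanstalk URL
# into host, port and tube. Returns nil if the URL does not match.
def parse_beanstalk_url(url)
  match = BEANSTALK_URL_PATTERN.match(url)
  return nil if match.nil?

  # The port and tube groups match an empty string when absent.
  host, port, tube = match.captures
  {
    host: host,
    port: port.empty? ? nil : Integer(port),
    tube: tube.empty? ? nil : tube
  }
end

p parse_beanstalk_url("beanstalk://localhost:11300/jobs")
p parse_beanstalk_url("beanstalk://localhost")
p parse_beanstalk_url("http://localhost")
```

The first call yields all three parts, the second yields only a host, and the third returns nil because the scheme is not beanstalk.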
Instance Attribute Summary
- #class_name ⇒ Object (readonly)
  Returns the value of attribute class_name.
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #queue_url ⇒ Object (readonly)
  Returns the value of attribute queue_url.
- #settings ⇒ Object (readonly)
  Returns the value of attribute settings.
Instance Method Summary
- #initialize(name, url, class_names, settings = {}) ⇒ Processor (constructor)
  Constructor for the Processor class.
- #start ⇒ Object
  Starts processing of the queue.
Constructor Details
#initialize(name, url, class_names, settings = {}) ⇒ Processor
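Constructor for the Processor class. The class_names argument is a Hash of class names: :processor names the class that handles jobs, and the optional :parser names the job parser class, defaulting to DEFAULT_PARSER_CLASS. The settings Hash is shallow-copied, so adding or removing keys on the caller's Hash afterwards does not affect the processor.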
    # File 'lib/qprocessor/processor.rb', line 33
    def initialize(name, url, class_names, settings={})
      @instance = nil
      @name = name
      @parser_class = get_class!(class_names.fetch(:parser, DEFAULT_PARSER_CLASS))
      @processor_class = get_class!(class_names[:processor])
      @queue_url = url
      @settings = {}.merge(settings)
      @terminate = false
    end
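The constructor turns class names into classes through get_class!, whose body is not shown on this page. The following is a rough sketch of how such a lookup might work, assuming it resolves a String via Object.const_get. The resolve_class! helper, the Demo module and its classes, and the DEFAULT_PARSER constant are all hypothetical stand-ins, not the library's own code.

```ruby
# Hypothetical handler and parser classes used only for this sketch.
module Demo
  class EchoHandler; end
  class PlainParser; end
end

# Stands in for DEFAULT_PARSER_CLASS in this sketch.
DEFAULT_PARSER = "Demo::PlainParser"

# Hypothetical stand-in for get_class!: resolves a (possibly namespaced)
# class name to a constant, raising ArgumentError if it cannot be found.
def resolve_class!(name)
  raise ArgumentError, "no class name given" if name.nil? || name.to_s.empty?
  Object.const_get(name.to_s)
rescue NameError
  raise ArgumentError, "unknown class '#{name}'"
end

# Mirrors the constructor's lookups: :parser is optional, :processor is not.
class_names = { processor: "Demo::EchoHandler" }
parser_class = resolve_class!(class_names.fetch(:parser, DEFAULT_PARSER))
processor_class = resolve_class!(class_names[:processor])

puts parser_class
puts processor_class
```

Because :processor is read with [] rather than fetch, a missing key yields nil, which this sketch rejects with an ArgumentError; the real helper may behave differently.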
Instance Attribute Details
#class_name ⇒ Object (readonly)
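Returns the value of attribute class_name. The constructor shown above does not assign @class_name, so this appears to return nil unless it is set elsewhere.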
    # File 'lib/qprocessor/processor.rb', line 42
    def class_name
      @class_name
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/qprocessor/processor.rb', line 42
    def name
      @name
    end
#queue_url ⇒ Object (readonly)
Returns the value of attribute queue_url.
    # File 'lib/qprocessor/processor.rb', line 42
    def queue_url
      @queue_url
    end
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
    # File 'lib/qprocessor/processor.rb', line 42
    def settings
      @settings
    end
Instance Method Details
#start ⇒ Object
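Starts processing of the queue. This method does not return to the caller; it runs a loop that processes queue jobs as they become available. If an exception is raised, it is logged and the loop retries: the first error retries immediately, and consecutive errors sleep for 1, 2, 4, ... seconds before retrying, capped at MAX_ERROR_RESTART_INTERVAL. A successfully handled job resets the interval to 0.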
    # File 'lib/qprocessor/processor.rb', line 46
    def start
      queue = connect
      interval = 0
      while !@terminate
        begin
          logger.debug("The '#{name}' queue processor is listening for jobs.")
          queue.reserve do|job|
            logger.debug "The '#{name}' queue processor received job id #{job.id}."
            handle(job)
            interval = 0
          end
        rescue => error
          logger.error "The '#{name}' queue processor caught an exception.\nType: #{error.class.name}\n"\
                       "Message: #{error}\nStack Trace:\n#{error.backtrace.join("\n")}"
          sleep(interval) if interval > 0
          interval = interval > 0 ? interval * 2 : 1
          interval = MAX_ERROR_RESTART_INTERVAL if interval > MAX_ERROR_RESTART_INTERVAL
        end
      end
    end
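The error handling in start amounts to a capped exponential backoff. The sketch below traces the sleep intervals for a run of consecutive errors without actually sleeping. The two interval lines are taken from the method above; the next_interval and sleep_schedule helpers are hypothetical names introduced only for illustration.

```ruby
# Same value as the MAX_ERROR_RESTART_INTERVAL constant documented above.
MAX_ERROR_RESTART_INTERVAL = 32

# Hypothetical helper: the interval update performed in the rescue block.
def next_interval(interval)
  interval = interval > 0 ? interval * 2 : 1
  interval = MAX_ERROR_RESTART_INTERVAL if interval > MAX_ERROR_RESTART_INTERVAL
  interval
end

# Hypothetical helper: the number of seconds slept after each of
# error_count consecutive errors (0 means no sleep at all).
def sleep_schedule(error_count)
  interval = 0
  Array.new(error_count) do
    slept = interval
    interval = next_interval(interval)
    slept
  end
end

p sleep_schedule(8)
```

For eight consecutive errors the sleeps are 0, 1, 2, 4, 8, 16, 32 and 32 seconds: the first failure retries immediately and the interval stops growing once it reaches the cap.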