Class: QProcessor::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/qprocessor/processor.rb

Overview

This class encapsulates a lot of the functionality around creating a queue processor that pulls work from a queue and passes it to an instance of a specified class for ‘handling’.

Constant Summary collapse

BEANSTALK_URL_PATTERN =

A constant for a regular expression to match Beanstalk URLs

/^beanstalk:\/\/([^:\/]+)[:]?([\d]*)[\/]?(.*)$/
DEFAULT_PARSER_CLASS =

A constant with the name of the default job parser class.

"QProcessor::YAMLParser"
DEFAULT_TUBE_NAME =

The default Beanstalk tube name.

"default"
MAX_ERROR_RESTART_INTERVAL =

A constant containing the maximum permitted setting for the inter-error sleep interval.

32

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, url, class_names, settings = {}) ⇒ Processor

Constructor for the Processor class.

Parameters:

  • The (String)

    name assigned to the processor, primarily used for logging.

  • The (String)

    URL to be used when connecting to the queue system.

  • A (Hash)

    Hash of the class names used by the processor. There are two keys recognised in this Hash - :parser and :processor. The :parser class should be the fully qualified name of the class class that can be used to parse job content. The :processor class should be the fully qualified name of the class that will process jobs.

  • A (Hash)

    Hash of additional settings that will be used by the processor. Valid keys include :logger and :reuse_processor.



33
34
35
36
37
38
39
40
41
# File 'lib/qprocessor/processor.rb', line 33

def initialize(name, url, class_names, settings={})
  @instance        = nil
  @name            = name
  @parser_class    = get_class!(class_names.fetch(:parser, DEFAULT_PARSER_CLASS))
  @processor_class = get_class!(class_names[:processor])
  @queue_url       = url
  @settings        = {}.merge(settings)
  @terminate       = false
end

Instance Attribute Details

#class_nameObject (readonly)

Returns the value of attribute class_name.



42
43
44
# File 'lib/qprocessor/processor.rb', line 42

def class_name
  @class_name
end

#nameObject (readonly)

Returns the value of attribute name.



42
43
44
# File 'lib/qprocessor/processor.rb', line 42

def name
  @name
end

#queue_urlObject (readonly)

Returns the value of attribute queue_url.



42
43
44
# File 'lib/qprocessor/processor.rb', line 42

def queue_url
  @queue_url
end

#settingsObject (readonly)

Returns the value of attribute settings.



42
43
44
# File 'lib/qprocessor/processor.rb', line 42

def settings
  @settings
end

Instance Method Details

#startObject

Starts processing of the queue. This method will not return to the caller as it will run a loop processing the queue jobs as they become available.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/qprocessor/processor.rb', line 46

def start
  queue    = connect
  interval = 0
  while !@terminate
    begin
      logger.debug("The '#{name}' queue processor is listening for jobs.")
      queue.reserve do|job|
        logger.debug "The '#{name}' queue processor received job id #{job.id}."
        handle(job)
        interval = 0
      end
    rescue => error
      logger.error "The '#{name}' queue processor caught an exception.\nType: #{error.class.name}\n"\
                   "Message: #{error}\nStack Trace:\n#{error.backtrace.join("\n")}"
      sleep(interval) if interval > 0
      interval = interval > 0 ? interval * 2 : 1
      interval = MAX_ERROR_RESTART_INTERVAL if interval > MAX_ERROR_RESTART_INTERVAL
    end
  end
end