Class: QProcessor::Processor

Inherits:
Object
Defined in:
lib/qprocessor/processor.rb

Overview

This class encapsulates the core functionality of a queue processor: it pulls work from a queue and passes each message to an instance of a specified class for handling.

Constant Summary

MAX_ERROR_RESTART_INTERVAL = 32

The maximum permitted value, in seconds, for the inter-error sleep interval.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(processor, settings = {}) ⇒ Processor

Constructor for the Processor class.

Parameters:

  • processor (Class)

    A reference to the class that will be used to process jobs.

  • settings (Hash) (defaults to: {})

    A Hash of additional settings that will be used by the processor. These are passed to the processor class when it is instantiated, providing a mechanism for passing in elements such as configuration.



# File 'lib/qprocessor/processor.rb', line 20

def initialize(processor, settings={})
  @instance        = nil
  @name            = processor.name
  @processor_class = processor
  @settings        = {}.merge(settings)
  @terminate       = false
end
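
The constructor stores `{}.merge(settings)` rather than the Hash it was given, which yields a shallow copy. The sketch below illustrates what that implies for callers. `SettingsHolder` is a hypothetical stand-in that mirrors only this one line of the constructor, not the gem's class, and the `:retries` and `:tags` keys are invented for illustration.

```ruby
# Hypothetical stand-in that mirrors only the settings handling of
# Processor#initialize; it is not the gem's class.
class SettingsHolder
  attr_reader :settings

  def initialize(settings = {})
    # Merging into a fresh Hash yields a shallow copy of the caller's Hash.
    @settings = {}.merge(settings)
  end
end

caller_settings = { retries: 3, tags: ["a"] }
holder = SettingsHolder.new(caller_settings)

# Adding or replacing keys on the copy leaves the caller's Hash alone.
holder.settings[:retries] = 5

# The copy is shallow, so nested objects are still shared with the caller.
holder.settings[:tags] << "b"
```

After this runs, `caller_settings[:retries]` is still 3, but `caller_settings[:tags]` contains both elements, because only the top-level Hash was copied.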

Instance Attribute Details

#class_name ⇒ Object (readonly)

Returns the value of attribute class_name.



# File 'lib/qprocessor/processor.rb', line 27

def class_name
  @class_name
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/qprocessor/processor.rb', line 27

def name
  @name
end

#queue_url ⇒ Object (readonly)

Returns the value of attribute queue_url.



# File 'lib/qprocessor/processor.rb', line 27

def queue_url
  @queue_url
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



# File 'lib/qprocessor/processor.rb', line 27

def settings
  @settings
end

Instance Method Details

#handle(message) ⇒ Object

Handles a single message. The message is processed and then disposed of on success. If an exception is raised, the message is released and the exception is re-raised to the caller.



# File 'lib/qprocessor/processor.rb', line 48

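def handle(message)
  begin
    logger.debug "The '#{name}' queue processor received message id #{message.id}."
    process(message)
    # Processing succeeded, so dispose of the message.
    message.dispose
    # NOTE: this assigns a variable local to #handle; it does not reset
    # the `interval` variable used for back-off in #start.
    interval = 0
  rescue => error
    # Processing failed: release the message, then re-raise so the
    # caller can log the error and back off.
    message.release
    raise
  end
end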
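
The dispose-on-success, release-on-failure flow above can be exercised in isolation. The following is a minimal sketch, not the gem's code: `FakeMessage` and `handle_sketch` are hypothetical names, and the work is supplied as a block in place of the class's own `process` method.

```ruby
# Hypothetical message double that records which lifecycle calls it receives.
class FakeMessage
  attr_reader :id, :events

  def initialize(id)
    @id = id
    @events = []
  end

  def dispose
    @events << :dispose
  end

  def release
    @events << :release
  end
end

# Same control flow as #handle, with the work supplied as a block
# instead of the class's own process method.
def handle_sketch(message)
  yield message
  message.dispose
rescue StandardError
  message.release
  raise
end

ok = FakeMessage.new(1)
handle_sketch(ok) { |m| m.id }

failed = FakeMessage.new(2)
begin
  handle_sketch(failed) { |_m| raise ArgumentError, "bad payload" }
rescue ArgumentError => error
  reraised = error
end
```

Here `ok.events` records only `:dispose`, while `failed.events` records only `:release` and the original `ArgumentError` reaches the caller.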

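#start ⇒ Object

Starts processing the queue. This method does not return while the processor is running: it loops, handling queue jobs as they become available, until the terminate flag is set. When an exception is caught, the error is logged and the loop sleeps for an interval that doubles after each error, up to MAX_ERROR_RESTART_INTERVAL seconds, before listening again.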



# File 'lib/qprocessor/processor.rb', line 31

def start
  queue    = QProcessor::Sources.manufacture!
  interval = 0
  while !@terminate
    begin
      logger.debug("The '#{name}' queue processor is listening for jobs.")
      queue.get {|message| handle(message)}
    rescue => error
      logger.error "The '#{name}' queue processor caught an exception.\nType: #{error.class.name}\n"\
                   "Message: #{error}\nStack Trace:\n#{error.backtrace.join("\n")}"
      # Back off before listening again; the first error does not sleep.
      sleep(interval) if interval > 0
      # Double the interval after each error, capped at the maximum.
      interval = interval > 0 ? interval * 2 : 1
      interval = MAX_ERROR_RESTART_INTERVAL if interval > MAX_ERROR_RESTART_INTERVAL
    end
  end
end
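
To make the back-off arithmetic concrete, the sketch below reproduces only the interval calculation from the rescue branch and lists the sleep that would precede each retry. `next_interval` and `sleep_schedule` are hypothetical helper names, and no actual sleeping takes place.

```ruby
MAX_ERROR_RESTART_INTERVAL = 32

# Mirrors the two interval assignments in the rescue branch of #start.
def next_interval(interval)
  interval = interval > 0 ? interval * 2 : 1
  [interval, MAX_ERROR_RESTART_INTERVAL].min
end

# Returns the number of seconds slept after each of `errors` consecutive
# errors; 0 means the sleep call is skipped.
def sleep_schedule(errors)
  interval = 0
  Array.new(errors) do
    slept = interval
    interval = next_interval(interval)
    slept
  end
end

schedule = sleep_schedule(8)
# schedule is [0, 1, 2, 4, 8, 16, 32, 32]
```

In the listing above, nothing in `#start` sets `interval` back to 0 after a successful message, so as written the schedule appears to continue from where it left off rather than restarting at the next error.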