Class: Refinery::Worker
- Inherits:
-
Object
- Object
- Refinery::Worker
- Includes:
- Configurable, Loggable, Queueable, Utilities, Validations
- Defined in:
- lib/refinery/worker.rb
Overview
Base class for workers. Place subclasses of this in the workers directory.
Workers may include validation logic to verify that the message has the correct keys and values before processing.
Instance Method Summary collapse
-
#data_store(options) ⇒ Object
Get the data store for the worker.
-
#initialize(daemon) ⇒ Worker
constructor
Initialize the worker with the given daemon.
-
#run(message) ⇒ Object
Run the worker with the given message.
Methods included from Queueable
Methods included from Configurable
Methods included from Loggable
Methods included from Validations
Methods included from Utilities
#camelize, #decode_message, #encode_message, #host_info
Constructor Details
#initialize(daemon) ⇒ Worker
Initialize the worker with the given daemon.
15 16 17 |
# File 'lib/refinery/worker.rb', line 15 def initialize(daemon) @daemon = daemon end |
Instance Method Details
#data_store(options) ⇒ Object
Get the data store for the worker.
The data store is provided through the Moneta interface.
If the configuration providers a data_store:class option then that class will be used (the class must be in the Moneta module), otherwise Moneta::S3 will be used.
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/refinery/worker.rb', line 48 def data_store() class_name = processor_config['workers']['data_store']['class'] rescue 'S3' ds_class = Moneta.const_get(camelize(class_name)) (@data_store ||= {})[] ||= ds_class.new( :access_key_id => config['aws']['credentials']['access_key_id'], :secret_access_key => config['aws']['credentials']['secret_access_key'], :bucket => [:bucket], :multi_thread => true ) end |
#run(message) ⇒ Object
Run the worker with the given message. The result from the worker’s execute
method is returned along with the run time.
Validation will occur prior to calling execute.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/refinery/worker.rb', line 23 def run() result = false validate() logger.debug "Executing worker #{self.class.name}" time = Benchmark.realtime do begin result = execute() rescue Exception => e logger.error "Error executing worker #{self.class.name}: #{e.}" raise e end end logger.debug "Completed worker #{self.class.name} in #{time} seconds" return result, time end |