Class: Refinery::Worker

Inherits:
Object
Includes:
Configurable, Loggable, Queueable, Utilities, Validations
Defined in:
lib/refinery/worker.rb

Overview

Base class for workers. Place subclasses of this in the workers directory.

Workers may include validation logic to verify that the message has the correct keys and values before processing.
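As an illustration of the subclassing pattern described above, here is a minimal sketch. It does not load the Refinery gem: Sketch::Worker is a stand-in that mirrors only the validate-then-execute flow documented on this page, and UpcaseWorker, its 'text' key, and the hand-written validate override are hypothetical. The real Validations module may expose a different API.

```ruby
# Stand-in for Refinery::Worker so the sketch runs without the gem.
# It mirrors only the validate-then-execute flow documented on this page.
module Sketch
  class Worker
    def initialize(daemon)
      @daemon = daemon
    end

    def run(message)
      validate(message)
      execute(message)
    end

    # Hypothetical no-op hook; in Refinery the Validations module supplies this.
    def validate(message); end
  end
end

# Hypothetical subclass: requires a 'text' key, then upcases its value.
class UpcaseWorker < Sketch::Worker
  def validate(message)
    raise ArgumentError, "missing 'text' key" unless message.key?('text')
  end

  def execute(message)
    message['text'].upcase
  end
end

worker = UpcaseWorker.new(nil) # no daemon is needed for this sketch
puts worker.run('text' => 'hello') # prints "HELLO"
```

A message without the 'text' key raises ArgumentError before execute is reached, which is the ordering the overview describes.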

Instance Method Summary

Methods included from Queueable

#queue, #with_queue

Methods included from Configurable

#config, #config=

Methods included from Loggable

#logger

Methods included from Validations

included

Methods included from Utilities

#camelize, #decode_message, #encode_message, #host_info

Constructor Details

#initialize(daemon) ⇒ Worker

Initialize the worker with the given daemon.



# File 'lib/refinery/worker.rb', line 15

def initialize(daemon)
  @daemon = daemon
end

Instance Method Details

#data_store(options) ⇒ Object

Get the data store for the worker.

The data store is provided through the Moneta interface.

If the configuration provides a data_store:class option, that class is used (it must be defined in the Moneta module); otherwise Moneta::S3 is used.



# File 'lib/refinery/worker.rb', line 48

def data_store(options)
  class_name = processor_config['workers']['data_store']['class'] rescue 'S3'
  ds_class = Moneta.const_get(camelize(class_name))
  (@data_store ||= {})[options] ||= ds_class.new(
    :access_key_id => config['aws']['credentials']['access_key_id'],  
    :secret_access_key => config['aws']['credentials']['secret_access_key'],
    :bucket => options[:bucket],
    :multi_thread => true
  )
end
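The class lookup and per-options caching in data_store can be sketched in isolation. Everything below is an assumption made for illustration: FakeMoneta and its two classes stand in for the real Moneta module, camelize is a simplified guess at what Utilities#camelize does, and lookup_store is a hypothetical helper rather than part of Refinery.

```ruby
# Stand-in for the Moneta module; these store classes are hypothetical.
module FakeMoneta
  class S3
    attr_reader :options

    def initialize(options)
      @options = options
    end
  end

  class Memory < S3; end
end

# Simplified stand-in for Utilities#camelize (assumed behaviour).
def camelize(name)
  name.to_s.split('_').map(&:capitalize).join
end

# Resolve the store class from settings, defaulting to S3, and keep
# one store instance per distinct options hash.
def lookup_store(settings, options, cache)
  class_name = settings.dig('workers', 'data_store', 'class') || 'S3'
  ds_class = FakeMoneta.const_get(camelize(class_name))
  cache[options] ||= ds_class.new(bucket: options[:bucket])
end

cache = {}
default_store = lookup_store({}, { bucket: 'reports' }, cache)
same_store    = lookup_store({}, { bucket: 'reports' }, cache)
puts default_store.class            # FakeMoneta::S3
puts default_store.equal?(same_store) # true: the cached instance is reused
```

Because the cache is keyed by the options hash, two calls with equal options share one store, while a different bucket produces a separate instance.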

#run(message) ⇒ Object

Run the worker with the given message. Returns a two-element array containing the result of the worker's execute method and the elapsed run time in seconds.

Validation occurs before execute is called. Any exception raised by execute is logged and then re-raised.



# File 'lib/refinery/worker.rb', line 23

def run(message)
  result = false
  
  validate(message)
  
  logger.debug "Executing worker #{self.class.name}"
  time = Benchmark.realtime do
    begin
      result = execute(message)
    rescue Exception => e
      logger.error "Error executing worker #{self.class.name}: #{e.message}"
      raise e
    end
  end
  logger.debug "Completed worker #{self.class.name} in #{time} seconds"
  return result, time
end
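The timing and re-raise shape of run can be tried on its own with a small sketch. timed_call is a hypothetical helper, not part of Refinery; it writes to standard error instead of a logger, and it rescues StandardError rather than Exception, which is a deliberate simplification of the method above.

```ruby
require 'benchmark'

# Hypothetical helper mirroring the shape of #run: time the block,
# report any error, re-raise it, and return [result, elapsed_seconds].
def timed_call
  result = false
  time = Benchmark.realtime do
    begin
      result = yield
    rescue StandardError => e
      warn "Error executing block: #{e.message}"
      raise # re-raise the same exception with its original backtrace
    end
  end
  [result, time]
end

# The two return values can be destructured by the caller.
result, seconds = timed_call { 21 * 2 }
puts "result=#{result} elapsed=#{seconds.round(4)}s"
```

If the block raises, the elapsed time is never returned, because the exception propagates out of Benchmark.realtime; callers of run should expect the same.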