Simple Job

A gem containing libraries that support running background jobs or tasks. It’s designed to make it easy to:

  • Define a job

  • Enqueue a job

  • Poll for and execute jobs

It fits into a Rails environment and uses its configuration, but does not require Rails.

It is designed to support multiple queue implementations, but currently includes only one, which uses AWS SQS (aws.amazon.com/sqs/). A client can plug in an alternate queue implementation, using lib/simple_job/sqs_job_queue.rb as an example.

The AWS SQS queue implementation requires the aws-sdk gem, which must be initialized (by calling AWS.config) before this API can enqueue or poll for jobs.

Queue Configuration

Both the client and the server must configure their queues.

Each side needs to define only the queues it uses (a client may configure a subset of the queues used by the server, as long as it configures every queue it uses).

SQSJobQueue is the queue implementation used by default. This may be overridden by calling SimpleJob::JobQueue.config :implementation => 'my_queue_type'

Minimal configuration - specify queue prefix and define one default queue

SimpleJob::SQSJobQueue.config :queue_prefix => 'my-job'
SimpleJob::SQSJobQueue.define_queue 'normal', :default => true

Complex configuration with explicit queue implementation, non-rails-defined environment, multiple queues, and CloudWatch monitoring

SimpleJob::JobQueue.config :implementation => 'sqs'
SimpleJob::SQSJobQueue.config :queue_prefix => 'my-job', :environment => 'production', :cloud_watch_namespace => 'SimpleJob'
SimpleJob::SQSJobQueue.define_queue 'normal', :visibility_timeout => 60, :default => true
SimpleJob::SQSJobQueue.define_queue 'high-priority', :visibility_timeout => 10
SimpleJob::SQSJobQueue.define_queue 'long-running', :visibility_timeout => 3600
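
The :visibility_timeout option presumably maps to the SQS visibility timeout: the number of seconds a received message stays hidden from other consumers. In SQS, a message that is not deleted within that window becomes visible again and can be delivered a second time, which is why a queue for long-running jobs needs a larger value. The toy in-memory queue below is only a sketch of that behavior. ToyQueue and its methods are hypothetical and are not part of simple_job or the aws-sdk gem, and time is passed in explicitly so the example runs instantly.

```ruby
# Toy in-memory stand-in for an SQS queue (hypothetical; not simple_job or aws-sdk).
class ToyQueue
  Message = Struct.new(:body, :visible_at)

  def initialize(visibility_timeout)
    @visibility_timeout = visibility_timeout  # seconds, like :visibility_timeout
    @messages = []
  end

  def send_message(body, now)
    @messages << Message.new(body, now)
  end

  # Returns the first visible message and hides it for visibility_timeout seconds.
  def receive(now)
    message = @messages.find { |m| m.visible_at <= now }
    message.visible_at = now + @visibility_timeout if message
    message
  end

  # A worker deletes the message once the job has finished.
  def delete(message)
    @messages.delete(message)
  end
end

queue = ToyQueue.new(10)        # same timeout as the 'high-priority' queue above
queue.send_message('job-1', 0)

first  = queue.receive(0)       # t=0: a worker picks the job up
hidden = queue.receive(5)       # t=5: still hidden, so nothing is returned
again  = queue.receive(10)      # t=10: never deleted, so it is delivered again

puts first.body
puts hidden.inspect
puts again.body
```

If a job routinely takes longer than its queue's timeout, a second worker may start the same job while the first is still running, so the timeout should comfortably exceed the expected execution time.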

AWS CloudWatch monitoring

If you provide a CloudWatch namespace to SimpleJob::SQSJobQueue.config, then a series of metrics will be saved to AWS CloudWatch by the queue’s poll method:

SimpleJob::SQSJobQueue.config :cloud_watch_namespace => 'MyNamespace'

The following metrics will be published:

MessageCheckCount

A count of the number of poll attempts

MessageReceivedCount

A count of the number of poll attempts that return a message

MessageMissCount

A count of the number of poll attempts that do not return a message

ExecutionCount

A count of the number of jobs that are executed

SuccessCount

A count of the number of jobs that are executed without error

ErrorCount

A count of the number of jobs that raise an exception during execution

ExecutionTime

The number of milliseconds it takes the job to execute

TimeToCompletion

The number of milliseconds between when a job is requested and when it is successfully completed

ExecutionAttempts

The number of executions that occur before a job is successfully completed

The metrics above are published using the following dimensions:

Environment

The current environment (e.g. development or production)

SQSQueueName

The name of the SQS queue being polled

Host

The hostname of the machine running the poll operation

JobType

The string label for the type of job being executed

Note that the JobType dimension will only be present on the ExecutionCount, SuccessCount, ErrorCount, ExecutionTime, TimeToCompletion, and ExecutionAttempts metrics.

If no namespace is provided to SimpleJob::SQSJobQueue.config, then no CloudWatch metrics will be gathered or published.
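
The rule for which dimensions accompany which metric can be written as a small lookup. The sketch below only restates the lists in this section; the helper name dimension_names is hypothetical and is not part of the gem.

```ruby
# Metrics that carry the JobType dimension, per the note in this section.
JOB_METRICS = %w[
  ExecutionCount SuccessCount ErrorCount
  ExecutionTime TimeToCompletion ExecutionAttempts
].freeze

# Hypothetical helper: returns the dimension names expected on a metric.
def dimension_names(metric_name)
  names = %w[Environment SQSQueueName Host]
  names << 'JobType' if JOB_METRICS.include?(metric_name)
  names
end

poll_dimensions = dimension_names('MessageCheckCount')
job_dimensions  = dimension_names('ExecutionTime')

puts poll_dimensions.inspect  # ["Environment", "SQSQueueName", "Host"]
puts job_dimensions.inspect   # ["Environment", "SQSQueueName", "Host", "JobType"]
```

This matters when building CloudWatch queries or alarms: a poll-level metric such as MessageMissCount cannot be filtered by JobType.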

Job Definition

To define a job, simply include the SimpleJob::JobDefinition module, and implement its #execute method.

Attributes declared using simple_job_attribute will be automatically serialized into the JSON message that’s enqueued, and re-populated into the object once it’s dequeued. They each have standard getters/setters defined by attr_accessor, and may be set upon object initialization by passing them as hash keys/values to #new.

ActiveModel validation is supported and included automatically.

Synopsis

class FooSender
  include SimpleJob::JobDefinition

  simple_job_attribute :target, :foo_content  # defines getters/setters for each, and
                                              # adds them to serialized message

  validates :target, :presence => true  # standard ActiveModel validation

  def execute
    puts "#{foo_content} -> #{target}"
  end
end
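
To make the serialization behavior concrete, here is a minimal sketch of how declared attributes could be turned into a JSON message and restored again, using only the standard library. It is not the gem's implementation: SketchJob, sketch_attribute, and the way the type string is derived from the class name are all assumptions made for illustration. The message layout (type, version, data) follows the sample message shown in this README's client usage example.

```ruby
require 'json'

# Hypothetical stand-in for SimpleJob::JobDefinition (illustration only).
module SketchJob
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Defines getters/setters and remembers which attributes to serialize.
    def sketch_attribute(*names)
      attr_accessor(*names)
      sketch_attributes.concat(names)
    end

    def sketch_attributes
      @sketch_attributes ||= []
    end

    # Assumption: the type label is the underscored class name.
    def sketch_type
      name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
    end
  end

  # Attributes may be passed as hash keys/values to .new.
  def initialize(attributes = {})
    attributes.each { |key, value| public_send("#{key}=", value) }
  end

  def to_json(*_args)
    data = self.class.sketch_attributes.map { |n| [n.to_s, public_send(n)] }.to_h
    JSON.generate('type' => self.class.sketch_type, 'version' => '1', 'data' => data)
  end

  # Re-populates the declared attributes from a serialized message.
  def from_json(json)
    JSON.parse(json).fetch('data').each { |key, value| public_send("#{key}=", value) }
    self
  end
end

class FooSketch
  include SketchJob
  sketch_attribute :target, :foo_content
end

original = FooSketch.new(:target => 'joe', :foo_content => 'foo!')
json = original.to_json
copy = FooSketch.new.from_json(json)

puts json
puts copy.target  # joe
```

Because only declared attributes end up in the message, any other instance state is lost between enqueue and execution, so everything a job needs at execution time should be declared as an attribute.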

Job Client Usage

Typical usage of default queue

You may call #enqueue with no arguments, in which case JobQueue.default will be used (the default queue is set by passing the :default option when defining the queue, as documented in the Queue Configuration section).

Similar to ActiveRecord’s save method, enqueue will return true or false depending on whether the object passes validation.

f = FooSender.new(:target => 'joe', :foo_content => 'foo!')  # can also assign attributes with f.target=, f.foo_content=
if f.enqueue
  puts 'i just sent some foo to joe!'
else
  puts "the following errors occurred: #{f.errors.full_messages.join('; ')}"
end

json = f.to_json # { "type": "foo_sender", "version": "1", "data": { "target": "joe", "foo_content": "foo!" } }
f_copy = FooSender.new.from_json(json)

Simple usage with explicit queue

To use a specific queue, pass its type identifier (the name given in the SQSJobQueue.define_queue declaration) when calling enqueue or enqueue!.

f = FooSender.new
f.target = 'bob'
f.foo_content = 'foo!'
f.enqueue!('normal')  # raises exception if operation fails
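
The difference between the two calling styles, a boolean result from enqueue versus an exception from enqueue!, can be sketched in plain Ruby. ValidatedJob below is a hypothetical stand-in that uses neither simple_job nor ActiveModel; the exception class and error wording are assumptions, and the real gem may raise a different error.

```ruby
# Hypothetical stand-in for a job class (illustration only).
class ValidatedJob
  attr_accessor :target
  attr_reader :errors

  def initialize(target = nil)
    @target = target
    @errors = []
  end

  # Mimics a presence validation on target.
  def valid?
    @errors = []
    @errors << "Target can't be blank" if target.nil? || target.empty?
    @errors.empty?
  end

  # Non-bang form: reports failure through the return value.
  def enqueue(queue)
    return false unless valid?
    queue << target
    true
  end

  # Bang form: reports failure by raising.
  def enqueue!(queue)
    raise ArgumentError, errors.join('; ') unless enqueue(queue)
    true
  end
end

queue = []
ok     = ValidatedJob.new('bob').enqueue(queue)
failed = ValidatedJob.new.enqueue(queue)

raised_message = nil
begin
  ValidatedJob.new.enqueue!(queue)
rescue ArgumentError => e
  raised_message = e.message
end

puts ok      # true
puts failed  # false
puts "enqueue! raised: #{raised_message}"
```

The non-bang form suits code that wants to inspect errors and recover; the bang form suits code where a failed enqueue should abort the surrounding operation.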

Queue configuration for multiple enqueue operations

Alternatively, the queue may be attached to the job upfront for multiple enqueue operations:

FooSender.job_queue('high-priority')
f1 = FooSender.new(:target => 'cookie monster', :foo_content => 'cookies and milk')
f1.enqueue  # high-priority queue will be used
f2 = FooSender.new(:target => 'oscar the grouch', :foo_content => 'pizza')
f2.enqueue  # high-priority queue will be used

Job Server Usage

Calling #poll on a queue instance will, by default, dispatch each job to the proper registered class based on type and version. Its options are passed through to the underlying implementation. See SQSJobQueue::poll for documentation of the options it accepts.

JobQueue.default.poll(:poll_interval => 1, :idle_timeout => 60)

You can override the default behavior by passing a block to #poll that accepts both a job definition instance and an SQS message. In this case, the block must call definition.execute itself, because calling definition.execute is all that the default handler does.

JobQueue.default.poll(:poll_interval => 1, :idle_timeout => 60) do |definition, message|
  logger.debug "received message: #{message.inspect}"
  logger.debug "dispatched to definition #{definition.inspect}; executing..."
  definition.execute
end
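
The dispatch step described above (look up a class by type and version, build the job, then either yield it or execute it) can be sketched without SQS. Everything in this example is hypothetical: REGISTRY, EchoJob, and dispatch are illustrative names, and the gem's own registration and polling code may work differently.

```ruby
require 'json'

# Hypothetical registry keyed by [type, version] (not the gem's internals).
REGISTRY = {}

class EchoJob
  attr_reader :data

  def initialize(data)
    @data = data
  end

  def execute
    "echo: #{data['text']}"
  end
end

REGISTRY[['echo_job', '1']] = EchoJob

# Parses one raw message, builds the matching job, then either yields it
# to the caller's block or falls back to calling execute directly.
def dispatch(raw_message)
  envelope = JSON.parse(raw_message)
  job_class = REGISTRY.fetch([envelope['type'], envelope['version']])
  job = job_class.new(envelope['data'])
  block_given? ? yield(job, raw_message) : job.execute
end

raw = JSON.generate('type' => 'echo_job', 'version' => '1', 'data' => { 'text' => 'hi' })

default_result = dispatch(raw)
custom_result = dispatch(raw) do |job, message|
  # A custom handler must call execute itself, as with the block passed to #poll.
  "handled #{message.bytesize} bytes, then #{job.execute}"
end

puts default_result  # echo: hi
puts custom_result
```

Keying the lookup on both type and version lets a server keep an older class registered while messages in the old format are still in the queue.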