Procrastinator

A storage-agnostic job queue gem in plain Ruby.

Big Picture

Define Task Handler classes like this:

# Sends a welcome email
class SendWelcomeEmail
   attr_accessor :container, :logger, :scheduler

   def run
      # ... etc
   end
end

Then build a task Scheduler:

scheduler = Procrastinator.setup do |config|
   config.with_store some_email_task_database do
      config.define_queue :welcome, SendWelcomeEmail
      config.define_queue :birthday, SendBirthdayEmail, max_attempts: 3
   end

   config.define_queue :thumbnail, GenerateThumbnail, store: 'imgtasks.csv', timeout: 60
end

And defer tasks:

scheduler.defer(:welcome, data: '[email protected]')

scheduler.defer(:thumbnail, data: {file: 'forcett.png', width: 100, height: 150})

scheduler.defer(:birthday, run_at: Time.now + 3600, data: {user_id: 5})

Installation

Add this line to your application's Gemfile:

gem 'procrastinator'

And then run in a terminal:

bundle install

Task Handlers

Task Handlers are what actually get run on the task queue. They'll look like this:

# This is an example task handler
class MyTask
   # These attributes will be assigned by Procrastinator when the task is run.
   # :data is optional
   attr_accessor :container, :logger, :scheduler, :data

   # Performs the core work of the task. 
   def run
      # ... perform your task ...
   end

   # ==================================
   #          OPTIONAL HOOKS
   # ==================================
   #
   # You can always omit any of the methods below. Only #run is mandatory.
   ##

   # Called after the task has completed successfully.
   # 
   # @param run_result [Object] The result of #run.
   def success(run_result)
      # ...
   end

   # Called after #run raises any StandardError (or subclass).
   # 
   # @param error [StandardError] Error raised by #run
   def fail(error)
      # ...
   end

   # Called after a permanent failure, either because: 
   #   1. the current time is after the task's expire_at time.
   #   2. the task has failed and the number of attempts is equal to or greater than the queue's `max_attempts`. 
   #
   # If #final_fail is executed, then #fail will not.
   # 
   # @param error [StandardError] Error raised by #run
   def final_fail(error)
      # ...
   end
end
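
The hook rules in the comments above can be condensed into a small standalone sketch. This is not Procrastinator's internal code: `run_with_hooks`, its keyword parameters, and `FlakyTask` are hypothetical names, and `attempts` is assumed to count the current attempt. It only illustrates which hook fires in which situation.

```ruby
# Illustrative only: a simplified stand-in for a worker's hook dispatch.
# None of these names are part of Procrastinator's API.
def run_with_hooks(handler, attempts:, max_attempts:, expire_at: nil, now: Time.now)
   expired = !expire_at.nil? && now > expire_at

   begin
      raise 'task expired before it could run' if expired

      result = handler.run
   rescue StandardError => e
      # Permanent failure: expired, or this attempt used up the queue's limit
      if expired || attempts >= max_attempts
         handler.final_fail(e) if handler.respond_to?(:final_fail)
         return :final_fail
      end

      handler.fail(e) if handler.respond_to?(:fail)
      return :fail
   end

   # Hooks are optional, so only call the ones the handler defines
   handler.success(result) if handler.respond_to?(:success)
   :success
end

# A handler whose #run always raises, recording which hooks were called
class FlakyTask
   attr_reader :events

   def initialize
      @events = []
   end

   def run
      raise 'mail server unreachable'
   end

   def fail(error)
      @events << "fail: #{ error.message }"
   end

   def final_fail(error)
      @events << "final_fail: #{ error.message }"
   end
end

task = FlakyTask.new
run_with_hooks(task, attempts: 1, max_attempts: 3) # calls #fail
run_with_hooks(task, attempts: 3, max_attempts: 3) # calls #final_fail, not #fail
puts task.events
```

Note that the second call triggers only #final_fail, matching the rule that #fail is skipped when #final_fail is executed.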

Attribute Accessors

Task Handlers have attributes that are set after the Handler is created. These attributes are checked early so that tasks cannot reference unknown variables whenever they eventually run. If a required attribute is missing, you'll get a MalformedTaskError.

| Attribute  | Required | Description |
|------------|----------|-------------|
| :container | Yes      | Container declared in #setup from the currently running instance |
| :logger    | Yes      | Logger object for the Queue |
| :scheduler | Yes      | A scheduler object that you can use to schedule new tasks (eg. with #defer) |
| :data      | No       | Data provided to #defer. Calls to #defer will error if they do not provide data when expected and vice-versa. |
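
If you want to catch a missing accessor in your own test suite before Procrastinator does, a check along these lines may help. It is a sketch only: `REQUIRED_ATTRIBUTES` and `missing_attributes` are hypothetical names, not part of the gem, and the gem's own validation may work differently.

```ruby
# Illustrative only: reports which required accessors a handler class lacks.
# REQUIRED_ATTRIBUTES and missing_attributes are hypothetical names.
REQUIRED_ATTRIBUTES = %i[container logger scheduler].freeze

def missing_attributes(handler_class)
   REQUIRED_ATTRIBUTES.reject do |name|
      # attr_accessor defines both a reader (name) and a writer (name=)
      handler_class.method_defined?(name) && handler_class.method_defined?("#{ name }=")
   end
end

class CompleteTask
   attr_accessor :container, :logger, :scheduler

   def run; end
end

class IncompleteTask
   attr_accessor :logger

   def run; end
end

p missing_attributes(CompleteTask)   # => []
p missing_attributes(IncompleteTask) # => [:container, :scheduler]
```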

Errors & Logging

Errors that trigger #fail or #final_fail are saved to the task storage under keywords last_error and last_fail_at.

Each queue worker also keeps a log file using the Ruby Logger class. Log files are named after the queue (eg. log/welcome-queue-worker.log).

scheduler = Procrastinator.setup do |config|
   # you can set custom log location and level:
   config.log_with(directory: '/var/log/myapp/', level: Logger::DEBUG)

   # you can also set the log rotation age or size (see Logger docs for details)
   config.log_with(shift: 1024, age: 5)

   # use a falsey log level to disable logging entirely:
   config.log_with(level: false)
end

The logger can be accessed in your tasks by calling logger or @logger.

# Example handler with logging
class MyTask
   attr_accessor :container, :logger, :scheduler

   def run
      logger.info('This task got run. Hooray!')
   end
end

Some events are always logged by default:

| Event          | Level |
|----------------|-------|
| Task completed | INFO  |
| Task failure   | ERROR |

Configuration

Procrastinator.setup allows you to define which queues are available and other general settings.

require 'procrastinator'

scheduler = Procrastinator.setup do |config|
   # ...
end

It then returns a Task Scheduler that your code can use to defer tasks.

Defining Queues

In setup, call #define_queue with a symbol name and that queue's Task Handler class:

# You must provide a queue name and the class that handles those jobs
config.define_queue :greeting, SendWelcomeEmail

# but queues have some optional settings, too
config.define_queue :greeting, SendWelcomeEmail, store: 'tasks.csv', timeout: 60, max_attempts: 2, update_period: 1

# all defaults set explicitly
config.define_queue :greeting, SendWelcomeEmail, store: 'procrastinator.csv', timeout: 3600, max_attempts: 20, update_period: 10

Description of keyword options:

| Option         | Description |
|----------------|-------------|
| :store         | Storage IO object for tasks. See Task Store |
| :timeout       | Max duration (seconds) before tasks are failed for taking too long |
| :max_attempts  | Once a task has been attempted max_attempts times, it will be permanently failed. |
| :update_period | Delay (seconds) between reloads of all tasks from the task store |

Task Store

A task store is a strategy pattern object that knows how to read and write tasks in your data storage (eg. database, HTTP API, CSV file, microdot, etc).

task_store = ReminderStore.new # eg. some SQL task storage class you wrote

Procrastinator.setup do |config|
   config.define_queue(:reminder, ReminderTask, store: task_store)

   # to use the default CSV storage, provide :store with a string or Pathname
   config.define_queue(:reminder, ReminderTask, store: '/var/myapp/tasks.csv')
end

A task store must implement all of the following methods; otherwise a MalformedPersisterError is raised:

  1. #read(attributes)

Returns a list of hashes from your datastore that match the given attributes hash. The search attributes will be in their final form (eg. :data will already be serialized). Each hash must contain the properties listed in the Data Fields table.

  2. #create(queue:, run_at:, initial_run_at:, expire_at:, data:)

Saves a task in your storage. Receives a hash with Data Fields keys: :queue, :run_at, :initial_run_at, :expire_at, and :data.

  3. #update(id, new_data)

Saves the provided full Data Fields hash to your datastore.

  4. #delete(id)

Deletes the task with the given identifier from storage.

Procrastinator comes with a simple CSV file task store by default, but you are encouraged to build one that suits your situation.

Warning: Task stores shared between queues must be thread-safe if using threaded or daemonized work modes.
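
As a starting point for your own store, here is a minimal in-memory sketch of the four methods. `InMemoryTaskStore` is a hypothetical class name, and details such as returning the new id from #create or raising on an unknown id in #update are assumptions rather than documented requirements. A Mutex guards every operation in light of the thread-safety warning above.

```ruby
# Illustrative only: a minimal in-memory task store. Only the four method
# names and their parameters come from the list above; the rest is assumed.
class InMemoryTaskStore
   def initialize
      @tasks   = {}
      @next_id = 1
      @mutex   = Mutex.new # guards access when queues share this store
   end

   # Returns copies of every task hash that matches all given attributes.
   def read(attributes = {})
      @mutex.synchronize do
         @tasks.values
               .select { |task| attributes.all? { |key, value| task[key] == value } }
               .map(&:dup)
      end
   end

   # Saves a new task and returns its generated id (an assumption).
   def create(queue:, run_at:, initial_run_at:, expire_at:, data:)
      @mutex.synchronize do
         id = @next_id
         @next_id += 1
         @tasks[id] = {id: id, queue: queue, run_at: run_at, initial_run_at: initial_run_at,
                       expire_at: expire_at, attempts: 0, last_fail_at: nil, last_error: nil,
                       data: data}
         id
      end
   end

   # Merges the provided fields into the stored task, keeping its id fixed.
   def update(id, new_data)
      @mutex.synchronize do
         raise ArgumentError, "no task with id #{ id }" unless @tasks.key?(id)

         @tasks[id] = @tasks[id].merge(new_data).merge(id: id)
      end
   end

   # Removes the task with the given id, if present.
   def delete(id)
      @mutex.synchronize { @tasks.delete(id) }
   end
end

store = InMemoryTaskStore.new
id    = store.create(queue: :reminder, run_at: Time.now, initial_run_at: Time.now,
                     expire_at: nil, data: '{"user_id":5}')
store.update(id, attempts: 1)
puts store.read(queue: :reminder).first[:attempts] # 1
store.delete(id)
puts store.read(id: id).empty? # true
```

A store like this loses its tasks when the process exits, so it is mostly useful in tests or as a template for a persistent implementation.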

Data Fields

These are the data fields for each individual scheduled task. When using the built-in task store, these are the field names. If you have a database, use this to inform your table schema.

| Hash Key        | Type     | Description |
|-----------------|----------|-------------|
| :id             | integer  | Unique identifier for this exact task |
| :queue          | symbol   | Name of the queue the task is inside |
| :run_at         | datetime | Time to attempt running the task next. Updated for retries¹ |
| :initial_run_at | datetime | Original run_at value. Reset if #reschedule is called. |
| :expire_at      | datetime | Time to permanently fail the task because it is too late to be useful |
| :attempts       | integer  | Number of times the task has tried to run |
| :last_fail_at   | datetime | Time of the most recent failure |
| :last_error     | string   | Error message + backtrace of the most recent failure. May be very long. |
| :data           | JSON     | Data to be provided to the task handler, serialized² to JSON. |

¹ nil indicates that it is permanently failed and will never run, either due to expiry or too many attempts.

² Serialized using JSON.dump and JSON.parse with symbolized keys. It is strongly recommended to only supply simple data types (eg. id numbers) to reduce storage space, eliminate redundancy, and reduce the chance of a serialization error.

Times are all handled as Ruby stdlib Time objects.
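
To see why simple data types are recommended, it can help to try the round trip by hand. The sketch below uses only Ruby's stdlib JSON module, mirroring the JSON.dump and JSON.parse calls named in the footnote; the sample payloads are made up for illustration.

```ruby
require 'json'

# Simple values survive the round trip, and keys come back as symbols
payload    = {file: 'forcett.png', width: 100, height: 150}
serialized = JSON.dump(payload)
restored   = JSON.parse(serialized, symbolize_names: true)

puts serialized          # {"file":"forcett.png","width":100,"height":150}
puts restored == payload # true

# Richer objects do not: a Time is written out as text and comes back as a String
lossy = JSON.parse(JSON.dump({remind_at: Time.at(0)}), symbolize_names: true)
puts lossy[:remind_at].class # String
```

Passing an id and looking the record up again inside #run avoids this kind of silent type change.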

CSV Task Store

Specifying no storage will cause Procrastinator to save tasks using the very basic built-in CSV storage. It is not designed for heavy loads, so you should replace it in a production environment.

The default file path is defined in Procrastinator::Store::SimpleCommaStore::DEFAULT_FILE.

Procrastinator.setup do |config|
   # this will use the default CSV task store. 
   config.define_queue(:reminder, ReminderTask)
end

Shared Task Stores

When several queues use the same storage, you can define them inside a with_store block.

email_task_store = EmailTaskStore.new # eg. some SQL task storage class you wrote

Procrastinator.setup do |config|
   config.with_store(email_task_store) do
      # queues defined inside this block will use the email task store
      config.define_queue(:welcome, WelcomeTask)
      config.define_queue(:reminder, ReminderTask)
   end

   # and this will not use it
   config.define_queue(:thumbnails, ThumbnailTask)
end

Task Container

Whatever is given to #provide_container is made available to Task Handlers via the :container attribute. It is intended for dependency injection.

Procrastinator.setup do |config|
   config.provide_container lunch: 'Lasagna'

   # .. other setup stuff ...
end

# ... and in your task ...
class LunchTask
   attr_accessor :container, :logger, :scheduler

   def run
      logger.info("Today's Lunch is: #{ container[:lunch] }")
   end
end
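
Because the container, logger, and scheduler are plain attributes, a handler can also be exercised on its own, for example in a unit test. The sketch below assigns them by hand as a stand-in for what Procrastinator does at run time; it uses only Ruby's stdlib, and the StringIO-backed logger is an illustrative choice.

```ruby
require 'logger'
require 'stringio'

class LunchTask
   attr_accessor :container, :logger, :scheduler

   def run
      logger.info("Today's Lunch is: #{ container[:lunch] }")
   end
end

# Capture log output in memory instead of writing a log file
log_output = StringIO.new

task           = LunchTask.new
task.container = {lunch: 'Lasagna'}
task.logger    = Logger.new(log_output)
task.scheduler = nil # not used by this task

task.run

puts log_output.string
```

Swapping the container hash for one holding test doubles is the main benefit of injecting dependencies this way.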

Deferring Tasks

To add tasks to a queue, call #defer on the scheduler returned by Procrastinator.setup:

scheduler = Procrastinator.setup do |config|
   config.define_queue :reminder, EmailEveryone
   config.define_queue :thumbnail, CreateThumbnail
end

# Provide the queue name and any data you want passed in, if needed
scheduler.defer(:reminder)
scheduler.defer(:thumbnail, data: 'forcett.png')

If there is only one queue, you may omit the queue name:

thumbnailer = Procrastinator.setup do |config|
   config.define_queue :thumbnail, CreateThumbnail
end

thumbnailer.defer(data: 'forcett.png')

Timing

You can specify a timeframe in which a task may be run. The default is to run immediately and never expire.

Be aware that the task is not guaranteed to run at a precise time; the only promise is that the task won't be tried before run_at nor after expire_at.

Tasks attempted after expire_at will be final-failed. Setting expire_at to nil means it will never expire (but may still fail permanently if, say, max_attempts is reached).

run_time    = Time.new(2016, 9, 19)
expire_time = Time.new(2016, 9, 20)

# runs on or after 2016 Sept 19, never expires
scheduler.defer(:greeting, run_at: run_time, data: '[email protected]')

# can run immediately but not after 2016 Sept 20
scheduler.defer(:greeting, expire_at: expire_time, data: '[email protected]')

# runs on or after 2016 Sept 19, but not after 2016 Sept 20
scheduler.defer(:greeting, run_at: run_time, expire_at: expire_time, data: '[email protected]')
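
The timing rules can be restated as a small decision function. This is a sketch of the rules as described in this section, not the gem's code; `task_state` and its return values are hypothetical.

```ruby
# Illustrative only: classifies a task by the timing rules described above.
# A nil run_at means "run immediately"; a nil expire_at means "never expires".
def task_state(now, run_at: nil, expire_at: nil)
   return :expired if expire_at && now > expire_at
   return :waiting if run_at && now < run_at

   :runnable
end

run_time    = Time.new(2016, 9, 19)
expire_time = Time.new(2016, 9, 20)

puts task_state(Time.new(2016, 9, 18), run_at: run_time, expire_at: expire_time)     # waiting
puts task_state(Time.new(2016, 9, 19, 12), run_at: run_time, expire_at: expire_time) # runnable
puts task_state(Time.new(2016, 9, 21), run_at: run_time, expire_at: expire_time)     # expired
puts task_state(Time.new(2016, 9, 21), run_at: run_time)                             # runnable
```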

Rescheduling Existing Tasks

Call #reschedule with the queue name and some task-identifying information and then chain #to with the new time.

run_time    = Time.new(2016, 9, 19)
expire_time = Time.new(2016, 9, 20)

scheduler.defer(:reminder, run_at: Time.at(0), data: '[email protected]')

# we can reschedule the task that matches this data
scheduler.reschedule(:reminder, data: '[email protected]').to(run_at: run_time)

# we can also change the expiry time
scheduler.reschedule(:reminder, data: '[email protected]').to(expire_at: expire_time)

# or both
scheduler.reschedule(:reminder, data: '[email protected]').to(run_at:    run_time,
                                                              expire_at: expire_time)

Rescheduling changes the task's...

  • :run_at and :initial_run_at to a new value, if provided
  • :expire_at to a new value, if provided
  • :attempts to 0
  • :last_error and :last_fail_at to nil

Rescheduling will not change :id, :queue or :data.

A RuntimeError is raised if the new run_at is after expire_at.
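
The effect of a reschedule on a stored task can be sketched on a plain hash. `reschedule_fields` is a hypothetical helper, not part of Procrastinator, and the field names follow the Data Fields table.

```ruby
# Illustrative only: applies the rescheduling rules to a task hash.
def reschedule_fields(task, run_at: nil, expire_at: nil)
   new_run_at    = run_at || task[:run_at]
   new_expire_at = expire_at || task[:expire_at]

   # raise with a String message produces a RuntimeError
   if new_run_at && new_expire_at && new_run_at > new_expire_at
      raise "new run_at (#{ new_run_at }) is after expire_at (#{ new_expire_at })"
   end

   # Failure history is always cleared; times change only when provided
   changes = {attempts: 0, last_error: nil, last_fail_at: nil}
   changes.merge!(run_at: run_at, initial_run_at: run_at) if run_at
   changes[:expire_at] = expire_at if expire_at

   task.merge(changes) # :id, :queue and :data are left untouched
end

task = {id: 7, queue: :reminder, data: '{"user_id":5}',
        run_at: Time.at(0), initial_run_at: Time.at(0), expire_at: nil,
        attempts: 3, last_error: 'boom', last_fail_at: Time.at(60)}

updated = reschedule_fields(task, run_at: Time.new(2016, 9, 19))

puts updated[:attempts]                            # 0
puts updated[:initial_run_at] == updated[:run_at]  # true
puts updated[:id]                                  # 7
```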

Retries

Failed tasks are automatically retried, with their run_at updated on an increasing delay (in seconds) according to this formula:

30 + number_of_attempts^4

Situations that call #fail or #final_fail will cause the error timestamp and reason to be stored in :last_fail_at and :last_error.
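
To get a feel for how quickly the retry delay grows, the formula can be evaluated directly. This sketch assumes the formula is read as 30 plus the number of attempts raised to the fourth power; `retry_delay` is a hypothetical helper name.

```ruby
# Illustrative only: delay in seconds before the next retry,
# assuming the formula is 30 + (number of attempts to the fourth power).
def retry_delay(attempts)
   30 + (attempts**4)
end

(1..5).each do |attempts|
   puts "after attempt #{ attempts }: retry in #{ retry_delay(attempts) }s"
end
# after attempt 1: retry in 31s
# after attempt 2: retry in 46s
# after attempt 3: retry in 111s
# after attempt 4: retry in 286s
# after attempt 5: retry in 655s
```

Under the same assumption, the delay after a tenth failed attempt would be 10,030 seconds, a little under three hours.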

Cancelling

Call #cancel with the queue name and some task-identifying information to narrow the search to a single task.

run_time = Time.parse('April 1')
scheduler.defer(:reminder, run_at: run_time, data: '[email protected]')

# we can cancel the task made above using whatever we know about it
# An error will be raised if it matches multiple tasks or finds none
scheduler.cancel(:reminder, run_at: run_time, data: '[email protected]')

# you could also use the id number directly, if you have it
scheduler.cancel(:reminder, id: 137)
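
The "exactly one match" rule can be pictured with a small lookup over task hashes. This is a sketch of the idea only; `find_single_task` and its error messages are hypothetical and do not reflect the gem's actual implementation or error classes.

```ruby
# Illustrative only: returns the one task matching every criterion,
# raising if the criteria match no tasks or more than one.
def find_single_task(tasks, criteria)
   matches = tasks.select { |task| criteria.all? { |key, value| task[key] == value } }

   raise "no task matches #{ criteria }" if matches.empty?
   raise "#{ matches.size } tasks match #{ criteria }; add more detail" if matches.size > 1

   matches.first
end

tasks = [
   {id: 136, queue: :reminder, data: 'alpha'},
   {id: 137, queue: :reminder, data: 'beta'}
]

puts find_single_task(tasks, id: 137)[:data]                      # beta
puts find_single_task(tasks, queue: :reminder, data: 'alpha')[:id] # 136

begin
   find_single_task(tasks, queue: :reminder) # ambiguous: matches both tasks
rescue RuntimeError => e
   puts "could not cancel: #{ e.message }"
end
```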

Testing with Procrastinator

Working serially performs tasks from each queue sequentially. There is no multithreading or daemonizing.

Call work on the Scheduler with an optional list of queues to filter by.

# work just one task
scheduler.work.serially

# work the first five tasks
scheduler.work.serially(steps: 5)

# only work tasks on the :greeting and :reminders queues
scheduler.work(:greeting, :reminders).serially(steps: 2)

RSpec Matchers

A have_task RSpec matcher is defined to make testing task scheduling a little easier.

# Note: you must require the matcher file separately
require 'procrastinator'
require 'procrastinator/rspec/matchers'

task_storage = TaskStore.new

scheduler = Procrastinator.setup do |config|
   config.define_queue :welcome, SendWelcome, store: task_storage
end

scheduler.defer(data: '[email protected]')

expect(task_storage).to have_task(data: '[email protected]')

Running Tasks

When you are ready to run a Procrastinator daemon in production, you may use some provided Rake tasks.

In your Rakefile, call DaemonTasks.define with a block that constructs a scheduler instance.

# Rakefile
require 'rake'
require 'procrastinator/rake/daemon_tasks'

# Defines a set of tasks that will control a Procrastinator daemon
# Default pid_path is /tmp/procrastinator.pid
Procrastinator::Rake::DaemonTasks.define do
   Procrastinator.setup do
      # ... etc ...
   end
end

You can title the daemon process by specifying the pid_path with a specific .pid file. If it does not end with '.pid', it is assumed to be a directory name, and procrastinator.pid is appended.

# Rakefile

# This would define a process titled my-app
Procrastinator::Rake::DaemonTasks.define(pid_path: 'my-app.pid') do
   # ... build a Procrastinator instance here ...
end

# equivalent to ./pids/procrastinator.pid
Procrastinator::Rake::DaemonTasks.define(pid_path: 'pids') do
   # ... build a Procrastinator instance here ...
end
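
The pid_path rule described above can be sketched with Ruby's stdlib Pathname. `normalize_pid_path` and `process_title` are hypothetical helpers for illustration, not the gem's API, and deriving the title from the file's base name is inferred from the my-app example.

```ruby
require 'pathname'

DEFAULT_PID_FILE = 'procrastinator.pid' # file name taken from the text above

# Illustrative only: treats anything not ending in .pid as a directory.
def normalize_pid_path(pid_path)
   path = Pathname.new(pid_path)
   path.extname == '.pid' ? path : path / DEFAULT_PID_FILE
end

# Illustrative only: the pid file name without its .pid extension.
def process_title(pid_path)
   normalize_pid_path(pid_path).basename('.pid').to_s
end

puts normalize_pid_path('my-app.pid') # my-app.pid
puts normalize_pid_path('pids')       # pids/procrastinator.pid
puts process_title('my-app.pid')      # my-app
puts process_title('pids')            # procrastinator
```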

Note: There can be a distinction between a process's full title (/proc/*/cmdline) and its shorter name (/proc/*/comm). Some tools like ps and top display the process title, while others like pstree show the process name.

Procrastinator uses Ruby's Process.setproctitle, which only affects the title.

Either run the generated Rake tasks in a terminal or with your daemon monitoring tool of choice (eg. Monit, systemd).

# In terminal
bundle exec rake procrastinator:start
bundle exec rake procrastinator:status
bundle exec rake procrastinator:restart
bundle exec rake procrastinator:stop

There are instructions for using Procrastinator with Monit in the GitHub wiki.

Similar Tools

Procrastinator is a library that exists to enable job queues with flexibility in storage mechanism and minimal dependencies. It's neat but it is specifically intended for smaller datasets. Some other approaches include:

Linux etc: Cron and At

Consider Cron for tasks that run on a regular schedule.

Consider At for tasks that run once at a particular time.

While neither tool natively supports retry, they can be great solutions for simple situations.

Gem: Resque

Consider Resque for larger datasets (eg. 10,000+ jobs) where performance optimization becomes relevant.

Gem: Rails ActiveJob / DelayedJob

Consider DelayedJob for projects that are tightly integrated with Rails and fully commit to living in that ecosystem.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/TenjinInc/procrastinator.

This project is intended to be a friendly space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.

Play nice.

Developers

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Docs are generated using YARD. Run rake yard to generate a local copy.

License

The gem is available as open source under the terms of the MIT License.