Marj - Minimal ActiveRecord Jobs

A minimal database-backed ActiveJob queueing backend.

API docs: https://gemdocs.org/gems/marj/latest
RubyGems: https://rubygems.org/gems/marj
Changelog: https://github.com/nicholasdower/marj/releases
Issues: https://github.com/nicholasdower/marj/issues
Development: https://github.com/nicholasdower/marj/blob/master/CONTRIBUTING.md

Features

  • Enqueued jobs are written to the database.
  • Successfully executed jobs are deleted from the database.
  • Failed jobs which should be retried are updated in the database.
  • Failed jobs which should not be retried are deleted from the database.
  • An interface is provided to retrieve, execute, discard and re-enqueue jobs.
  • An ActiveRecord class is provided to query the database directly.

Features Not Provided

  • Workers
  • Timeouts
  • Concurrency Controls
  • Observability
  • A User Interace

Setup

1. Install

bundle add activejob activerecord marj

# or

gem install activejob activerecord marj

3. Create the database table

class CreateJobs < ActiveRecord::Migration[7.1]
  def self.up
    create_table :jobs, id: :string, primary_key: :job_id do |table|
      table.string   :job_class,            null: false
      table.text     :arguments,            null: false
      table.string   :queue_name,           null: false
      table.integer  :priority
      table.integer  :executions,           null: false
      table.text     :exception_executions, null: false
      table.datetime :enqueued_at,          null: false
      table.datetime :scheduled_at
      table.string   :locale,               null: false
      table.string   :timezone,             null: false
    end

    add_index :jobs, %i[enqueued_at]
    add_index :jobs, %i[scheduled_at]
    add_index :jobs, %i[priority scheduled_at enqueued_at]
  end

  def self.down
    drop_table :jobs
  end
end

Note that by default, Marj uses a table named jobs. To override the default table name, set Marj.table_name before loading ActiveRecord.

4. Configure the queue adapter

require 'marj'

Rails.configuration.active_job.queue_adapter = :marj # Globally, with Rails
ActiveJob::Base.queue_adapter = :marj                # Globally, without Rails
SomeJob.queue_adapter = :marj                        # Single job

Example Usage

# Enqueue and manually run a job:
job = SomeJob.perform_later('foo')
job.perform_now

# Retrieve and execute a job
Marj.due.next.perform_now

# Run all due jobs (single DB query)
Marj.due.perform_all

# Run all due jobs (multiple DB queries)
Marj.due.perform_all(batch_size: 1)

# Run all due jobs in a specific queue:
Marj.queue('foo').due.perform_all

# Run jobs as they become due:
loop do
  Marj.due.perform_all rescue logger.error($!)
ensure
  sleep 5.seconds
end

Jobs Interface

The Marj module provides methods for interacting with enqueued jobs. These methods accept, return and yield +ActiveJob+ objects rather than +ActiveRecord+ objects. Returned jobs are orderd by due date. To query the database directly, use Marj::Record.

Example usage:

Marj.all         # Returns all enqueued jobs.
Marj.queue       # Returns jobs in the specified queue(s).
Marj.due         # Returns jobs which are due to be executed.
Marj.next        # Returns the next job(s) to be executed.
Marj.count       # Returns the number of enqueued jobs.
Marj.where       # Returns jobs matching the specified criteria.
Marj.perform_all # Executes all jobs.
Marj.discard_all # Discards all jobs.
Marj.discard     # Discards the specified job.

Query methods can also be chained:

Marj.due.where(job_class: SomeJob).next # Returns the next SomeJob that is due

Custom Jobs Interface

The Marj::JobsInterface can be added to any class or module. For example, to add it to all jobs classes:

class ApplicationJob < ActiveJob::Base
  extend Marj::JobsInterface

  def self.all
    Marj::Relation.new(
      self == ApplicationJob ?
        Marj::Record.ordered : Marj::Record.where(job_class: self)
   )
  end
end

class SomeJob < ApplicationJob; end

ApplicationJob.due # Returns all jobs which are due to be executed.
SomeJob.due        # Returns SomeJobs which are due to be executed.

Multiple Tables

It is possible to create a custom record class in order to, for instance, write jobs to multiple databases/tables within a single application.

class CreateMyJobs < ActiveRecord::Migration[7.1]
  def self.up
    create_table :my_jobs, id: :string, primary_key: :job_id do |table|
      table.string   :job_class,            null: false
      table.text     :arguments,            null: false
      table.string   :queue_name,           null: false
      table.integer  :priority
      table.integer  :executions,           null: false
      table.text     :exception_executions, null: false
      table.datetime :enqueued_at,          null: false
      table.datetime :scheduled_at
      table.string   :locale,               null: false
      table.string   :timezone,             null: false
    end

    add_index :my_jobs, %i[enqueued_at]
    add_index :my_jobs, %i[scheduled_at]
    add_index :my_jobs, %i[priority scheduled_at enqueued_at]
  end

  def self.down
    drop_table :my_jobs
  end
end

class MyRecord < Marj::Record
  self.table_name = 'my_jobs'
end

CreateMyJobs.migrate(:up)

class MyJob < ActiveJob::Base
  self.queue_adapter = MarjAdapter.new('MyRecord')

  extend Marj::JobsInterface

  def self.all
    Marj::Relation.new(MyRecord.all)
  end

  def perform(msg)
    puts msg
  end
end

MyJob.perform_later('oh, hi')
MyJob.due.next.perform_now

Testing

By default, jobs enqeued during tests will be written to the database. Enqueued jobs can be executed via:

Marj.due.perform_all

Alternatively, to use ActiveJob::QueueAdapters::TestAdapter:

ActiveJob::Base.queue_adapter = :test

Extension Examples

Timeouts

class ApplicationJob < ActiveJob::Base
  def self.timeout_after(duration)
    @timeout = duration
  end

  around_perform do |job, block|
    if (timeout = job.class.instance_variable_get(:@timeout))
      ::Timeout.timeout(timeout, StandardError, 'execution expired') do
        block.call
      end
    else
      block.call
    end
  end
end

Last Error

class AddLastErrorToJobs < ActiveRecord::Migration[7.1]
  def self.up
    add_column :jobs, :last_error, :text
  end

  def self.down
    remove_column :jobs, :last_error
  end
end

class ApplicationJob < ActiveJob::Base
  attr_reader :last_error

  def last_error=(error)
    if error.is_a?(Exception)
      backtrace = error.backtrace&.map { |line| "\t#{line}" }&.join("\n")
      error = backtrace ?
        "#{error.class}: #{error.message}\n#{backtrace}" :
        "#{error.class}: #{error.message}"
    end

    @last_error = error&.truncate(10_000, omission: '… (truncated)')
  end

  def set(options = {})
    super.tap { self.last_error = options[:error] if options[:error] }
  end

  def serialize
    super.merge('last_error' => @last_error)
  end

  def deserialize(job_data)
    super.tap { self.last_error = job_data['last_error'] }
  end
end

ActiveJob Cheatsheet

For more information on ActiveJob, see:

Configuring a Queue Adapter

# With Rails
Rails.configuration.active_job.queue_adapter = :foo # Instantiates FooAdapter
Rails.configuration.active_job.queue_adapter = FooAdapter.new

# Without Rails
ActiveJob::Base.queue_adapter = :foo               # Instantiates FooAdapter
ActiveJob::Base.queue_adapter = FooAdapter.new     # Uses FooAdapter directly

# Single Job
SomeJob.queue_adapter = :foo                       # Instantiates FooAdapter
SomeJob.queue_adapter = FooAdapter.new             # Uses FooAdapter directly

Configuration

config.active_job.default_queue_name
config.active_job.queue_name_prefix
config.active_job.queue_name_delimiter
config.active_job.retry_jitter
SomeJob.queue_name
SomeJob.queue_as
SomeJob.queue_name_prefix
SomeJob.queue_name_delimiter
SomeJob.retry_jitter

Options

:wait       # Enqueues the job with the specified delay
:wait_until # Enqueues the job at the time specified
:queue      # Enqueues the job on the specified queue
:priority   # Enqueues the job with the specified priority

Callbacks

SomeJob.before_enqueue
SomeJob.after_enqueue
SomeJob.around_enqueue
SomeJob.before_perform
SomeJob.after_perform
SomeJob.around_perform
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :before, &block)
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :after, &block)
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, &block)

Handling Exceptions

SomeJob.retry_on
SomeJob.discard_on
SomeJob.after_discard

Creating Jobs

# Create without enqueueing
job = SomeJob.new
job = SomeJob.new(args)
job = SomeJob.new.deserialize(other_job.serialize)

# Create and enqueue
job = SomeJob.perform_later
job = SomeJob.perform_later(args)

# Create without enqueueing and run (only enqueued on failure if retryable)
SomeJob.perform_now
SomeJob.perform_now(args)

Enqueueing Jobs

Jobs are enqueued via the ActiveJob::Base#enqueue method. This method returns the job on success. If an error is raised during enqueueing, that error will propagate to the caller, unless the error is an ActiveJob::EnqueueError. In this case, enqueue will return false and job.enqueue_error will be set.

SomeJob.new(args).enqueue
SomeJob.new(args).enqueue(options)

# Via perform_later
SomeJob.perform_later(SomeJob.new(args))
SomeJob.perform_later(args)
SomeJob.set(options).perform_later(args)

# After a failure during execution
SomeJob.perform_now(args)
ActiveJob::Base.execute(SomeJob.new(args).serialize)

# Enqueue multiple
ActiveJob.perform_all_later(SomeJob.new, SomeJob.new)
ActiveJob.perform_all_later(SomeJob.new, SomeJob.new, options:)
SomeJob.set(options).perform_all_later(SomeJob.new, SomeJob.new)
SomeJob.set(options).perform_all_later(SomeJob.new, SomeJob.new, options:)

Executing Jobs

# Executed without enqueueing, enqueued on failure if retryable
SomeJob.new(args).perform_now
SomeJob.perform_now(args)
ActiveJob::Base.execute(SomeJob.new(args).serialize)

# Executed after enqueueing
SomeJob.perform_later(args).perform_now
ActiveJob::Base.execute(SomeJob.perform_later(args).serialize)