RabbitWQ

A work queue built on RabbitMQ and Celluloid.

Installation

Add this line to your application's Gemfile:

gem 'rabbit-wq'

And then execute:

$ bundle

Or install it yourself as:

$ gem install rabbit-wq

Usage

Queue Subscriber

Starting the Work Queue Subscriber in Interactive Mode

$ rabbit-wq start --interactive

Starting the Work Queue Subscriber as a Daemon

$ rabbit-wq start

Stopping the Work Queue Subscriber

$ rabbit-wq stop

Restarting the Work Queue Subscriber

$ rabbit-wq restart

Checking the Status of the Work Queue Subscriber

$ rabbit-wq status

Queueing Work

Implementing as a PORO

class SomeWorker < Struct.new( :some_variable )
  def call
    # do some work
  end
end

worker = SomeWorker.new( 1 )
RabbitWQ::Work.enqueue( worker )

Work in the Future

RabbitWQ::Work.enqueue( worker, delay: 30000 )

Queueing Work with a Retry

RabbitWQ::Work.enqueue( worker, retry: 1 )

Queueing Work with a Retry and a Retry Delay

RabbitWQ::Work.enqueue( worker, retry: 1, retry_delay: 30000 )

Queueing Work with a Retry and Auto-Scaling Retry Dealy

RabbitWQ::Work.enqueue( worker, retry: 1, retry_delay: 'auto-scale' )

Auto-scale will set up retries at the following intervals: 1 min, 5 mins, 15 mins, 30 mins, 1 hr, 6 hrs, 12 hrs, 24 hrs. and 48 hrs.

Implementing with the Worker Module

class SomeWorker < Struct.new( :some_variable )
  include RabbitWQ::Worker

  def call
    # do some work
  end
end

worker = SomeWorker.new( 1 )
worker.work # same as RabbitWQ::Work.enqueue( worker )

Success Callback

on_success

Called on success when defined on a worker.

Error Handling

Once a worker has raised an exception and no retry attempts are remaining, the worker is placed on the error queue with the exception type, message and backtraces.

Error Callbacks

There are several error callbacks that will be called if defined on a worker. Each error callback will receive a single parameter, the error.

on_error

Called anytime an error is raised, including if a retry will be attempted.

on_final_error

Called when an error is raised and either no retries were requested or are remaining.

on_retryable_error

Called when an error is raised and a retry will be attempted.

Disabling a Worker

A worker can be disabled by overriding the #enabled? method.

class SomeWorker
  include RabbitWQ::Worker

  def enabled?
    # some logic to determine enabled
  end
end

By default, when a worker is disabled a log entry is created and the worker does no work and leaves hte work queue system. In order to have the worker sent to the error queue if disabled, simply override the #error_on_disabled? method:

class SomeWorker
  include RabbitWQ::Worker

  def error_on_disabled?
    true
  end
end

Logging

RabbitWQ provides a work logger that is available within all workers. You must send a reference to self so that the #object_id may be put into the log message.

RabbitWQ.work_logger.info( self, 'Some message' )

When the RabbitWQ::Worker module is mixed into a worker you can use the logging convenience methods. You do not have to provide a refrence to self in this case.

class SomeWorker < Struct.new( :some_variable )
  include RabbitWQ::Worker

  def call
    info( 'Some message' )
    # do some work
  end
end

The RabbitWQ loggers provide the following log levels: debug, info, warn, error and fatal.

Work Publish vs. Subscribe Queue

Quite often the work publish and subscribe queues are the same queue. However, certain use cases require a seprarate work publish and subscribe queue. For instance, when you use te RabbitMQ shovel plugin to effectively create a distributed queue, you may want to publish to a local queue that is shoveled to a central work queue, where the subscriber resides and performs the actual work.

Configuration File

The RabbitWQ configuration file uses JSON syntax. The default location for the configuration file is /var/run/rabbit-wq/rabbit-wq.conf

Here is an example configuration file with each option's default value:

{
  "delayed_exchange_prefix": "work-delay",
  "delayed_queue_prefix": "work-delay",
  "environment_file_path": nil,
  "env": "production",
  "error_queue": "work-error",
  "threads": 1,
  "time_zone": "UTC",
  "work_exchange": "work",
  "work_exchange_type": "fanout",
  "work_log_level": "info",
  "work_log_path": "/var/log/rabbit-wq/rabbit-wq-work.log",
  "work_publish_queue": "work"
  "work_subscribe_queue": "work"
}

Options

delayed_exchange_prefix

The prefix for the delayed exchanges. Defaults to work-delay.

delayed_queue_prefix

The prefix for the delayed queues. Defaults to work-delay.

environment_file_path

The path to the environment file (loads all or some of application environment). No default.

env

The environment to run in. Defaults to production.

error_queue

The name of the error queue. Defaults to error.

threads

The size of the thread pool. Can be overridden with the command line option --threads or -t. Defaults to 1.

time_zone

The time zone to use with ActiveRecord.

work_exchange

The name of the work exchange. Defaults to work.

work_exchange_type

The RabbitMQ exchange type of the work exchange. Defaults to fanout. For more information see RabbitMQ Docs.

work_log_level

The log level of the worker logger. Defaults to info.

work_log_path

The path the worker logger will log to. Defaults to /var/log/rabbit-wq/rabbit-wq-work.log.

work_publish_queue

The name of the work queue to publish to. Defaults to work.

work_subscribe_queue

The name of the work queue to subscribe to. Defaults to work.

Command Line Interface

Commands

start

Starts the subscriber.

stop

Stops the subscriber.

restart

Restarts the subscriber.

status

Reports the status of the subscriber, started or stopped.

Options

config (--config or -c)

The path for the configuration file. Defaults to /etc/rabbit-wq/rabbit-wq.conf.

log_level (--log_level)

The log level for the work subscriber's logger. This does not have an effect on the worker logger. Defaults to info.

log (--log or -l)

The path for the work subsciber's log file. Defaults to /var/log/rabbit-wq/rabbit-wq.log.

pid (--pid)

The path to the PID file. Defaults to /var/run/rabbit-wq/rabbit-wq.pid.

interactive (--interactive or -i)

When used the work subscriber is executed in interactive (attached) mode as opposed to as a daemon.