PerfectQueue

Highly available distributed queue.

It provides exactly-once semantics unless the backend database fails. Pushed tasks are reliably retried by another worker node even if a worker fails, and finished or canceled tasks are never delivered again.

The backend database is pluggable. You can use any database that supports a CAS (compare-and-swap) operation. PerfectQueue currently supports RDBMSs and Amazon SimpleDB.

Architecture

PerfectQueue uses the following database schema:

(
  id:string        -- unique identifier of the task
  data:blob        -- additional attributes of the task
  created_at:int   -- unix time when the task is created (or null for canceled tasks)
  timeout:int      -- unix time until which the task is locked (workers take tasks whose timeout has passed)
)
  1. list: lists tasks whose timeout column is old enough.

  2. lock: updates the timeout column of the first task (a standalone sketch of steps 1 and 2 follows this list)

  3. run: executes a command

    • if the task takes a long time, extends the timeout column; this is repeated until the task finishes

    • if the task takes too long, kills the process

  4. remove: if the task succeeded, removes the row from the backend database

  5. or leave: if the task failed, leaves the row so that another worker retries it
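
As an illustration of the list and lock steps, here is a minimal standalone sketch assuming the sqlite3 gem and the perfectqueue table created in the Usage section below; the real backends implement this logic internally, so the SQL and the 600-second timeout are only illustrative:

require 'sqlite3'  # gem install sqlite3

# assumes test.db already contains the perfectqueue table described below
db = SQLite3::Database.new('test.db')
now = Time.now.to_i

# list: pick a task whose timeout column is old enough (and not canceled)
row = db.get_first_row(
  "SELECT id FROM perfectqueue " \
  "WHERE timeout <= ? AND created_at IS NOT NULL " \
  "ORDER BY timeout ASC LIMIT 1", [now])

if row
  # lock: extend the timeout with a compare-and-swap style UPDATE; if another
  # worker locked the task first, the WHERE clause matches no rows
  db.execute("UPDATE perfectqueue SET timeout = ? WHERE id = ? AND timeout <= ?",
             [now + 600, row[0], now])
  puts db.changes == 1 ? "locked #{row[0]}" : "lost the race for #{row[0]}"
end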

Usage

Submitting a task

Using the command line:

# RDBMS
$ perfectqueue \
      --database mysql://user:password@localhost/mydb \
      --table perfectqueue \
      --push unique-key-id '{"any":"data"}'

# SimpleDB
$ perfectqueue \
      --simpledb your-simpledb-domain-name \
      -k AWS_KEY_ID \
      -s AWS_SECRET_KEY \
      --push unique-key-id '{"any":"data"}'

Using the PerfectQueue library:

require 'perfectqueue'

# RDBMS
require 'perfectqueue/backend/rdb'
queue = PerfectQueue::RDBBackend.new(
       'mysql://user:password@localhost/mydb', table='perfectqueue')

# SimpleDB
require 'perfectqueue/backend/simpledb'
queue = PerfectQueue::SimpleDBBackend.new(
       'AWS_KEY_ID', 'AWS_SECRET_KEY', 'your-simpledb-domain-name')

queue.submit('unique-key-id', '{"any":"data"}')

Alternatively, you can insert a row into the backend database directly.

RDBMS:

> CREATE TABLE IF NOT EXISTS perfectqueue (
      id VARCHAR(256) NOT NULL,
      timeout INT NOT NULL,
      data BLOB NOT NULL,
      created_at INT,
      PRIMARY KEY (id)
    );
> SET @now = UNIX_TIMESTAMP();
> INSERT INTO perfectqueue (id, timeout, data, created_at)
    VALUES ('unique-task-id', @now, '{"any":"data"}', @now);
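
The same insert can also be done from Ruby; a minimal sketch, assuming the mysql2 gem and the connection settings used in the examples above:

require 'mysql2'  # gem install mysql2

client = Mysql2::Client.new(host: 'localhost', username: 'user',
                            password: 'password', database: 'mydb')
now = Time.now.to_i

# same row as the SQL above; the primary key on id rejects duplicate tasks
stmt = client.prepare(
  "INSERT INTO perfectqueue (id, timeout, data, created_at) VALUES (?, ?, ?, ?)")
stmt.execute('unique-task-id', now, '{"any":"data"}', now)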

SimpleDB:

require 'aws-sdk'  # gem install aws-sdk
queue = AWS::SimpleDB.new
domain = queue.domains['your-simpledb-domain-name']

now = "%08x" % Time.now.to_i
domain.items['unique-task-id'].attributes.replace(
    'timeout'=>now, 'data'=>'{"any":"data"}', 'created_at'=>now,
    :unless=>'timeout')  # conditional put: don't overwrite an already queued task
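
SimpleDB compares attribute values as strings, which is why the unix time is written as zero-padded hex above; a quick check that string order then matches numeric order:

# Zero-padded hex keeps lexicographic (string) order in sync with numeric
# order, so the timeout comparison in the "list" step works on SimpleDB.
times = [9, 100, 255].map { |t| "%08x" % t }
p times                # ["00000009", "00000064", "000000ff"]
p times.sort == times  # true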

Canceling a queued task

Using the command line:

$ perfectqueue ... --cancel unique-key-id

Using the PerfectQueue library:

queue.cancel('unique-key-id')

Alternatively, you can delete a row from the backend database directly.

RDBMS:

> DELETE FROM perfectqueue WHERE id='unique-key-id';

SimpleDB:

domain.items['unique-task-id'].delete

Running a worker node

Use the perfectqueue command to run a worker node. The worker executes a command (--exec) or a Ruby script (--run) for each task.

Usage: perfectqueue [options] [-- <ARGV-for-exec-or-run>]

        --push <ID> <DATA>           Push a task to the queue
        --list                       Show queued tasks
        --cancel <ID>                Cancel a queued task
        --configure <PATH.yaml>      Write configuration file

        --exec <COMMAND>             Execute command
        --run <SCRIPT.rb>            Run method named 'run' defined in the script

    -f, --file <PATH.yaml>           Read configuration file
    -C, --run-class                  Class name for --run (default: ::Run)
    -t, --timeout <SEC>              Time for another worker to take over a task when this worker goes down (default: 600)
    -b, --heartbeat-interval <SEC>   Threshold time to extend the timeout (heartbeat interval) (default: timeout * 3/4)
    -x, --kill-timeout <SEC>         Threshold time to kill a task process (default: timeout * 10)
    -X, --kill-interval <SEC>        Threshold time to retry killing a task process (default: 60)
    -i, --poll-interval <SEC>        Polling interval (default: 1)
    -r, --retry-wait <SEC>           Time to wait before retrying a failed task (default: same as timeout)
    -e, --expire <SEC>               Threshold time to expire a task (default: 345601 (4days))

        --database <URI>             Use RDBMS for the backend database (e.g.: mysql://user:password@localhost/mydb)
        --table <NAME>               backend: name of the table (default: perfectqueue)
        --simpledb <DOMAIN>          Use Amazon SimpleDB for the backend database (e.g.: --simpledb mydomain -k KEY_ID -s SEC_KEY)
    -k, --key-id <ID>                AWS Access Key ID
    -s, --secret-key <KEY>           AWS Secret Access Key

    -w, --worker <NUM>               Number of worker threads (default: 1)
    -d, --daemon <PIDFILE>           Daemonize (default: foreground)
        --env K=V                    Set environment variable
    -o, --log <PATH>                 log file path
    -v, --verbose                    verbose mode
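
For example, a daemonized worker that uses the MySQL backend and executes a command for every task could be started like this (the command path and parameter values are illustrative):

$ perfectqueue \
      --database mysql://user:password@localhost/mydb \
      --table perfectqueue \
      --exec /path/to/command \
      -w 4 -t 600 \
      -d perfectqueue.pid -o perfectqueue.log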

--exec

Executes a command when a task is received. The data column is passed to stdin and the id column is passed as the last argument. The command has to exit with status code 0 when it succeeds.

Example:

#!/usr/bin/env ruby

require 'json'
js = JSON.load(STDIN.read)
puts "received: id=#{ARGV.last} #{js.inspect}"

#$ perfectqueue --database sqlite://test.db \
#      --exec ./this_file -- my cmd args

When the kill timeout (-x, --kill-timeout) elapses, a SIGTERM signal is sent to the child process. The signal is repeated at the kill interval (-X, --kill-interval).
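
Since slow tasks are terminated with SIGTERM, a long-running command may want to trap the signal and stop at a safe point; a minimal sketch (the loop is a hypothetical stand-in for real work):

#!/usr/bin/env ruby
# Hypothetical long-running --exec task: it traps the SIGTERM sent by the
# worker after the kill timeout and stops at the next safe point.
require 'json'

stop = false
Signal.trap('TERM') { stop = true }

js = JSON.load(STDIN.read)
30.times do
  break if stop   # the worker asked us to stop
  sleep 1         # stand-in for a unit of real work
end

# exit status 0 means success; anything else leaves the task to be retried
exit(stop ? 1 : 0)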

--run

This is the same as --exec except that it creates an instance of a class named 'Run' defined in the script. The class should have 'initialize(task)', 'run' and 'kill' methods. The data and id columns of the task are available from the task object passed to initialize (task.data and task.id). The task is assumed to have succeeded if the run method doesn't raise an error.

Example:

require 'json'

class Run
  def initialize(task)
    @task = task
  end

  def run
    js = JSON.load(@task.data)
    puts "received: id=#{@task.id} #{js.inspect}"
  end

  def kill
    puts "kill!"
  end
end

#$ perfectqueue --database sqlite://test.db \
#      --run ./this_file.rb -- my cmd args

When the kill timeout (-x, --kill-timeout) elapses, the Run#kill method is called (if it is defined). The call is repeated at the kill interval (-X, --kill-interval).

--configure

Writes a configuration file and exits. The written configuration file can be used later with the -f option:

Example:

## create myqueue.yaml file
$ perfectqueue --database mysql://root:my@localhost/mydb \
  --run myrun.rb -- my cmd args \
  --configure myqueue.yaml

## run perfectqueue using the configuration file
$ perfectqueue -f myqueue.yaml

--list

Show queued tasks.

Example:

$ perfectqueue --database sqlite://test.db --list
                        id                 created_at                    timeout  data
                     task1  2011-08-23 23:07:45 +0900  2011-08-23 23:07:45 +0900  {"attr1":"val1","attr":"val2"}
1 entries.