PerfectQueue
Highly available distributed queue.
It provides exactly-once semantics unless the backend database fails. Pushed tasks are guaranteed to be retried by another worker node even if a worker fails, and finished or canceled tasks are never delivered.
The backend database is pluggable: you can use any database that supports a CAS (compare-and-swap) operation. PerfectQueue currently supports RDBMS and Amazon SimpleDB.
Architecture
PerfectQueue uses the following database schema:
(
id:string -- unique identifier of the task
data:blob -- additional attributes of the task
created_at:int -- unix time when the task is created (or null for canceled tasks)
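timeout:int -- unix time after which a worker may lock the task (pushed into the future while a worker holds it)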
)
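- list: lists tasks whose timeout is in the past.
- lock: updates the timeout column of the first task to a time in the future (now + --timeout) so that other workers do not pick it up. Because the update uses compare-and-swap, only one worker can lock a given task.
- run: executes a command.
  - If the task takes a long time, the worker updates the timeout column again (heartbeat, --heartbeat-interval). This is repeated until the task finishes.
  - If the task takes even longer (--kill-timeout), the worker kills the process.
- Then, depending on the result:
  - remove: if the task succeeded, the row is removed from the backend database.
  - or leave: if the task failed, the row is left in place so that the task is retried.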
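The cycle above can be modeled in a few lines of Ruby. This is a minimal sketch under stated assumptions: `InMemoryQueue`, `cas_timeout`, and `work_once` are hypothetical names, not the PerfectQueue API; a `Mutex` stands in for the backend's compare-and-swap; and the heartbeat and kill steps are omitted (a heartbeat would be one more `cas_timeout` call). Treating "leave" as pushing the timeout to `now + retry_wait` is also an assumption based on the `--retry-wait` option.

```ruby
# Hypothetical in-memory model of list / lock / run / remove-or-leave.
# Not the PerfectQueue API; a Mutex stands in for the backend's CAS.
Row = Struct.new(:id, :data, :created_at, :timeout)

class InMemoryQueue
  def initialize
    @rows = {}
    @mutex = Mutex.new
  end

  # Insert a task; an existing id is left untouched.
  def submit(id, data, now)
    @mutex.synchronize { @rows[id] ||= Row.new(id, data, now, now) }
  end

  # Canceled tasks keep their row but have a null created_at.
  def cancel(id)
    @mutex.synchronize { (row = @rows[id]) && row.created_at = nil }
  end

  # list: copies of non-canceled tasks whose timeout has passed, oldest first.
  def list(now)
    @mutex.synchronize do
      @rows.values.select { |r| r.created_at && r.timeout <= now }
           .sort_by(&:timeout).map(&:dup)
    end
  end

  # CAS: set timeout to new_value only if it still equals expected.
  def cas_timeout(id, expected, new_value)
    @mutex.synchronize do
      row = @rows[id]
      return false unless row && row.timeout == expected
      row.timeout = new_value
      true
    end
  end

  def remove(id)
    @mutex.synchronize { @rows.delete(id) }
  end
end

# One worker iteration; the block runs the task and returns true on success.
def work_once(queue, now:, timeout:, retry_wait:)
  task = queue.list(now).first
  return nil unless task
  locked_until = now + timeout
  # lock: fails if another worker changed the timeout after we listed it
  return nil unless queue.cas_timeout(task.id, task.timeout, locked_until)
  if yield(task)
    queue.remove(task.id)                                      # remove
  else
    queue.cas_timeout(task.id, locked_until, now + retry_wait) # leave
  end
  task.id
end
```

Because `list` returns copies, the `expected` value passed to the lock is the timeout this worker actually saw, so a second worker that listed the same row loses the compare-and-swap.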
Usage
Submitting a task
Using command line:
# RDBMS
$ perfectqueue \
--database mysql://user:password@localhost/mydb \
--table perfectqueue \
--push unique-key-id '{"any":"data"}'
# SimpleDB
$ perfectqueue \
--simpledb your-simpledb-domain-name \
-k AWS_KEY_ID \
-s AWS_SECRET_KEY \
--push unique-key-id '{"any":"data"}'
Using PerfectQueue library:
require 'perfectqueue'
# RDBMS
require 'perfectqueue/backend/rdb'
queue = PerfectQueue::RDBBackend.new(
'mysql://user:password@localhost/mydb', 'perfectqueue')  # table name
# SimpleDB
require 'perfectqueue/backend/simpledb'
queue = PerfectQueue::SimpleDBBackend.new(
'AWS_KEY_ID', 'AWS_SECRET_KEY', 'your-simpledb-domain-name')
queue.submit('unique-key-id', '{"any":"data"}')
Alternatively, you can insert a row into the backend database directly.
RDBMS:
> CREATE TABLE IF NOT EXISTS perfectqueue (
id VARCHAR(256) NOT NULL,
timeout INT NOT NULL,
data BLOB NOT NULL,
created_at INT,
PRIMARY KEY (id)
);
> SET @now = UNIX_TIMESTAMP();
> INSERT INTO perfectqueue (id, timeout, data, created_at)
VALUES ('unique-task-id', @now, '{"any":"data"}', @now);
SimpleDB:
require 'aws' # gem install aws-sdk
sdb = AWS::SimpleDB.new
domain = sdb.domains['your-simpledb-domain-name']
now = "%08x" % Time.now.to_i
domain.items['unique-task-id'].attributes.replace(
'timeout'=>now, 'data'=>'{"any":"data"}', 'created_at'=>now,
:unless=>'timeout')
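The `"%08x"` encoding of timestamps above matters because SimpleDB stores attribute values as strings and compares them lexicographically; presumably that is why the example uses fixed-width hex rather than plain decimal. The short sketch below illustrates the difference with made-up sample timestamps.

```ruby
# SimpleDB compares attribute values as strings, so timestamps need an
# encoding whose string order equals numeric order. Fixed-width,
# zero-padded hex ("%08x") has this property for values below 2**32.
times = [999_999_999, 1_314_108_465, 100]   # sample unix times, unsorted

decimal = times.map(&:to_s)
hex     = times.map { |t| "%08x" % t }

# Plain decimal strings sort in the wrong order:
decimal.sort                      # => ["100", "1314108465", "999999999"]
# Zero-padded hex strings sort in the same order as the integers:
hex.sort.map { |h| h.to_i(16) }   # => [100, 999999999, 1314108465]
```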
Canceling a queued task
Using command line:
$ perfectqueue ... --cancel unique-key-id
Using PerfectQueue library:
queue.cancel('unique-key-id')
Alternatively, you can delete a row from the backend database directly.
RDBMS:
> DELETE FROM perfectqueue WHERE id='unique-key-id';
SimpleDB:
domain.items['unique-key-id'].delete
Running a worker node
Use the perfectqueue command with --exec or --run to start a worker node that executes a command for each task.
Usage: perfectqueue [options] [-- <ARGV-for-exec-or-run>]
--push <ID> <DATA> Push a task to the queue
--list Show queued tasks
--cancel <ID> Cancel a queued task
--configure <PATH.yaml> Write configuration file
--exec <COMMAND> Execute command
--run <SCRIPT.rb> Run method named 'run' defined in the script
-f, --file <PATH.yaml> Read configuration file
-C, --run-class Class name for --run (default: ::Run)
-t, --timeout <SEC> Time for another worker to take over a task when this worker goes down (default: 600)
-b, --heartbeat-interval <SEC> Threshold time to extend the timeout (heartbeat interval) (default: timeout * 3/4)
-x, --kill-timeout <SEC> Threshold time to kill a task process (default: timeout * 10)
-X, --kill-interval <SEC> Threshold time to retry killing a task process (default: 60)
-i, --poll-interval <SEC> Polling interval (default: 1)
-r, --retry-wait <SEC> Time to retry a task when it is failed (default: same as timeout)
-e, --expire <SEC> Threshold time to expire a task (default: 345601 (4days))
--database <URI> Use RDBMS for the backend database (e.g.: mysql://user:password@localhost/mydb)
--table <NAME> backend: name of the table (default: perfectqueue)
--simpledb <DOMAIN> Use Amazon SimpleDB for the backend database (e.g.: --simpledb mydomain -k KEY_ID -s SEC_KEY)
-k, --key-id <ID> AWS Access Key ID
-s, --secret-key <KEY> AWS Secret Access Key
-w, --worker <NUM> Number of worker threads (default: 1)
-d, --daemon <PIDFILE> Daemonize (default: foreground)
--env K=V Set environment variable
-o, --log <PATH> log file path
-v, --verbose verbose mode
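Several defaults in the help text above are defined relative to --timeout, so changing -t alone also moves the heartbeat, kill, and retry timings. The hypothetical helper below is a worked example of that arithmetic only; it is not part of PerfectQueue, and integer seconds (integer division for the 3/4 factor) are an assumption.

```ruby
# Hypothetical helper: fills in the defaults that the help text defines
# relative to --timeout. Integer seconds are an assumption.
def timing_defaults(timeout: 600, heartbeat_interval: nil, kill_timeout: nil, retry_wait: nil)
  heartbeat_interval ||= timeout * 3 / 4   # -b default: timeout * 3/4
  kill_timeout       ||= timeout * 10      # -x default: timeout * 10
  retry_wait         ||= timeout           # -r default: same as timeout
  # Sanity check (not from the tool): the heartbeat must renew the lock
  # before it expires, or another worker may take the task over.
  unless heartbeat_interval < timeout
    raise ArgumentError, 'heartbeat interval must be shorter than timeout'
  end
  { timeout: timeout, heartbeat_interval: heartbeat_interval,
    kill_timeout: kill_timeout, retry_wait: retry_wait }
end

t = timing_defaults(timeout: 60)
puts "heartbeat=#{t[:heartbeat_interval]} kill=#{t[:kill_timeout]} retry=#{t[:retry_wait]}"
# prints "heartbeat=45 kill=600 retry=60"
```

With the default -t 600, the same arithmetic gives a 450-second heartbeat, a 6000-second kill timeout, and a 600-second retry wait.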
--exec
Execute a command when a task is received. The data column is passed to the command's stdin, and the id column is appended as the last argument. The command must exit with status code 0 when it succeeds.
Example:
#!/usr/bin/env ruby
require 'json'
js = JSON.load(STDIN.read)
puts "received: id=#{ARGV.last} #{js.inspect}"
# $ perfectqueue --database sqlite://test.db \
#     --exec ./this_file -- my cmd args
When the kill timeout (-x, --kill-timeout) elapses, a SIGTERM signal is sent to the child process. The signal is repeated every kill interval (-X, --kill-interval; default 60 seconds).
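The --exec contract described above (data on stdin, id as the last argument, exit status 0 for success, SIGTERM after the kill timeout and again every kill interval) can be sketched with Ruby's standard `Process.spawn`. This is a hypothetical illustration, not PerfectQueue's implementation: the `exec_task` name, the polling loop, and the 0.05-second poll are assumptions.

```ruby
# Hypothetical sketch of running one task the way --exec is described.
# Not PerfectQueue's implementation; names and polling are illustrative.
def exec_task(argv, id, data, kill_timeout:, kill_interval:)
  reader, writer = IO.pipe
  # The task id is appended as the last argument; stdin reads from the pipe.
  pid = Process.spawn(*argv, id, in: reader)
  reader.close
  # Feed the data column from a thread so a child that never reads stdin
  # cannot block the timeout handling below.
  Thread.new do
    begin
      writer.write(data)
    rescue Errno::EPIPE
      # the child exited or closed stdin early
    ensure
      writer.close
    end
  end

  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  next_kill = clock.call + kill_timeout
  loop do
    _, status = Process.wait2(pid, Process::WNOHANG)
    # Exit status 0 is success; death by a signal (success? is nil) is failure.
    return status.success? == true if status
    if clock.call >= next_kill
      Process.kill(:TERM, pid)             # SIGTERM, repeated every kill_interval
      next_kill = clock.call + kill_interval
    end
    sleep 0.05
  end
end
```

For the example above, `argv` would be something like `['./this_file', 'my', 'cmd', 'args']`, so the script sees the task id as `ARGV.last`.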
--run
This is the same as --exec, except that it creates an instance of a class named Run (or the class given by -C, --run-class) defined in the script. The class should define initialize(task) and run, and may define kill. The data and id columns of the task are available from the argument of initialize (task.data and task.id). The task is considered successful if run does not raise an error.
Example:
require 'json'
class Run
def initialize(task)
@task = task
end
def run
js = JSON.load(@task.data)
puts "received: id=#{@task.id} #{js.inspect}"
end
def kill
puts "kill!"
end
end
# $ perfectqueue --database sqlite://test.db \
#     --run ./this_file.rb -- my cmd args
When the kill timeout (-x, --kill-timeout) elapses, the Run#kill method is called (if it is defined). The call is repeated every kill interval (-X, --kill-interval).
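To try a Run class locally without a backend, you can drive it with a stand-in task. The sketch below is hypothetical: `FakeTask` only mimics the `id` and `data` readers used in the example above, and `drive` models the described behavior (run, then call kill after the kill timeout and again every kill interval) with a Ruby thread rather than PerfectQueue's actual worker.

```ruby
# Hypothetical stand-in for the task object passed to Run#initialize;
# it only provides the `id` and `data` readers used in the example.
FakeTask = Struct.new(:id, :data)

# Hypothetical local driver: calls run in a thread; once kill_timeout
# seconds pass, calls kill (when defined) every kill_interval seconds
# until run returns or raises. Returns true if run finished without raising.
def drive(run_class, task, kill_timeout:, kill_interval:)
  runner = run_class.new(task)
  thread = Thread.new do
    Thread.current.report_on_exception = false
    runner.run
  end
  unless thread.join(kill_timeout)       # nil means run is still going
    loop do
      runner.kill if runner.respond_to?(:kill)
      break if thread.join(kill_interval)
    end
  end
  thread.value                           # re-raises an error from run
  true
rescue StandardError                     # join/value raise run's error here
  false
end

# Usage with the Run class above, e.g. after require_relative 'this_file':
#   drive(Run, FakeTask.new('task1', '{"attr1":"val1"}'),
#         kill_timeout: 6000, kill_interval: 60)
```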
--configure
Write the configuration file and exit. The written configuration file can be used with the -f option:
Example:
## create myqueue.yaml file
$ perfectqueue --database mysql://root:my@localhost/mydb \
--run myrun.rb --configure myqueue.yaml \
-- my cmd args
## run perfectqueue using the configuration file
$ perfectqueue -f myqueue.yaml
--list
Show queued tasks.
Example:
$ perfectqueue --database sqlite://test.db --list
id created_at timeout data
task1 2011-08-23 23:07:45 +0900 2011-08-23 23:07:45 +0900 {"attr1":"val1","attr":"val2"}
1 entries.