Beaneater


Beaneater is the best way to interact with beanstalkd from within Ruby. Beanstalkd is a simple, fast work queue. Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously. Read the yardocs and/or the beanstalk protocol for more details.

Why Beanstalk?

Ilya has an excellent blog post, "Scalable Work Queues with Beanstalk", and Adam Wiggins posted an excellent comparison.

You will find that beanstalkd is an underrated but incredibly powerful project that is extremely well suited as a job or messaging queue, and significantly better suited to this task than Redis or a traditional RDBMS. Beanstalk is a simple and fast work queue service rolled into a single binary; it is the memcached of work queues. Originally built to power the backend for the 'Causes' Facebook app, it is a mature, production-ready open source project. PostRank has used beanstalk to reliably process millions of jobs a day.

A single instance of Beanstalk is perfectly capable of handling thousands of jobs a second (or more, depending on your job size) because it is an in-memory, event-driven system. Powered by libevent under the hood, it requires zero setup (launch and forget, à la memcached) and offers optional log-based persistence, an easily parsed ASCII protocol, and a rich set of tools for job management that go well beyond a simple FIFO work queue.

Beanstalkd supports the following features out of the box:

Feature           Description
Easy Setup        Quick to install, no files to edit, no settings to tweak.
Speed             Process thousands of jobs per second without breaking a sweat.
Client Support    Client libraries exist for over 21 languages including Python, Ruby, and Go.
Tubes             Supports multiple work queues created automatically on demand.
Reliable          Beanstalk’s reserve, work, delete cycle ensures reliable processing.
Scheduling        Delay enqueuing jobs by a specified interval to be processed later.
Priorities        Important jobs go to the head of the queue and get processed sooner.
Persistence       Jobs are stored in memory for speed, but logged to disk for safe keeping.
Scalability       Client-side federation provides effortless horizontal scalability.
Error Handling    Bury any job which causes an error for later debugging and inspection.
Simple Debugging  Talk directly to the beanstalkd server over telnet to get a handle on your app.
Efficiency        Each beanstalkd process can handle tens of thousands of open connections.
Memory Usage      Use the built-in ulimit OS feature to cap beanstalkd's memory consumption.

Keep in mind that these features are supported out of the box by beanstalk and require no special Ruby-specific logic. In the end, beanstalk is the ideal job queue and has the added benefit of being easy to set up and configure.

Installation

Install beanstalkd:

Mac OS

brew install beanstalkd
beanstalkd -p 11300

Ubuntu

apt-get install beanstalkd
beanstalkd -p 11300

Install beaneater as a gem:

gem install beaneater

or add this to your Gemfile:

# Gemfile
gem 'beaneater'

and run bundle install to install the dependency.

Quick Overview

A concise summary of how to use beaneater:

require 'beaneater'
require 'json' # needed for JSON.parse below

# Connect to pool
@beanstalk = Beaneater::Pool.new(['localhost:11300'])
# Enqueue jobs to tube
@tube = @beanstalk.tubes["my-tube"]
@tube.put '{ "key" : "foo" }', :pri => 5
@tube.put '{ "key" : "bar" }', :delay => 3
# Process jobs from tube
while @tube.peek(:ready)
  job = @tube.reserve
  puts "job value is #{JSON.parse(job.body)["key"]}!"
  job.delete
end
# Disconnect the pool
@beanstalk.close

For a more detailed rundown, check out the Usage section below.

Usage

Configuration

To set up advanced options for beaneater, pass configuration options using:

Beaneater.configure do |config|
  # config.default_put_delay   = 0
  # config.default_put_pri     = 65536
  # config.default_put_ttr     = 120
  # config.job_parser          = lambda { |body| body }
  # config.beanstalkd_url      = ['localhost:11300']
end

The above options are all defaults, so only include a configuration block if you need to make changes.
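
As one possible use of the job_parser option, the sketch below defines a parser lambda that decodes JSON job bodies and falls back to the raw string when the body is not valid JSON. The constant name JSON_JOB_PARSER is hypothetical, and assigning it to config.job_parser is an assumption based on the option shown in the block above; the lambda itself relies only on Ruby's standard library.

```ruby
require 'json'

# Hypothetical parser: decode JSON job bodies, fall back to the raw string.
JSON_JOB_PARSER = lambda do |body|
  begin
    JSON.parse(body)
  rescue JSON::ParserError
    body
  end
end

# Assumed wiring, based on the option shown in the configuration block:
#   Beaneater.configure { |config| config.job_parser = JSON_JOB_PARSER }

parsed = JSON_JOB_PARSER.call('{ "key" : "foo" }') # a Hash
raw    = JSON_JOB_PARSER.call('plain text body')   # the original String
```

Falling back to the raw body keeps the parser safe for tubes that carry a mix of JSON and plain-text jobs.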

Connection

To interact with a beanstalk queue, first establish a connection by providing a set of addresses:

@beanstalk = Beaneater::Pool.new(['10.0.1.5:11300'])

# Or if ENV['BEANSTALKD_URL'] == 'localhost:11300,127.0.0.1:11300'
@beanstalk = Beaneater::Pool.new
@beanstalk.connections.first # => localhost:11300
@beanstalk.connections.last # => 127.0.0.1:11300
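
To make the comma-separated address format concrete, here is a minimal sketch of how such a value could be split into host and port pairs. The helper parse_addresses and the DEFAULT_PORT constant are hypothetical names used for illustration; this is not a description of how Beaneater parses the variable internally.

```ruby
# Hypothetical helper: split a comma-separated address list (the format
# shown for BEANSTALKD_URL above) into [host, port] pairs.
DEFAULT_PORT = 11300 # assumption: fall back to the port used throughout this README

def parse_addresses(value)
  value.to_s.split(',').map(&:strip).reject(&:empty?).map do |address|
    host, port = address.split(':', 2)
    [host, (port || DEFAULT_PORT).to_i]
  end
end

addresses = parse_addresses('localhost:11300,127.0.0.1:11300')
```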

You can conversely close and dispose of a pool at any time with:

@beanstalk.close

Tubes

Beanstalkd has one or more tubes which can contain any number of jobs. Jobs can be inserted (put) into the used tube and pulled out (reserved) from watched tubes. Each tube consists of a ready, delayed, and buried queue for jobs.

When a client connects, its watch list is initially just the tube named default. Tube names are at most 200 bytes. When you specify a tube to use and that tube does not exist, it will be created automatically.

To interact with a tube, first find the tube:

@tube = @beanstalk.tubes.find "some-tube-here"
# => <Tube name='some-tube-here'>

To reserve jobs from beanstalk, you will need to 'watch' certain tubes:

# Watch only the tubes listed below (!)
@beanstalk.tubes.watch!('some-tube')
# Append tubes to existing set of watched tubes
@beanstalk.tubes.watch('another-tube')
# You can also ignore tubes that have been watched previously
@beanstalk.tubes.ignore('some-tube')

You can easily get a list of all, used or watched tubes:

# The list-tubes command returns a list of all existing tubes
@beanstalk.tubes.all
# => [<Tube name='foo'>, <Tube name='bar'>]

# Returns the tube currently being used by the client (for insertion)
@beanstalk.tubes.used
# => <Tube name='bar'>

# Returns a list of tubes currently being watched by the client (for consumption)
@beanstalk.tubes.watched
# => [<Tube name='foo'>]

You can also temporarily 'pause' the execution of a tube by specifying the time:

tube = @beanstalk.tubes["some-tube-here"]
tube.pause(3) # pauses tube for 3 seconds

or even clear the tube of all jobs:

tube = @beanstalk.tubes["some-tube-here"]
tube.clear # tube will now be empty

In summary, each beanstalk client manages two separate concerns: which tube newly created jobs are put into, and which tube(s) jobs are reserved from. Accordingly, there are two separate sets of functions for these concerns:

  • use and using affect where 'put' places jobs
  • watch and watching control where reserve takes jobs from

Note that these concerns are fully orthogonal: for example, when you 'use' a tube, it is not automatically 'watched'. Neither does 'watching' a tube affect the tube you are 'using'.
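
The separation between the used tube and the watched tubes can be illustrated with a toy in-memory model. The class ClientTubeState below is hypothetical and only mirrors the behaviour described above; it does not talk to beanstalkd and is not Beaneater's implementation.

```ruby
require 'set'

# Toy model of per-client tube state (not Beaneater's implementation).
class ClientTubeState
  attr_reader :used, :watched

  def initialize
    @used = 'default'         # where `put` places jobs
    @watched = Set['default'] # where `reserve` takes jobs from
  end

  # Changes only the tube used for insertion.
  def use(tube)
    @used = tube
    self
  end

  # Adds a tube to the watch list without touching the used tube.
  def watch(tube)
    @watched << tube
    self
  end

  # beanstalkd refuses to ignore the last watched tube (NOT_IGNORED),
  # so this model leaves a single-entry watch list unchanged.
  def ignore(tube)
    @watched.delete(tube) if @watched.size > 1
    self
  end
end

client = ClientTubeState.new
client.use('emails')
client.watch('reports').ignore('default')
```

After these calls the client puts jobs into 'emails' but reserves only from 'reports', which is exactly the orthogonality the note describes.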

Jobs

A job in beanstalk gets inserted by a client and includes the 'body' and job metadata. Each job is enqueued into a tube and later reserved and processed. Here is a picture of the typical job lifecycle:

   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

A job at any given time is in one of four states: ready, reserved, delayed, or buried:

State     Description
ready     waiting to be reserved and processed after being put onto a tube.
reserved  locked by a worker for processing until it is deleted, released, buried, or its ttr expires.
delayed   waiting to become ready after the specified delay.
buried    waiting to be kicked, usually after a job fails to process.

In addition, there are several actions that can be performed on a given job. You can:

  • reserve which locks a job from the ready queue for processing.
  • touch which extends the time before a job is autoreleased back to ready.
  • release which places a reserved job back onto the ready queue.
  • delete which removes a job from beanstalk.
  • bury which places a reserved job into the buried state.
  • kick which places a buried job from the buried queue back to ready.
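
The actions listed above can be read as a small state machine. The sketch below is an illustrative model only: TRANSITIONS and next_state are hypothetical names, release is simplified to return a job straight to ready (a release with a delay would place it in delayed instead), and delete is modelled as allowed from any state.

```ruby
# Illustrative transition table for the job lifecycle described above.
# Outer keys are actions; inner hashes map current state => next state.
TRANSITIONS = {
  reserve: { ready: :reserved },
  touch:   { reserved: :reserved },
  release: { reserved: :ready },
  bury:    { reserved: :buried },
  kick:    { buried: :ready },
  delete:  { ready: :deleted, reserved: :deleted, delayed: :deleted, buried: :deleted }
}.freeze

# Returns the state a job moves to, or raises if the action is not allowed.
def next_state(state, action)
  TRANSITIONS.fetch(action).fetch(state) do
    raise ArgumentError, "cannot #{action} a #{state} job"
  end
end

# A job that fails once, is buried, kicked, and then processed successfully.
final_state = %i[reserve bury kick reserve delete].reduce(:ready) do |state, action|
  next_state(state, action)
end
```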

You can insert a job onto a beanstalk tube using the put command:

@tube.put "job-data-here"

Beanstalkd can only store strings as job bodies, but you can easily encode your data into a string:

@tube.put({:foo => 'bar'}.to_json)

Each job has various metadata associated such as priority, delay, and ttr which can be specified as part of the put command:

# defaults are priority 65536, delay of 0 and ttr of 120 seconds (see Configuration)
@tube.put "job-data-here", :pri => 1000, :delay => 50, :ttr => 200

The priority argument is an integer < 2**32. Jobs with a smaller priority take precedence over jobs with larger priorities. The delay argument is an integer number of seconds to wait before putting the job in the ready queue. The ttr argument (time to run) is an integer number of seconds that a worker is allowed to run this job.
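
For reference, these three values end up in the text of the beanstalkd put command, which has the form "put <pri> <delay> <ttr> <bytes>" followed by the body. The helper build_put_command below is a hypothetical sketch of that wire format, using the defaults from the configuration block; it is not part of Beaneater's API and does not send anything over the network.

```ruby
MAX_PRIORITY = 2**32 - 1

# Hypothetical helper: build the beanstalkd protocol text for a put command.
def build_put_command(body, pri: 65536, delay: 0, ttr: 120)
  unless pri.is_a?(Integer) && pri.between?(0, MAX_PRIORITY)
    raise ArgumentError, 'priority must be an integer in 0...2**32'
  end
  # The byte count covers the body only, not the trailing CRLF.
  "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n"
end

command = build_put_command('job-data-here', pri: 1000, delay: 50, ttr: 200)
# command is "put 1000 50 200 13\r\njob-data-here\r\n"
```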

Processing Jobs (Manually)

In order to process jobs, the client should first specify the intended tubes to be watched. If not specified, this will default to watching just the default tube.

@beanstalk = Beaneater::Pool.new(['10.0.1.5:11300'])
@beanstalk.tubes.watch!('tube-name', 'other-tube')

Next you can use the reserve command which will return the first available job within the watched tubes:

job = @beanstalk.tubes.reserve
# => <Beaneater::Job id=5 body="job-data-here">
puts job.body
# prints 'job-data-here'
print job.stats.state # => 'reserved'

By default, reserve will wait indefinitely for the next job. If you want to specify a timeout, simply pass that in seconds into the command:

job = @beanstalk.tubes.reserve(5) # wait 5 secs for a job, then return
# => <Beaneater::Job id=5 body="foo">

You can 'release' a reserved job back onto the ready queue to retry later:

job = @beanstalk.tubes.reserve
# ...job fails with a transient error...
job.release :delay => 5
print job.stats.state # => 'delayed'

You can also 'delete' jobs that are finished:

job = @beanstalk.tubes.reserve
job.touch # extends ttr for job
# ...process job...
job.delete

Beanstalk jobs can also be buried if they fail, rather than being deleted:

job = @beanstalk.tubes.reserve
# ...job fails...
job.bury
print job.stats.state # => 'buried'

Burying a job means that the job is pulled out of the queue into a special 'holding' area for later inspection or reuse. To reanimate this job later, you can 'kick' buried jobs back into being ready:

@beanstalk.tubes['some-tube'].kick(3)

This kicks 3 buried jobs for 'some-tube' back into the 'ready' state. Jobs can also be inspected using the 'peek' commands. To find and peek at a particular job based on the id:

@beanstalk.jobs.find(123)
# => <Beaneater::Job id=123 body="foo">

You can also find_all jobs across all connections:

@beanstalk.jobs.find_all(123)
# => [<Beaneater::Job id=123 body="foo">, <Beaneater::Job id=123 body="bar">]

or you can peek at jobs within a tube:

@tube = @beanstalk.tubes.find('foo')
@tube.peek(:ready)
# => <Beaneater::Job id=123 body="ready">
@tube.peek(:buried)
# => <Beaneater::Job id=456 body="buried">
@tube.peek(:delayed)
# => <Beaneater::Job id=789 body="delayed">

When dealing with jobs there are a few other useful commands available:

job = @beanstalk.tubes.reserve
print job.tube      # => "some-tube-name"
print job.reserved? # => true
print job.exists?   # => true
job.delete
print job.exists?   # => false

Processing Jobs (Automatically)

Instead of using watch and reserve, you can also use the higher level register and process methods to process jobs. First you can 'register' how to handle jobs from various tubes:

@beanstalk.jobs.register('some-tube', :retry_on => [SomeError]) do |job|
  do_something(job)
end

@beanstalk.jobs.register('other-tube') do |job|
  do_something_else(job)
end

Once you have registered the handlers for known tubes, calling process! will begin a loop processing jobs as defined by the registered processor blocks:

@beanstalk.jobs.process!

Processing runs the following steps:

  1. Watch all registered tubes
  2. Reserve the next job
  3. Once job is reserved, invoke the registered handler based on the tube name
  4. If no exceptions occur, delete the job (success)
  5. If 'retry_on' exceptions occur, call 'release' (retry)
  6. If other exception occurs, call 'bury' (error)
  7. Repeat steps 2-6

The process! command is ideally suited for a beanstalk job processing daemon. Even though process! is intended to be a long-running process, you can stop the loop at any time by raising AbortProcessingError while processing is running.
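
The steps above can be sketched as a plain Ruby loop over in-memory jobs. Everything in this example is a hypothetical stand-in (FakeJob, TransientError, StopProcessing, process_jobs); it models the delete, release, and bury decisions without connecting to beanstalkd and should not be read as Beaneater's actual implementation. In this model, a job whose handler stops the loop is simply left untouched.

```ruby
# Hypothetical stand-ins for this sketch; none of these are Beaneater classes.
FakeJob = Struct.new(:tube, :body, :outcome)
class TransientError < StandardError; end
class StopProcessing < StandardError; end

# handlers maps tube name => { retry_on: [ErrorClass, ...], block: callable }
def process_jobs(jobs, handlers)
  jobs.each do |job|
    handler = handlers.fetch(job.tube)
    begin
      handler[:block].call(job)
      job.outcome = :deleted # step 4: no exception, delete the job
    rescue StopProcessing
      break # stop the loop early, leaving this job untouched
    rescue StandardError => e
      retryable = handler[:retry_on].any? { |klass| e.is_a?(klass) }
      job.outcome = retryable ? :released : :buried # steps 5 and 6
    end
  end
end

handlers = {
  'some-tube'  => { retry_on: [TransientError],
                    block: ->(job) { raise TransientError if job.body == 'flaky' } },
  'other-tube' => { retry_on: [],
                    block: ->(job) { raise 'boom' if job.body == 'bad' } }
}

jobs = [
  FakeJob.new('some-tube', 'ok'),
  FakeJob.new('some-tube', 'flaky'),
  FakeJob.new('other-tube', 'bad')
]
process_jobs(jobs, handlers)
outcomes = jobs.map(&:outcome)
```

Here the first job succeeds and is deleted, the second raises an error listed in retry_on and is released, and the third raises an unlisted error and is buried.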

Handling Errors

Beaneater raises an error when a command is sent to beanstalk and something unexpected happens. The most common errors are listed below:

Error                       Description
Beaneater::NotConnected     Client connection to beanstalk cannot be established.
Beaneater::InvalidTubeName  Specified tube name for use or watch is not valid.
Beaneater::NotFoundError    Specified job or tube could not be found.
Beaneater::TimedOutError    Job could not be reserved within time specified.
Beaneater::JobNotReserved   Job has not been reserved and action cannot be taken.

There are other exceptions that are less common such as OutOfMemoryError, DrainingError, DeadlineSoonError, InternalError, BadFormatError, UnknownCommandError, ExpectedCRLFError, JobTooBigError, NotIgnoredError. Be sure to check the beanstalk protocol for more information.
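
Most of these names line up with error responses defined by the beanstalkd protocol. The lookup table below is an illustrative sketch of that correspondence; the pairing is inferred from the names alone, and STATUS_ERRORS and error_name_for are hypothetical, so treat it as a reading aid rather than as Beaneater's internal mapping.

```ruby
# Illustrative lookup: beanstalkd protocol status => error name listed above.
# The pairing is inferred from the names and is not taken from Beaneater's source.
STATUS_ERRORS = {
  'OUT_OF_MEMORY'   => 'OutOfMemoryError',
  'INTERNAL_ERROR'  => 'InternalError',
  'BAD_FORMAT'      => 'BadFormatError',
  'UNKNOWN_COMMAND' => 'UnknownCommandError',
  'EXPECTED_CRLF'   => 'ExpectedCRLFError',
  'JOB_TOO_BIG'     => 'JobTooBigError',
  'DRAINING'        => 'DrainingError',
  'DEADLINE_SOON'   => 'DeadlineSoonError',
  'NOT_FOUND'       => 'NotFoundError',
  'TIMED_OUT'       => 'TimedOutError',
  'NOT_IGNORED'     => 'NotIgnoredError'
}.freeze

# Returns the error name for a protocol status, or nil if it is not an error status.
def error_name_for(status)
  STATUS_ERRORS[status]
end

timed_out = error_name_for('TIMED_OUT')
```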

Stats

Beanstalk has plenty of commands for introspecting the state of the queues and jobs. To get stats for beanstalk overall:

# Get overall stats about the job processing that has occurred
print @beanstalk.stats
# => { 'current_connections': 1, 'current_jobs_buried': 0, ... }
print @beanstalk.stats.current_connections
# => 1

For stats on a particular tube:

# Get statistical information about the specified tube if it exists
print @beanstalk.tubes['some_tube_name'].stats
# => { 'current_jobs_ready': 0, 'current_jobs_reserved': 0, ... }

For stats on an individual job:

# Get statistical information about the specified job if it exists
print @beanstalk.jobs[some_job_id].stats
# => {'age': 0, 'id': 2, 'state': 'reserved', 'tube': 'default', ... }

Be sure to check the beanstalk protocol for more details about the stats commands.
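
The beanstalkd protocol returns stats as a YAML dictionary whose keys use dashes, such as current-jobs-ready. The underscored keys in the examples above suggest a normalization step along the following lines. The helper parse_stats and the sample payload are hypothetical and only illustrate the idea with Ruby's standard library.

```ruby
require 'yaml'

# Hypothetical helper: parse a beanstalkd stats payload (a YAML dictionary)
# and convert dashed keys to underscored ones.
def parse_stats(yaml_text)
  YAML.safe_load(yaml_text).transform_keys { |key| key.tr('-', '_') }
end

# Made-up sample payload in the shape of a stats-tube response.
sample = "---\ncurrent-jobs-ready: 0\ncurrent-jobs-reserved: 1\nname: default\n"
stats = parse_stats(sample)
```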

Resources

There are other resources helpful when learning about beanstalk:

Contributors