Quebert
async_observer is great, but is dated and doesn't really support running jobs outside of the async_send idiom. Quebert is an attempt to mix how jobs are run in other popular worker queue frameworks, like resque and dj, with async_observer so that you can have it both ways.
Why Quebert (or how is it different from DJ and Resque)?
Because it has really low latency. Other Ruby queuing frameworks, like DJ or Resque, have to poll their queue servers periodicly. You could think of it as a "pull" queue. Quebert is a "push" queue. It maintains a persistent connection with beanstalkd and when is enqueud, its instantly pushed to the workers and executed.
Who uses it?
Quebert is a serious project. Its used in a production environment at Poll Everywhere to handle everything from SMS message processing to account downgrades.
Features
- Multiple back-ends (InProcess, Sync, and Beanstalk)
- Rails/ActiveRecord integration similar to async_observer
- Pluggable exception handling (for Hoptoad integration)
- Run workers with pid, log, and config files. These do not daemonize (do it yourself punk!)
- Provide custom hooks to be called before, after & around jobs are run
Some features that are currently missing that I will soon add include:
- Rails plugin support (The AR integrations have to be done manually today)
- Auto-detecting serializers. Enhanced ClassRegistry to more efficiently look up serializers for objects.
How to use
There are two ways to enqueue jobs with Quebert: through the Job itself, provided you set a default back-end for the job, or put it on the backend.
Jobs
Quebert includes a Job class so you can implement how you want certain types of Jobs performed.
Quebert.backend = Quebert::Backend::InProcess.new
class WackyMathWizard < Quebert::Job
def perform(*nums)
nums.inject(0){|sum, n| sum = sum + n}
end
end
You can either drop a job in a queue:
Quebert.backend.put WackyMathWizard.new(1, 2, 3)
Or drop it in right from the job:
WackyMathWizard.new(4, 5, 6).enqueue
Then perform the jobs!
Quebert.backend.reserve.perform # => 6
Quebert.backend.reserve.perform # => 15
Rails integration
config/quebert.yml:
development:
backend: beanstalk
host: localhost:11300
queue: myapp-development
test:
backend: sync
# etc.
config/initializers/quebert.rb:
Quebert.config.from_hash(Rails.application.config.quebert)
Quebert.config.logger = Rails.logger
Before/After/Around Hooks
Quebert has support for providing custom hooks to be called before, after & around your jobs are being run. A common example is making sure that any active ActiveRecord database connections are put back on the connection pool after a job is done:
Quebert.config.after_job do
ActiveRecord::Base.clear_active_connections!
end
Quebert.config.before_job do |job|
# all hooks take an optional job argument
# in case you want to do something with that job
end
Quebert.config.around_job do |job|
# this hook gets called twice
# once before & once after a job is performed
end
Async Sender
Take any ol' class and include the Quebert::AsyncSender.
Quebert.backend = Quebert::Backend::InProcess.new
class Greeter
include Quebert::AsyncSender::Class
def initialize(name)
@name = name
end
def sleep_and_greet(time_of_day)
sleep 10000 # Sleeping, get it?
"Oh! Hi #{name}! Good #{time_of_day}."
end
def self.budweiser_greeting(name)
"waaazup #{name}!"
end
end
walmart_greeter = Greeter.new("Brad")
Remember the send method in ruby?
walmart_greeter.send(:sleep_and_greet, "morning")
# ... time passes, you wait as greeter snores obnoxiously ...
# => "Oh! Hi Brad! Good morning."
What if the method takes a long time to run and you want to queue it? async.send it!
walmart_greeter.async.sleep_and_greet("morning")
# ... do some shopping and come back later when the dude wakes up
Quebert figures out how to serialize the class, throw it on a worker queue, re-instantiate it on the other side, and finish up the work.
Quebert.backend.reserve.perform # => "Oh! Hi Brad! Good morning."
# ... Sorry dude! I'm shopping already
Does it work on Class methods? Yeah, that was easier than making instance methods work:
Quebert.async.budweiser_greeting("Coraline")
Quebert.backend.reserve.perform # => "waazup Coraline!"
- Only basic data types are included for serialization. Serializers may be customized to include support for different types.
Backends
- Beanstalk: Enqueue jobs in a beanstalkd service. The workers run in a separate process. Typically used in production environments.
- Sync: Perform jobs immediately upon enqueuing. Typically used in testing environments.
- InProcess: Enqueue jobs in an in-memory array. A worker will need to reserve a job to perform.
Using multiple queues
To start a worker pointed at a non-default queue (e.g., a Quebert "tube"), start the process with -q
:
bundle exec quebert -q other-tube
Then specify the queue name in your job:
class FooJob < Quebert::Job
def queue
"other-tube"
end
def perform(args)
# ...
end
end
Beanstalk: Changing a job's TTR
class FooJob < Quebert::Job
def ttr
5.minutes
end
def perform(args)
# ...
end
end