Deferred: A bit of sugar to make async more pleasant

Hey, EventMachine is pretty cool, scales really well, etc. The only thing is that you have to write all your code using deferreds, which can be kind of...challenging. This library will not solve all your problems, it will not walk your dog, it does not do windows. However, it provides syntactic sugar around some core EventMachine libraries to make things a bit more pleasant.

Deferred::InstanceMethods

This module includes EM::Deferrable and tweaks its behavior a little. First, the callback and errback methods return self, which allows for chaining. In addition, an ensure-like method, ensure_that, adds a block to both the callback and errback chains.

client.do_something_eventually.callback do |arg|
  something_happened(arg)
end.errback do |err|
  raise err
end.ensure_that do |*|
  cleanup!
end
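The chaining above works because each registration method hands back the deferred itself. As a rough, EventMachine-free sketch of that idea (TinyDeferred is a hypothetical stand-in written for illustration, not this library's implementation):

```ruby
# Hypothetical stand-in, for illustration only.
class TinyDeferred
  def initialize
    @status    = :pending
    @args      = []
    @callbacks = []
    @errbacks  = []
  end

  # Both registration methods return self, so calls can be chained.
  def callback(&blk)
    register(:succeeded, @callbacks, blk)
  end

  def errback(&blk)
    register(:failed, @errbacks, blk)
  end

  # Runs the block on either outcome, much like an ensure clause.
  def ensure_that(&blk)
    callback(&blk)
    errback(&blk)
  end

  def succeed(*args)
    settle(:succeeded, args, @callbacks)
  end

  def fail(*args)
    settle(:failed, args, @errbacks)
  end

  private

  # Queue the block while pending; run it at once if the matching
  # outcome has already happened.
  def register(wanted, list, blk)
    if @status == :pending
      list << blk
    elsif @status == wanted
      blk.call(*@args)
    end
    self
  end

  # Fire only once; later calls to succeed/fail are ignored.
  def settle(status, args, list)
    return self unless @status == :pending
    @status = status
    @args   = args
    list.each { |blk| blk.call(*args) }
    self
  end
end

log = []
TinyDeferred.new
  .callback { |value| log << "ok: #{value}" }
  .errback { |err| log << "err: #{err}" }
  .ensure_that { |*| log << "cleanup" }
  .succeed(42)
```

Here only the callback and the ensure_that block run, in registration order, because the deferred succeeded.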

Sometimes you have a deferred that needs to wait on another deferred to fire before it takes action. For this, a chain\_to method is provided.

def something
  Deferred::Default.new.tap do |my_dfr|
    my_dfr.chain_to(client.do_something_eventually)
  end
end
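To make the chaining idea concrete, here is a small sketch without EventMachine. It assumes chain\_to means "when the other deferred fires, fire me with the same arguments". MiniDeferred is a hypothetical class, and the real method may differ in its details.

```ruby
# Hypothetical minimal deferred, for illustration only.
class MiniDeferred
  def initialize
    @callbacks = []
    @errbacks  = []
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def errback(&blk)
    @errbacks << blk
    self
  end

  def succeed(*args)
    @callbacks.each { |cb| cb.call(*args) }
    self
  end

  def fail(*args)
    @errbacks.each { |eb| eb.call(*args) }
    self
  end

  # Assumed semantics: forward the other deferred's outcome to self.
  def chain_to(other)
    other.callback { |*args| succeed(*args) }
    other.errback  { |*args| fail(*args) }
    self
  end
end

upstream = MiniDeferred.new
mine     = MiniDeferred.new.chain_to(upstream)

seen = []
mine.callback { |value| seen << value }
upstream.succeed(:done)   # fires mine's callbacks as well
```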

EM::Deferrable objects have a timeout method, but it doesn't let you specify an error class to use. In our code, all of our errbacks take an argument that is an Exception instance. Deferred::InstanceMethods#timeout lets you optionally specify an Exception subclass; an instance of it is passed to registered errbacks when the timeout fires.

dfr = client.do_something_eventually
dfr.callback { |arg| something_happened!(arg) }
dfr.errback { |e| puts e.inspect }
dfr.timeout(3.0, MyTimeoutError)

In this case, if a timeout occurs, the errback would be called with a MyTimeoutError instance with the message "timeout after 3.0 seconds".

Finally, there's the errback\_on\_exception method, which fires the errback chain if an exception is raised in the given code block.

dfr.errback_on_exception do
  # try a bunch of stuff
  raise "Oh noes! something bad!"
end

In this case, registered errbacks would be called with a RuntimeError.
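The mechanism can be pictured as a rescue that hands the exception to fail. The sketch below is an assumption about the general shape, not the library's code: ErrbackOnException and RecordingDeferred are hypothetical names, and rescuing StandardError (rather than something broader) is a guess.

```ruby
# Hypothetical mixin: works with any object that defines #fail(exception).
module ErrbackOnException
  def errback_on_exception
    yield
  rescue StandardError => e
    # Resolves to the including class's own #fail, not Kernel#fail.
    fail(e)
  end
end

# Hypothetical stand-in that just records what it was failed with.
class RecordingDeferred
  include ErrbackOnException

  attr_reader :failure

  def fail(exc)
    @failure = exc
  end
end

dfr = RecordingDeferred.new
dfr.errback_on_exception do
  raise "Oh noes! something bad!"
end

dfr.failure.class    # RuntimeError
dfr.failure.message  # "Oh noes! something bad!"
```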

Deferred::Accessors

A common pattern is providing lifecycle events on a given instance: clients connect, disconnect, startup, shutdown, error, etc. The Deferred::Accessors.deferred\_event class method provides a convenient way of declaring these deferreds in a class.

class SomethingManager
  include Deferred::Accessors

  deferred_event :start

  def start
    # do stuff required for starting the SomethingManager

    on_start.succeed
  end
end

It's often more convenient to combine the callback with the call to initiate the action, for example:

class SomethingManager
  include Deferred::Accessors

  deferred_event :start

  def initialize
    @started = false
  end

  def start(&cb)
    on_start(&cb)   # registers cb as a callback

    return on_start if @started
    @started = true

    # this obviously would be some kind of deferred action that would
    # eventually call on_start.succeed
    #
    EM.next_tick { on_start.succeed }

    on_start
  end
end

This allows one to do the following:

s = SomethingManager.new
s.start do
  do_something_when_manager_started
end.errback do
  raise "Oh Noes! Manager failed to start! Abort!"
end
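For readers curious how a class macro like deferred\_event could be put together, here is a hedged sketch in plain Ruby. EventAccessors and StubDeferred are hypothetical names; the sketch only assumes what the examples above show, namely that on\_start returns the same deferred each time and registers an optional block as a callback.

```ruby
# Hypothetical minimal deferred, standing in for a real one.
class StubDeferred
  def initialize
    @callbacks = []
    @fired     = false
  end

  def callback(&blk)
    if @fired
      blk.call
    else
      @callbacks << blk
    end
    self
  end

  def succeed
    return self if @fired
    @fired = true
    @callbacks.each(&:call)
    self
  end
end

# Hypothetical module showing one way to build the macro.
module EventAccessors
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Defines on_<name>, which lazily creates one deferred per instance
    # and registers the optional block as a callback on it.
    def deferred_event(name)
      ivar = "@_on_#{name}"
      define_method("on_#{name}") do |&blk|
        dfr = instance_variable_get(ivar) ||
              instance_variable_set(ivar, StubDeferred.new)
        dfr.callback(&blk) if blk
        dfr
      end
    end
  end
end

class Manager
  include EventAccessors

  deferred_event :start
end

events = []
manager = Manager.new
manager.on_start { events << :started }
manager.on_start.succeed
```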

Deferred::ThreadpoolJob

A common need is to run some blocking task in the EventMachine threadpool using EM.defer. This works well, but there's no way of handling error cases: when an exception is raised in the threadpool, the reactor dies. Enter the ThreadpoolJob, a Deferred that wraps the running of a job in the threadpool.

The success case

require 'rubygems'
require 'eventmachine'
require 'deferred'

EM.run do
  Deferred::DefaultThreadpoolJob.new.tap do |tpj|
    tpj.before_run do
      $stderr.puts "w00t! we're about to run in the threadpool, EM.reactor_thread? #{EM.reactor_thread?}"
    end

    tpj.on_run do
      $stderr.puts "EM.reactor_thread? #{EM.reactor_thread?}"
      $stderr.puts "sleeping for 0.5s"
      sleep 0.5

      %w[this is the result of the run]
    end

    tpj.callback do |*a|
      $stderr.puts "Success, called with: #{a.inspect}"
    end

    tpj.errback do |exc|
      $stderr.puts "Oh noes! an error happened: #{exc.inspect}"
    end

    # we have all the Deferred::InstanceMethods
    tpj.ensure_that do |*|
      EM.next_tick { EM.stop_event_loop }
    end

    tpj.defer!
  end
end

Produces the output:

w00t! we're about to run in the threadpool, EM.reactor_thread? true
EM.reactor_thread? false
sleeping for 0.5s
Success, called with: [["this", "is", "the", "result", "of", "the", "run"]]

The error case

require 'rubygems'
require 'eventmachine'
require 'deferred'

EM.run do
  Deferred::DefaultThreadpoolJob.new.tap do |tpj|
    tpj.before_run do
      $stderr.puts "w00t! we're about to run in the threadpool, EM.reactor_thread? #{EM.reactor_thread?}"
    end

    tpj.on_run do
      raise "Teh goggles, they do nothing!"
    end

    tpj.callback do |*a|
      $stderr.puts "Success, called with: #{a.inspect}"
    end

    tpj.errback do |exc|
      $stderr.puts "Oh noes! an error happened: #{exc.inspect}"
    end

    # we have all the Deferred::InstanceMethods
    tpj.ensure_that do |*|
      EM.next_tick { EM.stop_event_loop }
    end

    tpj.defer!
  end
end

Produces the output:

w00t! we're about to run in the threadpool, EM.reactor_thread? true
Oh noes! an error happened: #<RuntimeError: Teh goggles, they do nothing!>
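The core trick in both cases is to rescue inside the worker thread and carry the exception back as a value. The following is a plain-Ruby sketch of that idea only. It uses a bare Thread instead of the EventMachine threadpool, blocks the caller until the work finishes (which a reactor would not do), and run\_job is a hypothetical helper, not part of this library.

```ruby
# Hypothetical helper: run the block on a worker thread, then report
# either its result or the exception it raised.
def run_job(on_success, on_error, &work)
  status, payload = Thread.new do
    begin
      [:ok, work.call]
    rescue StandardError => e
      # Capture the exception instead of letting it kill the thread.
      [:error, e]
    end
  end.value # joins the thread and returns the block's value

  if status == :ok
    on_success.call(payload)
  else
    on_error.call(payload)
  end
end

out = []
ok  = ->(result) { out << "Success: #{result.inspect}" }
err = ->(exc)    { out << "Error: #{exc.class}: #{exc.message}" }

run_job(ok, err) { %w[this is the result] }
run_job(ok, err) { raise "Teh goggles, they do nothing!" }
```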

Returning a deferred or an object that respond_to?(:defer!)

One use case we had was the ability for a ThreadpoolJob's on_run block to return a deferred, which would be chained to the ThreadpoolJob's callbacks. This allows a lot of flexibility in terms of chaining tasks that should run both in and out of the threadpool.

This functionality was created for a task queueing system where most of the jobs had to run on the threadpool. Check out the rather involved example which shows how to implement a DefaultThreadpoolJob that spawns several sub-jobs and waits for their completion before firing its own callbacks.

From the "Credit Where Credit is Due Dept."

Thanks to Snapfish for sponsoring development of this project and to HPDC, L.P. for agreeing to open source it.