Fare

Get on the message bus with Fare!

Fare is an event system built around Amazon's SNS and SQS services. You publish an event to SNS, which automatically distributes it to multiple queues, from which it can be picked up for processing.

Fare is built around a couple of conventions. These conventions decouple the services in your architecture from each other. The publisher doesn't know about the subscribers and the subscribers don't know about the publisher. This leaves you with the freedom to replace any part without impacting the rest.

Note: This system is not intended for real time events.

In a nutshell

Let's say you want to send new users a welcome email. The problem is that user accounts are managed in one service, while email is managed in another. And you don't want to burden the accounts service with knowing about the million things that need to happen after a user signs up.

Fare lets you publish the event "a user has just signed up" in one service, and lets a bunch of different services react to it asynchronously.

First, the accounts service needs to declare which events it can publish. This is done in a separate file, config/fare.rb.

app_name "accounts"

publishes subject: "user", action: "signup"
publishes subject: "user", action: "login"

To make sure that the proper topics exist, you run fare update. This creates a lockfile, similar to the one Bundler creates, containing everything Fare figured out ahead of time so that the service doesn't have to look it up when it starts.

Then in that application, you eventually publish the event:

def create
  @user = User.create!(params[:user])
  Fare.publish(subject: "user", action: "signup", payload: @user.attributes)
  redirect_to some_path
end

Meanwhile, in your email service, you declare that you are interested when users sign up, so you can send them an email. This is done in the config/fare.rb of the email service.

For each type of event you need to specify a middleware stack, similar to Rack middleware.

app_name "postman_pat" # yes, our mailer is called Postman Pat

subscriber do

  setup do
    require "postman_pat"
  end

  stack do
    listen_to subject: "user", action: "signup"
    run do
      use FilterUnsubscribed
      use SendMail
    end
  end
end

To make sure that the queues are subscribed to the topics, you run fare update.

Those middleware might look something like:

class FilterUnsubscribed
  def initialize(app, options = {})
    @app = app
    @unsubscribers = options.fetch(:unsubscribers) { Unsubscribers }
  end

  def call(env)
    event = env.fetch(:event)
    email = event.payload.fetch("email")

    # don't continue the middleware chain if users have unsubscribed
    unless @unsubscribers.include?(email)
      @app.call(env)
    end
  end
end
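Because a middleware is a plain Ruby object, it can be exercised without AWS or a running subscriber. The following is a minimal sketch: FakeEvent and RecordingApp are hypothetical stand-ins invented for this example (they are not part of Fare), and the email addresses are placeholders.

```ruby
require "set"

# Copy of the FilterUnsubscribed middleware shown above, so the sketch runs on its own.
class FilterUnsubscribed
  def initialize(app, options = {})
    @app = app
    # The fallback block is only evaluated when no :unsubscribers option is given.
    @unsubscribers = options.fetch(:unsubscribers) { Unsubscribers }
  end

  def call(env)
    event = env.fetch(:event)
    email = event.payload.fetch("email")
    @app.call(env) unless @unsubscribers.include?(email)
  end
end

# Hypothetical stand-in for a Fare event: only responds to #payload.
FakeEvent = Struct.new(:payload)

# Hypothetical downstream app that records which emails reached it.
class RecordingApp
  attr_reader :seen

  def initialize
    @seen = []
  end

  def call(env)
    @seen << env.fetch(:event).payload.fetch("email")
  end
end

downstream = RecordingApp.new
filter = FilterUnsubscribed.new(downstream, unsubscribers: Set["gone@example.com"])

filter.call({ event: FakeEvent.new({ "email" => "new@example.com" }) })
filter.call({ event: FakeEvent.new({ "email" => "gone@example.com" }) })

p downstream.seen # only the address that has not unsubscribed
```

Injecting the list of unsubscribers through the options hash is what makes the middleware testable with a plain Set.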

Then run fare subscriber start and it will start polling for user signup events.

How it works

An event with subject "user" and action "signup" will be published to an SNS topic "production-user-signup". The publisher of the event doesn't need to know anything else.

A subscriber called "postman_pat" that is interested in that event will create an SQS queue called "production-postman_pat", which is subscribed to the proper SNS topic(s). The subscriber doesn't need to know what the origin of the event is.
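The naming convention can be sketched in a few lines. The helper names topic_name and queue_name are made up for this illustration and are not part of Fare's API; they only reproduce the two example names given above.

```ruby
# Hypothetical helpers that mirror the naming convention described above.
# They are not Fare's own code.
def topic_name(environment, subject, action)
  [environment, subject, action].join("-")
end

def queue_name(environment, app_name)
  [environment, app_name].join("-")
end

puts topic_name("production", "user", "signup") # prints "production-user-signup"
puts queue_name("production", "postman_pat")    # prints "production-postman_pat"
```

Since both sides derive the names from the same pieces of information, neither has to be told about the other.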

The events themselves are formatted according to a convention. They use the following JSON format. Here is an example:

{
  "id":       "c51856e0-48bc-4653-8fe2-82c1f549c490",
  "subject":  "user",
  "action":   "signup",
  "source":   "accounts",
  "version":  "0.1.0",
  "sent_at":  "2014-01-01 13:48:01",
  "payload":  "whatever you put in as payload"
}

The "id" field is a UUID that is generated when the event is published. The "source" is the name of the app that published the event. The "version" field is an optional field that you can use to version the structure of your payloads. The payload is whatever you supplied, but serialized to JSON.

This JSON is Base64-encoded because of a limitation of SQS (see Limitations below).
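To make the envelope concrete, here is a rough sketch of building, encoding and decoding such an event. The helpers build_event, encode_event and decode_event are hypothetical names for this example, not Fare's API, and embedding the payload as a nested JSON object is an assumption about how Fare serializes it.

```ruby
require "json"
require "securerandom"

# Hypothetical helper: builds an envelope with the fields listed above.
def build_event(subject:, action:, source:, payload:, version: nil, now: Time.now.utc)
  {
    "id"      => SecureRandom.uuid,
    "subject" => subject,
    "action"  => action,
    "source"  => source,
    "version" => version,
    "sent_at" => now.strftime("%Y-%m-%d %H:%M:%S"),
    "payload" => payload, # assumption: nested as-is, then serialized with the envelope
  }
end

# pack("m0") produces strict Base64 (no line breaks).
def encode_event(event)
  [JSON.generate(event)].pack("m0")
end

# unpack1("m0") reverses it; the result is binary, so mark it as UTF-8 before parsing.
def decode_event(body)
  JSON.parse(body.unpack1("m0").force_encoding("UTF-8"))
end

event = build_event(subject: "user", action: "signup", source: "accounts",
                    version: "0.1.0", payload: { "email" => "new@example.com" })
body = encode_event(event)
decoded = decode_event(body)

puts decoded["subject"]            # prints "user"
puts decoded["payload"]["email"]   # prints "new@example.com"
```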

SQS has a neat trick when it comes to polling. When one subscriber picks up a message, it becomes invisible and unavailable for others to pick up. After Fare has processed the event, it is deleted for good. If for some reason processing cannot finish, due to an error or because the visibility timeout (default: 30 seconds) expires, the message becomes available again. This makes it straightforward to scale up by running the subscriber multiple times. That being said, the subscriber is threaded, which means you can already process multiple messages in parallel in one process.
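The behaviour is easier to see in a toy model. The ToyQueue class below is an in-memory illustration written for this README; it is not how SQS is implemented, it does not talk to AWS, and time is passed in explicitly as a number of seconds.

```ruby
# A toy, in-memory model of the visibility timeout described above.
class ToyQueue
  Message = Struct.new(:id, :body, :invisible_until)

  def initialize(visibility_timeout: 30)
    @visibility_timeout = visibility_timeout
    @messages = []
    @next_id = 0
  end

  def push(body)
    @messages << Message.new(@next_id += 1, body, 0)
  end

  # Returns the first visible message and hides it from other consumers.
  def receive(now)
    message = @messages.find { |m| m.invisible_until <= now }
    message.invisible_until = now + @visibility_timeout if message
    message
  end

  # Called after successful processing: the message is gone for good.
  def delete(message)
    @messages.delete_if { |m| m.id == message.id }
  end
end

queue = ToyQueue.new(visibility_timeout: 30)
queue.push("user signup")

first  = queue.receive(0)   # worker A picks the message up
second = queue.receive(10)  # worker B sees nothing while A is working
third  = queue.receive(31)  # A never deleted it, so it is visible again
queue.delete(third)         # this time processing succeeded
fourth = queue.receive(100) # nothing left to receive
```

One consequence worth keeping in mind: a message can be delivered more than once, so middleware should tolerate seeing the same event again.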

Usage

Similar to Bundler, you'll need to create a configuration file that holds all the topics you want to use in your app. You then need to run a command that will generate a lock version of that configuration file. This is done so that your actions will never have to create a topic or queue when the application is running, slowing your application down in the process.

Publishers

Make a file called config/fare.rb that includes configuration like this:

# Commands like `fare update` don't load your application, so you need to make
# sure Fare knows how to talk to AWS here.
environment :test do
  AWS.config(
    secret_access_key: "xxx",
    access_key_id: "yyy",
  )
end

app_name "my_publisher"

# make a line for each topic you want to publish to:
publishes subject: "user", action: "signup", version: "0.1"

Then run the command to create all the topics and queues:

$ fare update

This will create a file called config/fare.production.lock. When deploying, add the fare update command to your deployment scripts, after bundle install.

You can only publish events that are listed in this file.

Subscribers

To create a subscriber, start config/fare.rb the same way as you would for a publisher. If the subscribing app also publishes events, list them just as a normal publisher would.

In addition to the normal configuration, you need to add a subscriber. A subscriber has one "setup" block and one or more "stacks".

A "stack" lists which events it is interested in with one or more "listen_to" commands.

The subscriber is one queue and subscribes to all topics mentioned in all stacks. By combining all these events into a single queue, you can lighten the load a bit for events that don't happen all that often and don't need a dedicated queue.

Example:

environment :test do
  # ....
end

app_name "my_subscriber"

# if this app has a subscriber, configure it like this:
subscriber do

  # stuff to do when the subscriber starts
  setup do
    require "your_app"
  end

  stack do
    # have at least one of these lines for each topic you want to subscribe to:
    listen_to subject: "user", action: "signup"
    listen_to subject: "user", action: "something_else"
    run do
      # process the events like Rack middleware
      use SomeMiddleware
      use SomeOtherMiddleware
    end
  end

  stack do
    listen_to subject: "user", action: "stub_toe"
    run do
      use SomeOtherMiddleware
      use SomeMiddleware
    end
  end

end

Then run the subscriber:

$ fare subscriber start

However, you can also create multiple queues within one code base, if you want to.

To do that, you have to give your subscribers a name:

app_name "my_subscriber"

subscriber do
  setup do
    # ...
  end

  stack do
    listen_to ...
  end
end

subscriber :additional_queue_name do
  setup do
    # ...
  end
  stack do
    listen_to ...
  end
end

Then you need to start each subscriber individually:

$ fare subscriber start --daemonize --name additional_queue_name
$ fare subscriber start --daemonize # only runs the unnamed queue

Processing events

As you saw in the configuration file, Fare works with a middleware-like stack. You probably already know how to create middleware from Rack, and middleware for Fare is no different.

class SomeMiddleware

  def initialize(app, options = {})
    @app = app
  end

  def call(env)
    # do some work before
    event = env.fetch(:event)

    @app.call(env)

    # do some work after
  end

end

There is no endpoint, because you don't need a special return state as with HTTP requests.
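To show how such a stack fits together, here is a minimal sketch of a builder that chains middleware around a no-op terminal app. ToyStack and Tracer are hypothetical classes modelled on how Rack::Builder works; Fare's real implementation may differ.

```ruby
# Hypothetical builder, for illustration only.
class ToyStack
  def initialize
    @entries = []
  end

  def use(klass, options = {})
    @entries << [klass, options]
  end

  # Wraps the middleware from last to first around a no-op terminal app,
  # so the first `use` ends up outermost.
  def to_app
    terminal = ->(_env) {}
    @entries.reverse.inject(terminal) do |app, (klass, options)|
      klass.new(app, options)
    end
  end
end

# Hypothetical middleware that records when it runs.
class Tracer
  def initialize(app, options = {})
    @app = app
    @name = options.fetch(:name)
  end

  def call(env)
    env[:trace] << "#{@name} before"
    @app.call(env)
    env[:trace] << "#{@name} after"
  end
end

stack = ToyStack.new
stack.use Tracer, name: "outer"
stack.use Tracer, name: "inner"

env = { event: nil, trace: [] }
stack.to_app.call(env)
puts env[:trace]
# prints:
#   outer before
#   inner before
#   inner after
#   outer after
```

The terminal app does nothing, which matches the point above: the last middleware can call @app.call(env) safely even though there is no endpoint behind it.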

Here are some things you can access on the event that is passed in:

event.subject # => "user"
event.action  # => "signup"
event.payload # => { "email" => "[email protected]" } # etc
event.sent_at # => a datetime object
event.version # => "0.1"
event.source  # => "name_of_publisher"
event.id      # => an event id automatically added by Fare

To run a subscriber that processes the events:

$ fare subscriber start

Run with the --help option to see the options available.

Backups

You can also let Fare automatically create a backup queue. This special queue will be subscribed to all topics. This might come in handy if you want to process statistics.

To enable it, add backup! to the top level of config/fare.rb.

You can create a specialized subscriber to listen to those backups, by calling the subscriber "backup".

subscriber :backup do

  setup do
    # ...
  end

  stack do
    # ...
  end

end

And you can run it as normal with $ fare subscriber start --name backup.

Testing

You can stub publishing of messages when you're testing. Simply add this to your test suite:

Fare.test_mode!

You can find the messages that were published:

Fare.stubbed_messages

Don't forget to clear this list between tests:

before :each do
  Fare.stubbed_messages.clear
end

There is also a matcher available for RSpec:

require "fare/rspec"

RSpec.describe "Creating a user" do

  it "publishes an event" do
    expect {
      # ...
    }.to publish :user, :signup
  end

end

Fare provides a fake in-memory queue for testing purposes. You can put events into it and then drain it with the middleware stack you specified in the configuration file. It will not run forever, so you can easily test that events are being handled correctly.

To test your subscriber, make sure you are in stubbing mode:

Fare.test_mode!

Then to simulate an event in the queue:

# given an event:
Fare.given_event(subject: "X", action: "Y", payload: "Z")

# when the event is handled:
Fare.run

# then do the assertions on the expected outcome

Provided Middleware

There are a couple of middleware provided.

Logging

Logs events. Just provide a logger instance.

Example:

use Fare::Middleware::Logging, logger: Logger.new($stdout)

NewRelic

Instruments the event processing. You need to require this middleware manually, because we don't want to be loading NewRelic when doing fare update.

Example:

subscriber do
  setup do
    require "fare/middleware/newrelic"
  end
  always_run do
    use Fare::Middleware::NewRelic
  end
end

Raven

Sends errors to Sentry.

Example:

use Fare::Middleware::Raven, dsn: "http://...", logger: logger, environment: "production"

Limitations

Before using Fare, make sure you know the limitations of Amazon SNS and SQS. For instance, there is a maximum of 256KB for messages in SQS, but since Fare adds some metadata, the limit for payloads is a bit less.

Events are encoded with Base64, because SQS doesn't like special characters.
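Base64 also eats into the size budget, because every 3 bytes of input become 4 characters of output. The sketch below estimates whether an event would still fit. It assumes the 256KB figure quoted above (check the current AWS documentation) and ignores any wrapping SNS may add on delivery; the names base64_size and fits_in_sqs? are hypothetical.

```ruby
require "json"

SQS_LIMIT_BYTES = 256 * 1024 # the figure quoted above; an assumption, not checked against AWS

# Base64 turns every 3 input bytes into 4 output characters (with padding).
def base64_size(bytesize)
  ((bytesize + 2) / 3) * 4
end

def fits_in_sqs?(event_json, limit: SQS_LIMIT_BYTES)
  base64_size(event_json.bytesize) <= limit
end

small = JSON.generate({ "payload" => "x" * 1_000 })
large = JSON.generate({ "payload" => "x" * 200_000 })

puts fits_in_sqs?(small) # prints "true"
puts fits_in_sqs?(large) # prints "false"
```

Under these assumptions the encoded JSON has to stay below roughly 192KB (196,608 bytes) before encoding, metadata included.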

TODO

  • Let the middleware know how to revert itself
  • Ask for extra handling time from SQS when handling takes too long
  • Configuration for SQS Dead Letter queues.