Fare
Get on the message bus with Fare!
Fare is an event system built around Amazon's SNS and SQS services. You publish an event to SNS, which automatically distributes it to multiple queues, from which subscribers can pick it up for processing.
Fare is built around a couple of conventions. These conventions let you decouple the services in your architecture: the publisher doesn't know about the subscribers, and the subscribers don't know about the publisher. This leaves you with the freedom to replace any part without impacting the rest.
Note: This system is not intended for real time events.
In a nutshell
Let's say you want to send new users a welcome email. The problem is that user accounts are managed in one service, while email is managed by another. And you don't want to burden the accounts service with knowing about the million things that need to happen after a user signs up.
Fare lets you publish the event "a user has just signed up" in one service and lets a bunch of different services react to it asynchronously.
First, the accounts service needs to declare which events it can publish. This is done in a separate file, config/fare.rb.
app_name "accounts"
publishes subject: "user", action: "signup"
publishes subject: "user", action: "login"
To make sure that the proper topics exist, you run fare update. A lockfile is created, similar to how Bundler creates a lockfile containing all the information it figured out before the service needs to start.
Then in that application, you eventually publish the event:
def create
  @user = User.create!(params[:user])
  Fare.publish(subject: "user", action: "signup", payload: @user.attributes)
  redirect_to some_path
end
Meanwhile, in your email service, you declare that you are interested when users sign up, so you can send them an email. This is done in the config/fare.rb of the email service.
For each type of event you need to specify a middleware stack, similar to Rack middleware.
app_name "postman_pat" # yes, our mailer is called Postman Pat
subscriber do
  setup do
    require "postman_pat"
  end
  stack do
    listen_to subject: "user", action: "signup"
    run do
      use FilterUnsubscribed
      use SendMail
    end
  end
end
To make sure that the queues are subscribed to the topics, you run fare update.
Those middleware might look something like:
class FilterUnsubscribed
  def initialize(app, options = {})
    @app = app
    @unsubscribers = options.fetch(:unsubscribers) { Unsubscribers }
  end
  def call(env)
    event = env.fetch(:event)
    email = event.payload.fetch("email")
    # don't continue the middleware chain if users have unsubscribed
    unless @unsubscribers.include?(email)
      @app.call(env)
    end
  end
end
Then run fare subscriber start and it will start polling for user signup events.
How it works
An event with subject "user" and action "signup" will be published to an SNS topic "production-user-signup". The publisher of the event doesn't need to know anything else.
A subscriber called "postman_pat" that is interested in that event will create an SQS queue called "production-postman_pat", which is subscribed to the proper SNS topic(s). The subscriber doesn't need to know what the origin of the event is.
The events themselves are formatted according to a convention. They use the following JSON format; here is an example:
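The naming convention in the two examples above can be pictured as a simple join of environment, subject, and action (for topics) or environment and app name (for queues). This is a minimal sketch inferred only from those two example names; the helper names `topic_name` and `queue_name` are hypothetical and are not part of Fare's documented API.

```ruby
# Hypothetical helpers that reproduce the example names from this README.
# Fare's real internals may build these names differently.

# Topic name: "<environment>-<subject>-<action>"
def topic_name(environment, subject, action)
  [environment, subject, action].join("-")
end

# Queue name: "<environment>-<app_name>"
def queue_name(environment, app_name)
  [environment, app_name].join("-")
end

puts topic_name("production", "user", "signup") # prints "production-user-signup"
puts queue_name("production", "postman_pat")    # prints "production-postman_pat"
```

Because both names are derived from values each side already knows, neither the publisher nor the subscriber has to be told about the other.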
{
  "id": "c51856e0-48bc-4653-8fe2-82c1f549c490",
  "subject": "user",
  "action": "signup",
  "source": "accounts",
  "version": "0.1.0",
  "sent_at": "2014-01-01 13:48:01",
  "payload": "whatever you put in as payload"
}
The "id" field is a UUID that is generated when the event is published. The "source" is the name of the app that published the event. The "version" field is an optional field that you can use to version the structure of your payloads. The payload is whatever you supplied, but serialized to JSON.
This JSON is Base64-encoded because of a limitation of SQS (see Limitations below).
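To make the envelope and its encoding concrete, here is a rough sketch in plain Ruby of how such a message could be built and read back. The helper names (`build_event`, `encode_event`, `decode_event`) are hypothetical, and the choice of strict Base64 without line breaks is an assumption; Fare's actual implementation may differ.

```ruby
require "json"
require "securerandom"

# Hypothetical helper: wraps a payload in the envelope fields described above.
def build_event(subject:, action:, source:, payload:, version: nil, now: Time.now.utc)
  event = {
    "id"      => SecureRandom.uuid, # generated at publish time
    "subject" => subject,
    "action"  => action,
    "source"  => source,            # name of the publishing app
    "sent_at" => now.strftime("%Y-%m-%d %H:%M:%S"),
    "payload" => payload
  }
  event["version"] = version if version # the version field is optional
  event
end

# Serialize to JSON, then Base64-encode. The "m0" pack directive produces
# strict Base64 without line breaks (an assumption, not confirmed by Fare).
def encode_event(event)
  [JSON.generate(event)].pack("m0")
end

# Reverse of encode_event: Base64-decode, then parse the JSON.
def decode_event(message)
  JSON.parse(message.unpack1("m0"))
end

event = build_event(
  subject: "user", action: "signup", source: "accounts",
  payload: { "email" => "new.user@example.com" }, version: "0.1.0",
  now: Time.utc(2014, 1, 1, 13, 48, 1)
)
message = encode_event(event)
decoded = decode_event(message)

puts decoded["sent_at"] # prints "2014-01-01 13:48:01"
```

The encoded message contains only Base64 characters, which is what makes it safe to hand to SQS regardless of what the payload contains.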
SQS has a neat trick when it comes to polling. When one subscriber picks up a message, it becomes invisible and unavailable for others to pick up. After Fare has processed the event, it is deleted for good. If for some reason processing cannot finish, due to an error or because the timeout (default: 30 seconds) expires, the message becomes available again. This makes it really straightforward to run multiple copies of the subscriber to scale up. That being said, the subscriber is threaded, which means you can already process multiple messages in parallel in one process.
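The visibility behaviour described above can be modelled with a small in-memory toy. This is only an illustration of the idea, not the SQS API and not Fare code: the `ToyQueue` class and its methods are made up for this sketch, and time is passed in explicitly as a number of seconds so the example stays deterministic.

```ruby
# Toy in-memory model of a queue with a visibility timeout.
class ToyQueue
  Message = Struct.new(:id, :body, :invisible_until)

  def initialize(visibility_timeout: 30)
    @visibility_timeout = visibility_timeout
    @messages = []
    @next_id = 0
  end

  def send_message(body)
    @next_id += 1
    @messages << Message.new(@next_id, body, 0)
    @next_id
  end

  # Returns the first visible message and hides it for the timeout period,
  # or returns nil when nothing is currently visible.
  def receive(now)
    message = @messages.find { |m| m.invisible_until <= now }
    return nil unless message

    message.invisible_until = now + @visibility_timeout
    message
  end

  # Called after successful processing: the message is gone for good.
  def delete(id)
    @messages.reject! { |m| m.id == id }
  end
end

queue = ToyQueue.new(visibility_timeout: 30)
queue.send_message("user signup")

first  = queue.receive(0)   # worker A picks the message up at t=0
second = queue.receive(10)  # worker B sees nothing at t=10 (nil)
third  = queue.receive(31)  # A never deleted it, so it is visible again
queue.delete(third.id)      # processing succeeded this time
fourth = queue.receive(100) # nothing left (nil)
```

Since a message is only ever handed to one worker at a time, adding more subscriber processes increases throughput without any coordination between them.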
Usage
Similar to Bundler, you'll need to create a configuration file that holds all the topics you want to use in your app. You then need to run a command that will generate a lock version of that configuration file. This is done so that your actions will never have to create a topic or queue when the application is running, slowing your application down in the process.
Publishers
Make a file called config/fare.rb that includes configuration like this:
# Commands like `fare update` don't load your application, you need to make sure
# it knows how to talk to AWS here.
environment :test do
  AWS.config(
    secret_access_key: "xxx",
    access_key_id: "yyy",
  )
end
app_name "my_publisher"
# make a line for each topic you want to publish to:
publishes subject: "user", action: "signup", version: "0.1"
Then run the command to create all the topics and queues:
$ fare update
This will create a file called config/fare.production.lock. When deploying, add the fare update command to your deployment scripts after you do bundle install.
You can only publish events that are listed in this configuration.
Subscribers
To create a subscriber, you need the same start of config/fare.rb as you would normally have. If you publish events while subscribing, you need to list them as a normal publisher would.
In addition to the normal configuration, you need to add a subscriber. A subscriber has one "setup" block and one or more "stacks".
A "stack" lists which events it is interested in with one or more "listen_to" commands.
The subscriber is one queue and subscribes to all topics mentioned in all stacks. By combining all these events into a single queue, you can lighten the load a bit for events that don't happen all that often and don't need a dedicated queue.
Example:
environment :test do
  # ....
end
app_name "my_subscriber"
# if this app has a subscriber, configure it like this:
subscriber do
  # stuff to do when the subscriber starts
  setup do
    require "your_app"
  end
  stack do
    # have at least one of these lines for each topic you want to subscribe to:
    listen_to subject: "user", action: "signup"
    listen_to subject: "user", action: "something_else"
    run do
      # process the events like Rack middleware
      use SomeMiddleware
      use SomeOtherMiddleware
    end
  end
  stack do
    listen_to subject: "user", action: "stub_toe"
    run do
      use SomeOtherMiddleware
      use SomeMiddleware
    end
  end
end
Then run the subscriber:
$ fare subscriber start
However, you can also create multiple queues within one code base, if you want to.
To do that, you have to give your subscribers a name:
app_name "my_subscriber"
subscriber do
  setup do
    # ...
  end
  stack do
    listen_to ...
  end
end
subscriber :additional_queue_name do
  setup do
    # ...
  end
  stack do
    listen_to ...
  end
end
Then you need to start each subscriber individually:
$ fare subscriber start --daemonize --name additional_queue_name
$ fare subscriber start --daemonize # only runs the unnamed queue
Processing events
As you saw in the configuration file, Fare works with a middleware-like stack. You probably already know how to create middleware from Rack, and middleware for Fare is no different.
class SomeMiddleware
  def initialize(app, options = {})
    @app = app
  end
  def call(env)
    # do some work before
    event = env.fetch(:event)
    @app.call(env)
    # do some work after
  end
end
There is no endpoint, because you don't need a special return state as with HTTP requests.
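To show how a list of middleware turns into a single callable chain, here is a sketch in plain Ruby. It is not Fare's implementation: `build_stack`, `Recorder`, and the `Event` struct are hypothetical stand-ins, and the no-op lambda at the end of the chain is an assumption about what "no endpoint" amounts to in practice.

```ruby
# Hypothetical stand-in for the event object passed in env[:event].
Event = Struct.new(:subject, :action, :payload)

# Example middleware that records when it runs relative to the rest of the chain.
class Recorder
  def initialize(app, options = {})
    @app  = app
    @log  = options.fetch(:log)
    @name = options.fetch(:name)
  end

  def call(env)
    @log << "#{@name} before"
    @app.call(env) # hand over to the next middleware
    @log << "#{@name} after"
  end
end

# Hypothetical builder: wraps the middleware from last to first around a
# no-op terminal app, so the first entry ends up outermost.
def build_stack(entries)
  terminal = ->(_env) { nil }
  entries.reverse.inject(terminal) do |app, (klass, options)|
    klass.new(app, options)
  end
end

log = []
stack = build_stack([
  [Recorder, { log: log, name: "outer" }],
  [Recorder, { log: log, name: "inner" }]
])

event = Event.new("user", "signup", { "email" => "new.user@example.com" })
stack.call({ event: event })

puts log
# prints:
# outer before
# inner before
# inner after
# outer after
```

A middleware that decides not to call `@app.call(env)` simply stops the chain, which is how a filter such as FilterUnsubscribed can drop an event.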
Here are some things you can access on the event you are passed in:
event.subject # => "user"
event.action # => "signup"
event.payload # => { "email" => "user@example.com" } # etc
event.sent_at # => a datetime object
event.version # => "0.1"
event.source # => "name_of_publisher"
event.id # => an event id automatically added by Fare
To run a subscriber that processes the events:
$ fare subscriber start
Run with the --help option to see the options available.
Backups
You can also let Fare automatically create a backup queue. This special queue will be subscribed to all topics. This might come in handy if you want to process statistics.
To enable, add backup! to the top level of config/fare.rb.
You can create a specialized subscriber to listen to those backups, by calling the subscriber "backup".
subscriber :backup do
  setup do
    # ...
  end
  stack do
    # ...
  end
end
And you can run it as normal with $ fare subscriber start --name backup.
Testing
You can stub publishing of messages when you're testing. Simply add this to your test suite:
Fare.test_mode!
You can find the messages that were published:
Fare.
Don't forget to clear this list between tests:
before :each do
Fare..clear
end
There is also a matcher available for RSpec:
require "fare/rspec"
RSpec.describe "Creating a user" do
  it "publishes an event" do
    expect {
      # ...
    }.to publish :user, :signup
  end
end
Fare provides a fake in-memory queue for testing purposes. You can put events into it and then drain it with the middleware stack you specified in the configuration file. It will not run forever, so you can easily test that events are being handled correctly.
To test your subscriber, make sure you are in stubbing mode:
Fare.test_mode!
Then to simulate an event in the queue:
# given an event:
Fare.given_event(subject: "X", action: "Y", payload: "Z")
# when the event is handled:
Fare.run
# then do the assertions on the expected outcome
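A single middleware can also be exercised on its own, without any queue, because it is just an object that responds to call(env). The sketch below is plain Ruby and does not use Fare's test helpers: it redefines a simplified FilterUnsubscribed so the example is self-contained, and the `Event` struct and the `downstream` lambda are hypothetical stand-ins for the real event object and the rest of the stack.

```ruby
# Hypothetical stand-in for the event object; only payload is needed here.
Event = Struct.new(:payload)

# Simplified copy of the FilterUnsubscribed middleware shown earlier.
class FilterUnsubscribed
  def initialize(app, options = {})
    @app = app
    @unsubscribers = options.fetch(:unsubscribers)
  end

  def call(env)
    email = env.fetch(:event).payload.fetch("email")
    # only continue the chain for users who have not unsubscribed
    @app.call(env) unless @unsubscribers.include?(email)
  end
end

# A lambda plays the role of the next middleware and records what reaches it.
delivered  = []
downstream = ->(env) { delivered << env.fetch(:event).payload.fetch("email") }

middleware = FilterUnsubscribed.new(
  downstream,
  { unsubscribers: ["opted.out@example.com"] }
)

middleware.call({ event: Event.new({ "email" => "new.user@example.com" }) })
middleware.call({ event: Event.new({ "email" => "opted.out@example.com" }) })

puts delivered.inspect # prints ["new.user@example.com"]
```

Injecting the list of unsubscribers through the options hash is what makes the middleware easy to test in isolation.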
Provided Middleware
There are a couple of middleware provided.
Logging
Logs events, just provide a logger instance.
Example:
use Fare::Middleware::Logging, logger: Logger.new($stdout)
NewRelic
Instruments the event processing. You need to require this middleware manually, because we don't want to be loading NewRelic when doing fare update.
Example:
subscriber do
  setup do
    require "fare/middleware/newrelic"
  end
  always_run do
    use Fare::Middleware::NewRelic
  end
end
Raven
Sends errors to Sentry.
Example:
use Fare::Middleware::Raven, dsn: "http://...", logger: logger, environment: "production"
Limitations
Before using Fare, make sure you know the limitations of Amazon SNS and SQS. For instance, there is a maximum of 256KB for messages in SQS, but since Fare adds some metadata, the limit for payloads is a bit less.
Events are serialized with Base64, because SQS doesn't like special characters.
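To get a feel for how much room is left for a payload, you could estimate the encoded size of an event before publishing it. The sketch below is an approximation under stated assumptions: it reads the 256KB figure above as 262,144 bytes, it assumes strict Base64 over the JSON envelope, and the helpers `encoded_size` and `fits_in_sqs?` are hypothetical, not part of Fare. Base64 turns every 3 bytes into 4 characters, so the usable payload size is noticeably below the raw limit.

```ruby
require "json"

# The 256KB limit mentioned above, read as 256 * 1024 bytes (an assumption).
SQS_LIMIT_BYTES = 256 * 1024

# Size in bytes of the event after JSON serialization and Base64 encoding.
# The "m0" pack directive is strict Base64 without line breaks.
def encoded_size(event)
  [JSON.generate(event)].pack("m0").bytesize
end

# True when the encoded event stays within the given limit.
def fits_in_sqs?(event, limit: SQS_LIMIT_BYTES)
  encoded_size(event) <= limit
end

small = { "subject" => "user", "action" => "signup", "payload" => { "email" => "new.user@example.com" } }
large = { "subject" => "user", "action" => "import", "payload" => "x" * 200_000 }

puts fits_in_sqs?(small) # prints true
puts fits_in_sqs?(large) # prints false
```

In the second case the raw JSON is only about 200,000 bytes, but the Base64 form exceeds the limit, which is why large payloads are better replaced by a reference to data stored elsewhere.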
TODO
- Let the middleware know how to revert itself
- Ask for extra handling time from SQS when handling takes too long
- Configuration for SQS Dead Letter queues.