RailsPipeline

Emit a versioned stream of changes to a pub/sub queue when ActiveRecord models are saved. This gem supports Redis, AWS (SNS/SQS), and IronMQ publishing targets. The Redis backend supports a forwarding system to a cloud MQ like IronMQ. Messages are encrypted in transit using AES symmetric encryption and a shared secret.

Motivation

Many systems evolve into a inter-related collection of Rails applications as they grow out of their initial monolithic design. This is often coincident with independent teams taking responsibility for different aspects of the business and different applications. In our case this has manifested itself as one team that is responsible for the ecommerce platform (web, mobile etc) and another team that works on data warehousing and personalization. The data team was using a read-only replica of the platform team's database for data warehousing, which tightly coupled us together when the platform team wanted to make schema changes for performance or development velocity reasons.

This project allows us to offer a versioned API that we can maintain backwards compatibility for while changing the underlying code and schema as we see fit. Consumers of data will upgrade to new schema versions as they need to and when they are able.

Usage

Install with bundler

gem 'rails-pipeline'

You should create a private repository containing your protocolbuffers schemas and also depend on that, or use git subtree to bring it into your Rails apps.

For any models that you wish to publish changes, just include the appropriate pipeline emitter

include RailsPipeline::RedisEmitter

Each queue backend has different methods of consuming messages as a subscriber, but for IronMQ there is an implementation of a webhook subscriber (details below).

The following environment variables control pipeline operations

# If set, do not emit or process incoming messages
DISABLE_RAILS_PIPELINE

# If set, do not emit pipeline messages
DISABLE_RAILS_PIPELINE_EMISSION

# If set, do not process incoming messages as a subscriber, just drop them
DISABLE_RAILS_PIPELINE_PROCESSING

The following environment variable sets the shared secret for pipeline encryption

PIPELINE_SECRET

You can pass in a logger for RailsPipeline in an initializer e.g.

RailsPipeline.logger = Rails.logger
RailsPipeline.logger.level = Log4r::DEBUG

Backends

Redis

Redis EmitterOnly as a forwarding intermediary
Redis SubscriberOnly forwarder

The implementation for Redis assumes you want to use it as a local forwarding queue to a more scalable service such as AWS or IronMQ. Thus all messages are pushed onto a single Redis queue and include the name of the target topic/queue. We have included a bouncer process that will read from the Redis queue (in parallel if need be) and forward on to IronMQ. Adding an AWS forwarder would be trivial.

It may be desirable to write a full pub/sub emitter for Redis (rather than just a forwarder.)

Redis Config

The following environment variables are checked for Redis urls (default: localhost:6379):

REDISCLOUD_URL
REDISTOGO_URL

Alternatively you could pass in an instance of the Redis client in an initializer:

RailsPipeline::RedisPublisher.redis = MyRedisFactory.get

The key name of the redis queue should be set in an initializer, e.g.

RailsPipeline::RedisPublisher.namespace = "rails-pipeline-spec"

IronMQ

IronMQ EmitterYES
Redis-to-IronMQ ForwarderYES
IronMQ SubscriberYES

Emitter

Each model/version emits ProtocolBuffer messages to a specific IronMQ queue. If that queue is set as a "push" queue. Subscribers can then add themselves as http webhook endpoints for the push queue and messages will be delivered to them.

Subscriber

There is a subscriber implementation for IronMQ as an http endpoint.

Mount the Sinatra endpoint in your app's routes.rb

match "/ironmq" => RailsPipeline::IronmqSubscriber, :anchor => false

Register your own models as recipients of different pipeline message types and versions (in an Rails initializer):

RailsPipeline::Subscriber.register(SomeModel_2_0, MyModel)

You will need to write a MyModel#from_pipeline_2_0() method. You can also register any Proc as a processor for messages.

Add your URL as a subscriber to the push queues you care about using the supplied 'pipeline' command

pipeline ironmq-subscribe-endpoint http://my.domain.com/ironmq some_models

You may find ngrok helpful for developing and debugging.

Iron.io Config

The 'iron_mq' gem picks up the following environment variables

IRON_PROJECT_ID
IRON_TOKEN

AWS (Simple Notification Service)

SNS EmitterYES
Redis-to-SNS ForwarderNO, but easy to add.
SQS Polling SubscriberNO
SNS Webhook SubscriberNO

We include a proof-of-concept AWS emitter, written with the idea in mind to use SQS as pub/sub queues and polling subscribers. It would also be possible to publish to SNS and have multiple subscribers receive http webhook messages as in IronMQ.

There are some commands in the 'pipeline' script to configure SNS/SQS:

Create SNS topics to publish to:

pipeline sns-create-topic TABLE_NAME --env ENV --version VERSION

Create and SQS queue and subscribe it to a TOPIC (one per subscribing rails app)

pipeline sqs-subscribe-app APP TABLE_NAME[,TABLE_NAME_2,...] --env ENV --version VERSION

AWS Config

The AWS gem picks up the following environment variables

AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY

In addition, we use the numerical 'owner id' for your account which should be set as

AWS_ACCOUNT_ID

Protocol Buffers

To build the test protocol buffers ruby files in rails-pipeline:

brew install protobuf
make

We have created a private repository gem for our protocol buffers definitions. This is laid out like

harrys-pipeline/lib/harrys/pipeline/my_model_1_1.proto

Proto files look like this


    package Harrys.Pipeline;

    message Order__1__0 {
      required int32 id = 1;
      required double created_at = 2;
      required double updated_at = 3;
      ...
    }

We then have a Makefile almost identical to the one in this gem to build our .pb.rb files:


GENDIR=./lib/harrys/pipeline
RUBY_PROTOC=bundle exec ruby-protoc
PROTOS=$(wildcard $(GENDIR)/*.proto)
PBS=$(PROTOS:%.proto=%.pb.rb)

all: $(PBS)

%.pb.rb: %.proto
        $(RUBY_PROTOC) $<

clean:
        rm -f $(PBS)

TravisCI