Journaled

A Rails engine to durably deliver schematized events to Amazon Kinesis via DelayedJob.

More specifically, journaled is composed of three opinionated pieces: schema definition/validation via JSON Schema, transactional enqueueing via Delayed::Job (specifically delayed_job_active_record), and event transmission via Amazon Kinesis. Our current use-cases include transmitting audit events for durable storage in S3 and/or analytical querying in Amazon Redshift.

Journaled provides an at-least-once event delivery guarantee assuming Delayed::Job is configured not to delete jobs on failure.

Note: Do not use the journaled gem to build an event sourcing solution as it does not guarantee total ordering of events. It's possible we'll add scoped ordering capability at a future date (and would gladly entertain pull requests), but it is presently only designed to provide a durable, eventually consistent record that discrete events happened.

Installation

  1. Install delayed_job_active_record if you haven't already.

  2. To integrate Journaled into your application, simply include the gem in your app's Gemfile.

    gem 'journaled', https_github: 'Betterment/journaled'
    
  3. You will also need to define the following environment variables to allow Journaled to publish events to your AWS Kinesis event stream:

  • JOURNALED_STREAM_NAME

Special case: if your Journaled::Event objects override the #journaled_app_name method to a non-nil value e.g. my_app, you will instead need to provide a corresponding [upcased_app_name]_JOURNALED_STREAM_NAME variable for each distinct value, e.g. MY_APP_JOURNALED_STREAM_NAME. You can provide a default value for all Journaled::Events in an initializer like this:

Journaled.default_app_name = 'my_app'

You may optionally define the following ENV vars to specify AWS credentials outside of the locations that the AWS SDK normally looks:

  • RUBY_AWS_ACCESS_KEY_ID
  • RUBY_AWS_SECRET_ACCESS_KEY

You may also specify the region to target your AWS stream by setting AWS_DEFAULT_REGION. If you don't specify, Journaled will default to us-east-1.

Journaled::Event provides a commit_hash method which you may journal if you like. If you choose to use it, you must provide a GIT_COMMIT environment variable.

Usage

Change Journaling

Out of the box, Journaled provides an event type and ActiveRecord mix-in for durably journaling changes to your model, implemented via ActiveRecord hooks. Use it like so:

class User < ApplicationRecord
  include Journaled::Changes

  journal_changes_to :email, :first_name, :last_name, as: :identity_change
end

Add the following to your controller base class for attribution:

class ApplicationController < ActionController::Base
  include Journaled::Actor

  self.journaled_actor = :current_user # Or your authenticated entity
end

Your authenticated entity must respond to #to_global_id, which ActiveRecords do by default.

Every time any of the specified attributes is modified, or a User record is created or destroyed, an event will be sent to Kinesis with the following attributes:

  • id - a random event-specific UUID
  • event_type - the constant value journaled_change
  • created_at- when the event was created
  • table_name - the table name backing the ActiveRecord (e.g. users)
  • record_id - the primary key of the record, as a string (e.g. "300")
  • database_operation - one of create, update, delete
  • logical_operation - whatever logical operation you specified in your journal_changes_to declaration (e.g. identity_change)
  • changes - a serialized JSON object representing the latest values of any new or changed attributes from the specified set (e.g. {"email":"[email protected]"})
  • actor - a string (usually a rails global_id) representing who performed the action.

Custom Journaling

For every custom implementation of journaling in your application, define the JSON schema for the attributes in your event. This schema file should live in your Rails application at the top level and should be named in snake case to match the class being journaled. E.g.: your_app/journaled_schemas/my_class.json)

In each class you intend to use Journaled, include the Journaled::Event module and define the attributes you want captured. After completing the above steps, you can call the journal! method in the model code and the declared attributes will be published to the Kinesis stream. Be sure to call journal! within the same transaction as any database side effects of your business logic operation to ensure that the event will eventually be delivered if-and-only-if your transaction commits.

Example:

// journaled_schemas/contract_acceptance_event.json

{
  "type": "object",
  "title": "contract_acceptance_event",
  "required": [
    "user_id",
    "signature"
  ],
  "properties": {
    "user_id": {
      "type": "integer"
    },
    "signature": {
      "type": "string"
    }
  }
}
# app/models/contract_acceptance_event.rb

ContractAcceptanceEvent = Struct.new(:user_id, :signature) do
  include Journaled::Event

  journal_attributes :user_id, :signature
end
# app/models/contract_acceptance.rb

class ContractAcceptance
  include ActiveModel::Model

  attr_accessor :user_id, :signature

  def user
    @user ||= User.find(user_id)
  end

  def contract_acceptance_event
    @contract_acceptance_event ||= ContractAcceptanceEvent.new(user_id, signature)
  end

  def save!
    User.transaction do
      user.update!(contract_accepted: true)
      contract_acceptance_event.journal!
    end
  end
end

An event like the following will be journaled to kinesis:

{
  "id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47", // A random event ID for idempotency filtering
  "event_type": "contract_acceptance_event",
  "created_at": "2019-01-28T11:06:54.928-05:00",
  "user_id": 123,
  "signature": "Sarah T. User"
}

Future improvements & issue tracking

Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an issue for a desired feature, as well as for any observed bugs.