FirehoseIntegration

This gem will send data to Firehose every time your models are updated. The general idea is to use a separate stream for each model you want to serialize. This essentially simplifies data syncing with Amazon Redshift via Firehose.

Basic Usage

Add the following to your Gemfile and bundle install

gem 'firehose-integration'

Create and configure config/initializers/aws.rb

require 'aws-sdk'

Aws.config.update({
  region: 'us-east-1',
  credentials: Aws::Credentials.new('akid', 'secret')
})

For each model you want to integrate create a kinesis serializer

# apps/kinesis_serializers/dummy_model_kinesis_serializer.rb
module DummyModelKinesisSerializer
  extend ActiveSupport::Concern

  included do
    def to_kinesis
      prepare_for_redshift [
        id,
        updated_at,
        created_at
      ]
    end

    def self.kinesis_stream_name
      'dummy-stream'
    end
  end
end

Add the following line to the model you want to integrate.

# app/models/dummy_model.rb
class DummyModel < ActiveRecord::Base
  firehose_integratable
end

You will need to make sure you have the streams you’re using created in Amazon Firehose. For the DummyModel example here we’d setup a ‘dummy-stream’ in Firehose.

Advanced Usage

There are cases where simple 1-to-1 serialization from your existing tables won’t work with Redshift. One such case is if you use Postgres array datatypes. Redshift doesn’t support that datatype so you will need to normalize the data again when putting it in Redshift. To handle these edge cases you create a ‘kinesis_extra_serialization’ method which will be executed when your model is updated if present.

# apps/kinesis_serializers/dummy_model_kinesis_serializer.rb
module DummyModelKinesisSerializer
  extend ActiveSupport::Concern

  included do
    def to_kinesis
      prepare_for_redshift [
        id,
        updated_at,
        created_at
      ]
    end

    def self.kinesis_stream_name
      'dummy-stream'
    end

    def kinesis_extra_serialization
      self.book_ids.each do |book_id|
        data = [id, book_id, created_at, updated_at].join("|")
        FirehoseIntegration::KinesisJob.perform_later("dummy-books", data)
      end
    end
  end
end

Another common case you may encounter is the need to denormalize data into other tables. Lets assume you have a dummy_models table that you want but you also have a denormalized_dummy_models table you need to keep in sync when your model is updated.

# apps/kinesis_serializers/dummy_model_kinesis_serializer.rb
module DummyModelKinesisSerializer
  extend ActiveSupport::Concern
  include DenormalizedDummyModelKinesisSerializer

  included do
    def to_kinesis
      prepare_for_redshift [
        id,
        updated_at,
        created_at
      ]
    end

    def self.kinesis_stream_name
      'dummy-stream'
    end

    def kinesis_extra_serialization
      serialize_denormalized_dummy_model(id)
    end
  end
end

# apps/kinesis_serializers/denormalized_dummy_model_kinesis_serializer.rb
module DenormalizedDummyModelKinesisSerializer
  extend ActiveSupport::Concern

  included do
    def denormalized_dummy_model_to_kinesis(dummy_id)
      prepare_for_redshift [ id,
        created_at,
        updated_at
      ]
    end

    def self.denormalized_kinesis_stream_name
      "denormalized-dummy-model"
    end

    def serialize_denormalized_dummy_model(dummy_id)
      FirehoseIntegration::KinesisJob.perform_later(self.class.denormalized_kinesis_stream_name, self.denormalized_dummy_model_to_kinesis(dummy_id))
    end
  end
end

If you’d like to turn the syncing off for some reason like when running on a staging server, just set the following ENV variable to true:

SKIP_KINESIS_EVENTS=true

Versions

0.0.4 - Forcing UTF-8 encoding for strings 0.0.2 - Adding ability to turn off job queueing with ENV variable 0.0.1 - Initial release

License

This project rocks and uses MIT-LICENSE.