Tobox: Transactional outbox pattern implementation in Ruby

Simple, data-first events processing framework based on the transactional outbox pattern.

Requirements

tobox requires an RDBMS which supports SKIP LOCKED. As of today, that's:

  • PostgreSQL 9.5+
  • MySQL 8+
  • Oracle
  • Microsoft SQL Server
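
To illustrate why SKIP LOCKED matters for an outbox, here is a minimal in-memory analogy in plain Ruby (no database involved). The Row struct and the fetch_skip_locked helper are hypothetical names used for illustration only, and a mutex stands in for a row lock. A worker takes the first row it can lock without waiting, which is roughly what SELECT ... FOR UPDATE SKIP LOCKED offers to concurrent workers.

```ruby
# In-memory analogy of SKIP LOCKED; this is not how tobox is implemented.
Row = Struct.new(:id, :lock)

# Returns the first row whose lock can be taken without waiting,
# skipping rows which are already locked (nil if all rows are locked).
def fetch_skip_locked(rows)
  rows.find { |row| row.lock.try_lock }
end

rows = [1, 2, 3].map { |id| Row.new(id, Mutex.new) }

# Worker A grabs the first available row...
row_a = fetch_skip_locked(rows)
# ...and worker B, instead of blocking on that row, gets the next one.
row_b = fetch_skip_locked(rows)

puts "worker A took row #{row_a.id}, worker B took row #{row_b.id}"
```

Without the "skip" behaviour, worker B would have to wait for worker A to release the first row, which serializes the workers.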

Installation

Add this line to your application's Gemfile:

gem "tobox"

# You'll also need to add the right database client gem for the target RDBMS
# ex, for postgresql:
#
# gem "pg"
# see more: http://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html

And then execute:

$ bundle install

Or install it yourself as:

$ gem install tobox

Usage

  1. Create the outbox table in your application's database:
# example migration using sequel
Sequel.migration do
  up do
    create_table(:outbox) do
      primary_key :id
      column :type, :varchar, null: false
      column :data_before, :json, null: true
      column :data_after, :json, null: true
      column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
      column :attempts, :integer, null: false, default: 0
      column :run_at, "timestamp without time zone", null: true
      column :last_error, :text, null: true
      column :metadata, :json, null: true

      index Sequel.desc(:run_at)
    end
  end

  down do
    drop_table(:outbox)
  end
end
  2. Create a tobox.rb config file in your project directory tree:
# tobox
database Sequel.connect("postgres://user:pass@dbhost/database")
# table :outbox
# concurrency 8
on("user_created") do |event|
  puts "created user #{event[:after]["id"]}"
  DataLakeService.user_created(event)
  BillingService.(event)
end
on("user_updated") do |event|
  # ...
end
on("user_created", "user_updated") do |event|
  # ...
end
  3. Start the tobox process:
> bundle exec tobox -C path/to/tobox.rb -r path/to/file_requiring_application_code.rb

There is no API for event production yet (still TODO). It's recommended you write directly into the "outbox" table via database triggers (i.e. "insert into users table" -> "add user_created event"). Alternatively you can use sequel directly (DB[:outbox].insert(...)).

  4. Emit outbox events

Currently, tobox only deals with outbox events consumption. When it comes to producing them, you have to do it yourself. There are essentially two alternatives:

4.1 Emit from application code

If you're using sequel as your ORM, you can use the dataset API:

# Assuming DB points to your `Sequel::Database`, and defaults are used:
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", data_after: order.to_hash) # :type is the column created in the migration above
end

4.2 Emit from database trigger

This is how it could be done in PostgreSQL using trigger functions:

CREATE OR REPLACE FUNCTION order_created_outbox_event()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
    INSERT INTO outbox(type, data_after)
         VALUES('order_created', row_to_json(NEW.*));
    RETURN NEW;
END;
$$;

CREATE TRIGGER order_created_outbox_event
  AFTER INSERT
  ON orders
  FOR EACH ROW
  EXECUTE PROCEDURE order_created_outbox_event();

Configuration

As mentioned above, configuration can be set in a particular file. The following options are configurable:

environment

Sets the application environment (either "development" or "production"). Can be set directly, or via APP_ENV environment variable (defaults to "development").

database_uri

Accepts a URI pointing to a database, where scheme identifies the database adapter to be used:

database_uri "postgres://user:password@localhost/blog"

database_options

Accepts a hash of options, which are directly passed to Sequel.connect:

database_options after_connect: -> (conn) { puts conn }

table

The name of the database table where outbox events are stored (:outbox by default).

table :outbox

max_attempts

Maximum number of times a failed attempt to process an event will be retried (10 by default).

max_attempts 4

Note: the delay before each new attempt grows exponentially with the number of past attempts for that event (see exponential_retry_factor).

exponential_retry_factor

Factor by which the number of seconds until an event can be retried will be exponentially calculated, i.e. 2 seconds on first attempt, then 4, then 8, then 16 (2 by default).

exponential_retry_factor 2

Note: the new attempt will be scheduled exponential_retry_factor ** n seconds later, where n is the number of past attempts for that event.
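
As a worked example of the sequence described above (2, 4, 8, 16 seconds), here is a small sketch. The retry_delay helper is hypothetical, and it assumes the delay is the factor raised to the number of past attempts; it is only meant to show how the factor shapes the schedule.

```ruby
# Hypothetical helper, assuming the delay (in seconds) is factor ** attempts.
def retry_delay(attempts, factor: 2)
  factor**attempts
end

delays = (1..4).map { |attempts| retry_delay(attempts) }
puts delays.inspect # → [2, 4, 8, 16]

# A bigger factor spreads the retries further apart.
puts (1..4).map { |attempts| retry_delay(attempts, factor: 3) }.inspect # → [3, 9, 27, 81]
```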

batch_size

Number of events fetched in each outbox loop.

batch_size 10

Note: event handlers will receive all events covered by the given callback in its arguments:

on(:event1, :event2) do |*events| # at most 10 events, may contain events of type 1 and 2
  # ...
end

concurrency

Number of workers processing events.

concurrency 4

Note: the default concurrency differs for each worker pool type, so make sure you understand how this tweak may affect you.

max_connections

Number of database connections the workers will share to do their work.

In the (default) threaded worker mode, it'll default to the number of workers (set by the concurrency configuration).

# 10 workers sharing 4 database connections
max_connections 4
concurrency 10

This can be useful when the database presents overhead in managing write intensive workload (such as the one an outbox generates), and you want to be able to scale without putting more work on the database.
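
The idea of many workers sharing fewer connections can be sketched in plain Ruby with a queue acting as the pool. The run_workers helper and the "conn-N" strings are hypothetical stand-ins (no real database connections are opened), and this is not how tobox or sequel manage their pool internally.

```ruby
# Hypothetical helper: runs `concurrency` workers which share `max_connections`
# fake connections, and returns the peak number of connections in use at once.
def run_workers(concurrency:, max_connections:)
  pool = Queue.new
  max_connections.times { |i| pool << "conn-#{i}" }

  in_use = 0
  peak = 0
  stats = Mutex.new

  workers = Array.new(concurrency) do
    Thread.new do
      conn = pool.pop # blocks until a connection is free
      begin
        stats.synchronize do
          in_use += 1
          peak = [peak, in_use].max
        end
        sleep 0.01 # pretend to run a query on conn
      ensure
        stats.synchronize { in_use -= 1 }
        pool << conn # hand the connection back to the pool
      end
    end
  end

  workers.each(&:join)
  peak
end

peak = run_workers(concurrency: 10, max_connections: 4)
puts "peak connections in use: #{peak}"
```

However many workers are started, the peak never exceeds the number of connections in the pool; the extra workers simply wait their turn.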

worker

Type of the worker used to process events. Can be :thread (default), :fiber, or a class implementing the Tobox::Pool protocol (TBD: define what this protocol is).

wait_for_events_delay

Time (in seconds) to wait before checking again for events in the outbox.

shutdown_timeout

Time (in seconds) to wait for events to finish processing, before hard-killing the process.

grace_shutdown_timeout

Grace period (in seconds) to wait after hard-killing the work in progress, and before exiting the process.

on(event_type) { |event| }

Callback executed when processing an event of the given type. By default, it'll yield the event, which carries the state of the data before and after (unless message_to_arguments is set).

on("order_created") { |event| puts "order created: #{event[:after]}" }
on("order_updated") { |event| puts "order updated: was #{event[:before]}, now is #{event[:after]}" }
# ...

on_before_event { |event| }

Callback executed right before processing an event.

on_before_event { |event| start_trace(event[:id]) }

on_after_event { |event| }

Callback executed right after processing an event.

on_after_event { |event| finish_trace(event[:id]) }

on_error_event { |event, error| }

Callback executed when an exception was raised while processing an event.

on_error_event { |event, exception| Sentry.capture_exception(exception) }

on_error_worker { |error| }

Callback executed when an exception was raised in the worker, before processing events.

on_error_worker { |exception| Sentry.capture_exception(exception) }

on_database_connect { |db| }

Callback executed right after initializing the sequel database object. This can be used, for example, to load database-level extensions and plugins, and set parameters (such as connection pool tweaks). This callback will also be used by plugins which instantiate their own separate database objects (such as in the case of the stats plugin).

This callback won't be executed if the database object is created outside of tobox configuration parameters.

on_database_connect do |db|
  db.extension(:connection_validator)
  db.pool.connection_validation_timeout = -1
end

message_to_arguments { |event| }

If exposing raw data to the on handlers is not what you want, you can always override the behaviour by providing an alternative "before/after fetcher" implementation.

# if you'd like to yield the ORM object only
message_to_arguments do |event|
  case event[:type]
  when "order_created", "order_updated"
    Order.get(event[:after]["id"])
  when "payment_created", "payment_processed", "payment_reconciled"
    Payment.get(event[:after]["id"])
  else
    super(event)
  end
end
on("order_created") { |order| puts "order created: #{order}" }
# ...
on("payment_created") { |payment| puts "payment created: #{payment}" }
# ...

logger

Overrides the internal logger (an instance of Logger).

log_level

Overrides the default log level ("info" when in "production" environment, "debug" otherwise).
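
The defaults described above can be sketched with the standard library alone. The default_log_level helper is hypothetical and only mirrors the stated rule ("info" in production, "debug" otherwise), with the environment read from APP_ENV as described for the environment option.

```ruby
require "logger"

# Hypothetical helper mirroring the documented defaults.
def default_log_level(environment)
  environment == "production" ? Logger::INFO : Logger::DEBUG
end

environment = ENV.fetch("APP_ENV", "development")

logger = Logger.new($stdout)
logger.level = default_log_level(environment)
logger.debug("only shown outside of production")
logger.info("shown in every environment")
```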

Event

The event is composed of the following properties:

  • :id: unique event identifier
  • :type: label identifying the event (e.g. "order_created")
  • :before: hash of the associated event data before event is emitted (can be nil)
  • :after: hash of the associated event data after event is emitted (can be nil)
  • :created_at: timestamp of when the event is emitted

(NOTE: The event is also composed of other properties which are only relevant for tobox.)
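
To make the event shape concrete, here is a minimal sketch of an event hash and of type-based dispatch. The HANDLERS registry and the dispatch helper are hypothetical and exist only for illustration; the on method below is a toy stand-in for the configuration DSL, not tobox's implementation.

```ruby
# Hypothetical registry, for illustration only; not tobox's internals.
HANDLERS = Hash.new { |hash, type| hash[type] = [] }

# Registers the block for each of the given event types.
def on(*types, &handler)
  types.each { |type| HANDLERS[type.to_s] << handler }
end

# Calls every handler registered for the event's type and collects the results.
def dispatch(event)
  HANDLERS[event[:type]].map { |handler| handler.call(event) }
end

on("order_created") { |event| "order #{event[:after]["id"]} created" }

event = {
  id: 1,
  type: "order_created",
  before: nil, # nothing existed before the order was created
  after: { "id" => 42, "currency" => "EUR" },
  created_at: Time.now
}

puts dispatch(event).inspect
```

Note that :before and :after are plain hashes with string keys in this sketch, matching the event[:after]["id"] access used elsewhere in this document.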

Plugins

tobox ships with a very simple plugin system. (TODO: add docs).

Plugins can be loaded in the config via plugin:

# tobox.rb
plugin(:plugin_name)

Progress

By default, the database transaction used to consume the event is kept open while the event is handled. While this ensures atomic event consumption, it may also cause overhead related to transaction management given enough load, particularly in cases where event handling time varies (e.g. throttled HTTP requests).

The :progress plugin fixes this by releasing the database transaction after fetching the event. It does so by making the fetched event "invisible" for a certain period, during which the event must be successfully handled.

Here's how to use it:

# in your tobox.rb
plugin :progress

visibility_timeout 90 # default: 30

Configuration

visibility_timeout

Timeout (in seconds) after which a previously marked-for-consumption event can be retried (default: 30)
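
The visibility mechanism can be pictured with a small in-memory sketch. The visible_at field and the fetch_event helper are hypothetical (they are not tobox columns or APIs); the point is only that a fetched event is hidden for the timeout period and becomes eligible again if it was not handled in time.

```ruby
# In-memory sketch; visible_at is a hypothetical field, not a tobox column.
VISIBILITY_TIMEOUT = 30 # seconds

# Picks the first event visible at `now` and hides it for the timeout period.
def fetch_event(events, now)
  event = events.find { |candidate| candidate[:visible_at] <= now }
  event[:visible_at] = now + VISIBILITY_TIMEOUT if event
  event
end

now = Time.now
events = [{ id: 1, visible_at: now }]

first = fetch_event(events, now)        # event 1 is fetched and hidden
hidden = fetch_event(events, now + 10)  # nil: still within the timeout
retried = fetch_event(events, now + 31) # never marked as handled, so visible again

puts [first[:id], hidden, retried[:id]].inspect
```

A handler which needs longer than the timeout risks having its event picked up a second time, which is why the timeout should comfortably exceed the expected handling time.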

Event grouping

By default, events are taken from the "outbox" table and processed concurrently by workers. This means that worker A may take one event and worker B the following one, yet worker B may finish processing before worker A does. This may be an issue if the consumer expects events from a certain context to arrive in a certain order.

One solution is to have a single worker processing the "outbox" events. Another is to use the :event_grouping plugin.

All you have to do is:

  1. Add a "group id" column to the "outbox" table
create_table(:outbox) do
  primary_key :id
  column :group_id, :integer # The type is irrelevant, could also be :string, :uuid...
  # ..
  index :group_id
end
  2. Enable the plugin
# in your tobox.rb
plugin :event_grouping

group_column :group_id # by default already `:group_id`
  3. Insert related outbox events with the same group id
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", group_id: order.id, data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", group_id: order.id, data_after: order.to_hash)
end

# "order_created" will be processed first
# "billing_event_started" will only start processing once "order_created" finishes

Configuration

group_column

Defines the database column to be used for event grouping (:group_id by default).
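
The ordering guarantee can be sketched in memory as follows. The :status field and the next_event helper are hypothetical and only model the rule described above: an event may be picked up only when no older event from the same group is still unfinished.

```ruby
# In-memory sketch; the :status field and this helper are hypothetical.
# An event can be picked only if no older event of its group is unfinished.
def next_event(events)
  events.find do |event|
    next false unless event[:status] == :pending

    events.none? do |other|
      other[:group_id] == event[:group_id] &&
        other[:id] < event[:id] &&
        other[:status] != :done
    end
  end
end

events = [
  { id: 1, type: "order_created", group_id: 10, status: :pending },
  { id: 2, type: "billing_event_started", group_id: 10, status: :pending },
  { id: 3, type: "order_created", group_id: 11, status: :pending }
]

first = next_event(events)   # event 1
first[:status] = :processing
second = next_event(events)  # event 3, because event 2 must wait for event 1
second[:status] = :processing
first[:status] = :done
third = next_event(events)   # event 2 can run now

puts [first[:id], second[:id], third[:id]].inspect
```

Events from different groups still run concurrently; only events sharing a group id are serialized.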

Inbox

Via the :inbox plugin, tobox also supports the inbox pattern, to ensure "exactly-once" processing of events. This is achieved by "tagging" events with a unique identifier, and registering them in the inbox before processing (and if they're already there, ignoring them altogether).

In order to do so, you'll have to:

  1. Add an "inbox" table in the database and the unique id reference in the outbox table:
create_table(:inbox) do
  column :inbox_id, :varchar, null: false, primary_key: true # it can also be a uuid, you decide
  column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
end

create_table(:outbox) do
  primary_key :id
  column :type, :varchar, null: false
  column :inbox_id, :varchar, null: true
  # ...
  foreign_key [:inbox_id], :inbox # constraint on the column defined above
end
  2. Load the plugin and reference them in the configuration
# tobox.rb
plugin :inbox

inbox_table :inbox # :inbox by default already
inbox_column :inbox_id # :inbox_id by default already
  3. Insert related outbox events with an inbox id
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", inbox_id: "ord_crt_#{order.id}", data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", inbox_id: "bil_evt_std_#{order.id}", data_after: order.to_hash)
end

# assuming this bit above runs two times in two separate workers, each will be processed by tobox only once.

Configuration

inbox_table

Defines the name of the table to be used for inbox (:inbox by default).

inbox_column

Defines the column in the outbox table which references the inbox table (:inbox_id by default).

NOTE: make sure you keep cleaning the inbox periodically from older messages, once there's no more danger of receiving them again.
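
The deduplication and the periodic cleanup can be sketched in memory as follows. The Inbox class is hypothetical; tobox keeps this state in the inbox table, and this sketch only models the "register before processing, ignore if already there" rule together with age-based cleanup.

```ruby
# In-memory sketch; tobox keeps this state in the inbox table instead.
class Inbox
  def initialize
    @seen = {} # inbox id => time it was first registered
  end

  # Registers the id; returns true the first time it is seen, false afterwards.
  def register(inbox_id, now: Time.now)
    return false if @seen.key?(inbox_id)

    @seen[inbox_id] = now
    true
  end

  # Drops entries older than max_age seconds; returns how many were removed.
  def cleanup(max_age, now: Time.now)
    before = @seen.size
    @seen.delete_if { |_id, registered_at| now - registered_at > max_age }
    before - @seen.size
  end
end

inbox = Inbox.new
handled = []

# The same event arrives twice, but is only handled once.
2.times do
  handled << "ord_crt_1" if inbox.register("ord_crt_1")
end

puts handled.inspect # → ["ord_crt_1"]
```

Once an entry has been cleaned up, the same id would be accepted again, so the retention period should be longer than the window in which duplicates can still arrive.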

tobox also ships with the following integrations.

Zeitwerk

(requires the zeitwerk gem.)

Plugin for the zeitwerk auto-loader. It lets you set the autoload dirs, and seamlessly integrates code reloading in "development", and eager loading in "production":

# tobox.rb
plugin(:zeitwerk)
zeitwerk_loader do |loader|
  loader.push_dir("path/to/handlers")
end

Sentry

(requires the sentry-ruby gem.)

Plugin for the sentry ruby SDK for error tracking. It'll send all errors happening while processing events to Sentry.

# tobox.rb
plugin(:sentry)

on_sentry_init do |sentry_cfg|
  sentry_cfg.dsn = ENV["SENTRY_DSN"]
end

Datadog

(requires the ddtrace gem.)

Plugin for datadog ruby SDK. It'll generate traces for event handling.

# you can init the datadog config in another file to load:
Datadog.configure do |c|
  c.tracing.instrument :tobox
end

# tobox.rb
plugin(:datadog)

Stats

The stats plugin periodically collects statistics related to the outbox table, and exposes them to app code (which can then relay them to a statsD collector, or similar tool).

plugin(:stats)
on_stats(5) do |stats_collector| # every 5 seconds
  stats = stats_collector.collect
  #
  # stats => {
  #   pending_count: number of new events in the outbox table
  #   failing_count: number of events which have failed processing but haven't reached the threshold
  #   failed_count: number of events which have failed the max number of tries
  #   inbox_count: (if used) number of events marked as received in the inbox table
  #   oldest_event_age_in_seconds: age (in seconds) of the oldest event in the outbox table
  # }
  #
  # now you can send them to your statsd collector
  #
  StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  StatsD.gauge('outbox_oldest_message_age', stats[:oldest_event_age_in_seconds])
end

Bring your own leader election

The stats collection runs in every tobox process that is started. If you're launching it in multiple servers / containers / pods, this means you'll be collecting statistics about the same database on all of these instances. This may not be desirable, and you may want to do this collection in a single instance. This is not a problem that tobox can solve by itself, so you'll have to take care of that yourself. Still, here are some cheap recommendations.

Postgres advisory locks

If your database is PostgreSQL, you can leverage session-level advisory locks to ensure single-instance access to this functionality. tobox also exposes the database instance to the on_stats callback:

c.on_stats(5) do |stats_collector, db|
  if db.get(Sequel.function(:pg_try_advisory_lock, 1))
    stats = stats_collector.collect
    StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  end
end

If a server goes down, one of the remaining ones will acquire the lock and ensure stats processing.

Redis distributed locks

If you're already using redis, you can use its distributed lock feature to achieve the goal:

# using redlock
lock_info = nil # kept across runs, so that the current holder can extend its lock

c.on_stats(5) do |stats_collector, db|
  # lock returns false when some other server already has the lock, try later
  lock_info = lock_manager.lock("outbox", 5000, extend: lock_info)
  next unless lock_info

  stats = stats_collector.collect
  StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
end

Advanced

Batch Events Handling

You may start hitting a scale where the workload generated by tobox puts the primary database under strain. Particularly with PostgreSQL, which isn't optimized for writes, this manifests in CPU usage spiking due to index bypasses, or locks on accessing shared buffers.

A way to alleviate this is by handling events in batches. By handling N events at a time, the database can drain events more efficiently, while you can either still handle them one by one, or batch them, if possible. For instance, the AWS SDK contains batching alternatives of several APIs, including the SNS publish API.

You can do so by setting a batch size in your configuration, and spread the arguments in the event handler:

# tobox.rb

batch_size 10 # fetching 10 events at a time

on("user_created", "user_updated") do |*events| # 10 events at most
  if events.size == 1
    DataLakeService.user_created(events.first)
  else
    DataLakeService.batch_users_created(events)
  end
end

In case you're using a batch API solution which may fail for a subset of events, you can communicate which events from the batch failed by using the Tobox.raise_batch_errors API:

on("user_created", "user_updated") do |*events| # 10 events at most
  if events.size == 1
    DataLakeService.user_created(events.first)
  else
    success, failed_events_with_errors = DataLakeService.batch_users_created(events)

    # handle success first

    batch_errors = failed_events_with_errors.to_h do |event, exception|
      [
        events.index(event),
        exception
      ]
    end

    # events identified by the batch index will be retried.
    Tobox.raise_batch_errors(batch_errors)
  end
end
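
The index-to-error mapping built above can be tried in isolation. In this sketch the events, the failure list and the batch_errors_for helper are all hypothetical sample data and names; only the shape of the resulting hash (batch position => exception) follows the example above.

```ruby
# Maps the position of each failed event in the batch to its error.
def batch_errors_for(events, failed_events_with_errors)
  failed_events_with_errors.to_h do |event, exception|
    [events.index(event), exception]
  end
end

events = [
  { id: 1, type: "user_created" },
  { id: 2, type: "user_updated" },
  { id: 3, type: "user_created" }
]

# Hypothetical result of a batch API call: only the second event failed.
failed_events_with_errors = [[events[1], RuntimeError.new("rate limited")]]

batch_errors = batch_errors_for(events, failed_events_with_errors)
puts batch_errors.keys.inspect # → [1]
```

A hash shaped like this is what the example above passes to Tobox.raise_batch_errors, so that only the failed positions of the batch are retried.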

Supported Rubies

All Rubies greater than or equal to 2.7, and always the latest JRuby and TruffleRuby.

Rails support

Rails is supported out of the box by adding the sequel-activerecord_connection gem into your Gemfile, and requiring the rails application in the tobox cli call:

> bundle exec tobox -C path/to/tobox.rb -r path/to/rails_app/config/environment.rb

In the tobox config, you can set the environment:

environment Rails.env

Why?

Simple and lightweight, framework (and programming language) agnostic

tobox event callbacks yield the data in ruby primitive types, rather than heavy ORM instances. This is by design, as callbacks may not rely on application code being loaded.

This allows tobox to process events dispatched from an application written in another programming language, as an example.

No second storage system

While tobox does not advertise itself as a background job framework, it can be used as such.

Most tiered applications already have an RDBMS. Popular background job solutions, like "sidekiq" and "shoryuken", usually require integrating with a separate message broker (Redis, SQS, RabbitMQ...). This increases the overhead in deployment and operations, as these brokers need to be provisioned, monitored, scaled separately, and billed differently.

tobox only requires the database you usually need to account for anyway, allowing you to delay buying into more complicated setups until you have to, and have the budget for it.

However, it can work well in tandem with such solutions:

# process event by scheduling an active job
on("order_created") { |event| SendOrderMailJob.perform_later(event[:after]["id"]) }

Atomic processing via database transactions

When scheduling work, one needs to ensure that data is committed to the database before scheduling. This is a very frequent bug when using non-RDBMS background job frameworks, such as Sidekiq, which has a FAQ entry for this.

But even if you do that, the system can go down after the data is committed in the database and before the job is enqueued to the broker. Failing to address this behaviour makes the job delivery guarantee "at most once". This may or may not be a problem depending on what your job does (if it bills a customer, it probably is).

By using the database as the message broker, tobox can rely on good old transactions to ensure that data committed to the database has a corresponding event. This makes the delivery guarantee "exactly once".

(The actual processing may change this to "at least once", as issues may happen before the event is successfully deleted from the outbox. Still, "at least once" is acceptable and solvable using idempotency mechanisms).

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

Contributing

Bug reports and pull requests are welcome on GitLab at https://gitlab.com/os85/tobox.