WaterDrop


A gem for sending events to Kafka, either directly or through aspects.

Installation

gem install waterdrop

or add this to your Gemfile:

gem 'waterdrop'

and run

bundle install

Setup

WaterDrop has the following configuration options:

| Option                  | Value type | Description                     |
|-------------------------|------------|---------------------------------|
| send_events             | Boolean    | Should we send events to Kafka  |
| kafka_host              | String     | Kafka server host               |
| kafka_ports             | Array      | Kafka server ports              |
| connection_pool_size    | Integer    | Kafka connection pool size      |
| connection_pool_timeout | Integer    | Kafka connection pool timeout   |

To apply this configuration, you need to use a setup method:

WaterDrop.setup do |config|
  config.send_events = true
  config.connection_pool_size = 20
  config.connection_pool_timeout = 1
  config.kafka_ports = %w( 9092 )
  config.kafka_host = 'localhost'
end
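The block-style setup can be pictured with a small plain-Ruby sketch. The `MiniDrop` module, its defaults, and the `broker_addresses` helper are hypothetical names made up for illustration; they are not part of WaterDrop, and combining the host and ports into `host:port` addresses is an assumption about how the two options relate.

```ruby
# Hypothetical stand-in for the gem; it only illustrates the block-style setup pattern.
module MiniDrop
  # Option names mirror the options listed above; the defaults are made up for this sketch.
  Config = Struct.new(
    :send_events, :kafka_host, :kafka_ports,
    :connection_pool_size, :connection_pool_timeout
  )

  # Lazily builds one shared configuration object.
  def self.config
    @config ||= Config.new(false, 'localhost', %w[9092], 1, 1)
  end

  # Yields the shared config so callers can assign options inside a block.
  def self.setup
    yield(config)
    config
  end

  # Assumption: host and ports are combined into "host:port" broker addresses.
  def self.broker_addresses
    config.kafka_ports.map { |port| "#{config.kafka_host}:#{port}" }
  end
end

MiniDrop.setup do |config|
  config.send_events = true
  config.kafka_host = 'localhost'
  config.kafka_ports = %w[9092 9093]
end

puts MiniDrop.broker_addresses.inspect
```

Because the block receives the same object that `config` returns, any option left unassigned keeps its previous value.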

This configuration can be placed in config/initializers and can vary based on the environment:

WaterDrop.setup do |config|
  config.send_events = Rails.env.production?
  config.connection_pool_size = 20
  config.connection_pool_timeout = 1
  config.kafka_ports = %w( 9092 )
  config.kafka_host = Rails.env.production? ? 'prod-host' : 'localhost'
end

Usage

Creating and sending standard events

You don't need aspects to send Kafka messages; you can create and send events directly:

event = WaterDrop::Event.new('topic', 'message')
event.send!

The message you want to send must be castable either to a string or to JSON. If it can be cast to both, it will be cast to JSON.
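The casting rule can be sketched roughly as follows. The `cast_message` helper is hypothetical and only illustrates the "JSON first, string second" precedence; it is not taken from WaterDrop's source.

```ruby
require 'json'

# Hypothetical helper, not part of WaterDrop: prefers JSON, falls back to a string.
def cast_message(message)
  return message.to_json if message.respond_to?(:to_json)
  return message.to_s if message.respond_to?(:to_s)

  raise ArgumentError, 'message must be castable to JSON or to a string'
end

puts cast_message({ user_id: 1, action: 'login' })
puts cast_message('plain text')
```

In this sketch, once the `json` library is loaded nearly every Ruby object responds to `to_json`, so even a plain string comes out JSON-encoded (wrapped in double quotes).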

Using aspects to handle events

WaterDrop uses Aspector to hook events into existing methods in an aspect-oriented way. For more details on Aspector usage, please refer to the examples directory of this project.

In general, aspects let you add behavior to existing code without modifying the code itself. This way, events can be created and sent without "polluting" the business logic.
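To make the idea concrete, here is a minimal plain-Ruby sketch that uses `Module#prepend` rather than Aspector. The `Order` class, the `RunAnnouncer` module, and the `EVENTS` array are hypothetical; the array stands in for Kafka.

```ruby
# Business class that knows nothing about event publishing.
class Order
  def run
    :done
  end
end

# Collected events; stands in for Kafka in this sketch.
EVENTS = []

# Hypothetical aspect: wraps Order#run without editing the Order class body.
module RunAnnouncer
  def run
    EVENTS << 'run is about to start'
    super # continue with the original Order#run
  end
end

Order.prepend(RunAnnouncer)

result = Order.new.run
puts EVENTS.inspect
puts result.inspect
```

The body of `Order` stays untouched, and the return value of `run` is unchanged; only the extra behavior is layered on top.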

All WaterDrop aspects accept the following parameters:

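| Option                 | Value type     | Description                                  |
|------------------------|----------------|----------------------------------------------|
| ClassName              | Class          | Class to which we want to hook               |
| method: :method_name   | Symbol, Array  | Method (or methods) to which we want to hook |
| topic: 'karafka_topic' | String, Symbol | Kafka topic to which we will send the event  |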

There are also message, after_message, and before_message proc parameters, which are evaluated in the context of the object whose method is hooked.
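What "evaluated in the object's context" means can be shown with Ruby's `instance_exec`. This is a sketch of the general mechanism, assuming something similar happens under the hood; the `Report` class and its `title` method are hypothetical.

```ruby
class Report
  def initialize(name)
    @name = name
  end

  def title
    "Report: #{@name}"
  end
end

# Procs shaped like the message parameters described above.
before_message = -> { title }
after_message = ->(result) { "#{title} finished with #{result}" }

report = Report.new('sales')

# instance_exec runs the proc with self set to the report,
# so a bare call to title resolves to Report#title.
puts report.instance_exec(&before_message)
puts report.instance_exec(42, &after_message)
```

This is why a proc such as `-> { any_class_name_instance_method }` can call instance methods of the hooked class directly, without an explicit receiver.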

Before aspects hookup

WaterDrop::Aspects::BeforeAspect.apply(
  ClassName,
  method: :run,
  topic: 'karafka_topic',
  message: -> { any_class_name_instance_method }
)

Now, each time you run:

ClassName.new.run

an event with the given message will be sent to Kafka before the method executes.

After aspects hookup

WaterDrop::Aspects::AfterAspect.apply(
  ClassName,
  method: :run,
  topic: 'karafka_topic',
  message: ->(result) { "This is result of method run: #{result}" }
)

Now, each time you run:

ClassName.new.run

an event with the given message will be sent to Kafka after the method executes.

Around aspects hookup

WaterDrop::Aspects::AroundAspect.apply(
  ClassName,
  method: :run,
  topic: 'karafka_topic',
  before_message: -> { any_class_name_instance_method },
  after_message: ->(result) { "This is result of method run: #{result}" }
)

Now, each time you run:

ClassName.new.run

an event with the given message will be sent both before and after the method executes.
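The around behavior can be emulated in plain Ruby to see the order of operations. This is a sketch only: `MiniAroundAspect`, `Importer`, and `SENT_EVENTS` are hypothetical names, events are collected in an array instead of being sent to Kafka, and the code is not WaterDrop's implementation.

```ruby
# In-memory stand-in for Kafka: each entry is a [topic, message] pair.
SENT_EVENTS = []

# Hypothetical helper that mimics the shape of the aspect calls above.
module MiniAroundAspect
  def self.apply(klass, method:, topic:, before_message:, after_message:)
    wrapper = Module.new do
      # Redefine the hooked method; super reaches the original implementation.
      define_method(method) do |*args, &block|
        SENT_EVENTS << [topic, instance_exec(&before_message)]
        result = super(*args, &block)
        SENT_EVENTS << [topic, instance_exec(result, &after_message)]
        result # the caller still receives the original return value
      end
    end
    klass.prepend(wrapper)
  end
end

class Importer
  def source
    'inventory.csv'
  end

  def run
    3
  end
end

MiniAroundAspect.apply(
  Importer,
  method: :run,
  topic: 'karafka_topic',
  before_message: -> { "Importing #{source}" },
  after_message: ->(result) { "Imported #{result} rows" }
)

Importer.new.run
SENT_EVENTS.each { |topic, message| puts "#{topic}: #{message}" }
```

The before message is built before the original method runs, and the after message receives the method's return value, which matches the order described for the around aspect.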