Gem used to send messages to Kafka in a standard and in an aspect way.


gem install waterdrop

or add this to your Gemfile:

gem 'waterdrop'

and run

bundle install


WaterDrop has following configuration options:

Option Value type Description
send_messages Boolean Should we send messages to Kafka
kafka_hosts Array Kafka servers hosts with ports
connection_pool_size Integer Kafka connection pool size
connection_pool_timeout Integer Kafka connection pool timeout
raise_on_failure Boolean Should we raise an exception when we cannot send message to Kafka - if false will silently ignore failures (will just ignore them)

To apply this configuration, you need to use a setup method:

WaterDrop.setup do |config|
  config.send_messages = true
  config.connection_pool_size = 20
  config.connection_pool_timeout = 1
  config.kafka_hosts = ['localhost:9092']
  config.raise_on_failure = true

This configuration can be placed in config/initializers and can vary based on the environment:

WaterDrop.setup do |config|
  config.send_messages = Rails.env.production?
  config.connection_pool_size = 20
  config.connection_pool_timeout = 1
  config.kafka_hosts = [Rails.env.production? ? 'prod-host:9091' : 'localhost:9092']
  config.raise_on_failure = Rails.env.production?


Creating and sending standard messages

To send Kafka messages, you don't need to use aspects, you can create and send messages directly:

message = WaterDrop::Message.new('topic', 'message')

message that you want to send should be either castable to string or to json. If it can be casted to both, it will be casted to json.

Using aspects to handle messages

WaterDrop uses Aspector to allow aspect oriented messages hookup. If you need extensive details about aspector usage, please refer to the examples directory of this project.

In general aspects allows adding additional behavior to existing code without modifying the code itself. This way we can create and send messages, without "polluting" the business logic with it.

All the WaterDrop aspects accept following parameters:

Option Value type Description
ClassName Class Class to which we want to hook
method: :method_name Symbol, Array Method (or methods) to which we want to hook
topic: 'karafka_topic' String, Symbol Kafka topic to which we will send the message

There also a message, after_message and before_message proc parameter that will be evaluated in the methods object context.

Before aspects hookup

  method: :run,
  topic: 'karafka_topic',
  message: -> { any_class_name_instance_method }

now each time before you run:


a message with the given message will be send to Kafka.

After aspects hookup

  method: :run,
  topic: 'karafka_topic',
  message: ->(result) { "This is result of method run: #{result}" }

now each time after you run:


a message with the given message will be send to Kafka.

Around aspects hookup

  method: :run,
  topic: 'karafka_topic',
  before_message: -> { any_class_name_instance_method },
  after_message: ->(result) { "This is result of method run: #{result}" }

now each time you run:


a message with the given message will be send before and after the method execution.


