Pheromone
Pheromones are chemical substances secreted from glands and used as a means of communication.
pheromone lets you set up producers that publish ActiveRecord changes to Kafka whenever a model is created or updated.
Installation
Add this line to your application's Gemfile:
gem 'pheromone'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install pheromone
Pheromone Setup
Pheromone depends on waterdrop to send messages to Kafka. waterdrop settings can be added by following the Setup step in the waterdrop documentation.
To set up pheromone, both waterdrop and pheromone need to be configured. Run this to generate the pheromone configuration:
$ bundle exec rails generate pheromone:initializer
This will generate the following in config/initializers/pheromone.rb
Pheromone.setup do |pheromone_config|
  # pheromone_config.background_processor.name = ':resque / :sidekiq'
  # pheromone_config.background_processor.klass = 'BackgroundWorker'
  # pheromone_config.timezone_format = 'UTC'
  pheromone_config.message_format = :json
  WaterDrop.setup do |waterdrop_config|
    waterdrop_config.deliver = Rails.env.production?
    waterdrop_config.kafka.seed_brokers = [Rails.env.production? ? ENV['KAFKA_HOST'] : 'localhost:9092']
  end
end
Edit this file to modify the default config. The following configuration options are available:
| Option | Value type | Description |
|---|---|---|
| background_processor.name | Symbol | Choose :sidekiq or :resque as the background processor only if messages need to be sent to kafka asynchronously |
| background_processor.klass | String | Background processor class name that sends messages to kafka |
| background_processor.custom_processor | Proc | Custom processor (only works when you specify name as :custom) |
| timezone_format | String | Valid timezone name for timestamps sent to kafka |
| message_format | Symbol | Only supports :json format currently |
| enabled | Boolean | Defaults to true. When this is set to false, no messages will be sent to Kafka |
The timezone_format setting converts any timestamp attributes in the message to the specified time zone.
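To make the effect of the timezone setting concrete, here is a minimal plain-Ruby sketch of the same kind of conversion. It does not use pheromone: `format_timestamp` is a hypothetical helper, and it takes a fixed UTC offset rather than a time zone name, which is a simplification made so the example only needs the standard library.

```ruby
require 'time'

# Hypothetical helper (not part of pheromone): renders a Time as an
# ISO 8601 string with millisecond precision in the given UTC offset.
def format_timestamp(time, utc_offset)
  # getlocal returns a new Time and leaves the receiver untouched
  time.getlocal(utc_offset).iso8601(3)
end

created_at = Time.utc(2015, 7, 14, 2, 10, 0)

puts created_at.iso8601(3)                    # the instant as stored, in UTC
puts format_timestamp(created_at, '+08:00')   # same instant, shifted offset
```

The instant itself never changes; only its rendering does, which is why consumers can compare timestamps regardless of the zone the producer was configured with.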
Usage
1. Sending messages to kafka asynchronously
The underlying Kafka client used by pheromone is ruby-kafka. This client provides a normal producer that sends messages to Kafka synchronously, and an async_producer to send messages to Kafka asynchronously.
It is advisable to use the normal producer in production systems because the async producer provides no guarantee that messages will be delivered. To read more on this, refer to the ruby-kafka documentation.
Even while using a synchronous producer, there may be a need to send messages to Kafka in a background task. This is especially true for batch processing tasks that send a high message volume to Kafka. To allow for this, pheromone provides an async mode that can be selected by passing dispatch_method: :async as an option to publish. By default, dispatch_method is :sync. Specifying :async will still use the normal producer and NOT the async_producer.
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic_test,
message: ->(obj) { { name: obj.name } },
dispatch_method: :async
}
]
end
The background processor can be selected by setting Pheromone.config.background_processor.name to either :resque or :sidekiq.
1.a. Using :resque
Create a new class and add the name under Pheromone.config.background_processor.klass. Implement a class method perform(message), and invoke message.send! inside the method as shown below:
class ResqueJob
  @queue = :low
  # message carries the topic, blob, metadata and options of the message to send
  def self.perform(message)
    Pheromone::Messaging::Message.new(
      topic: message['topic'],
      blob: message['blob'],
      metadata: message['metadata'],
      options: message['options']
    ).send!
  end
end
1.b. Using :sidekiq
Create a new class and add the name under Pheromone.config.background_processor.klass. Implement an instance method perform(message), and invoke message.send! inside the method as shown below:
class SidekiqJob
  include Sidekiq::Worker
  # message carries the topic, blob, metadata and options of the message to send
  def perform(message)
    Pheromone::Messaging::Message.new(
      topic: message['topic'],
      blob: message['blob'],
      metadata: message['metadata'],
      options: message['options']
    ).send!
  end
end
1.c. Implement your own processor
You can also implement your own processor in addition to resque and sidekiq.
Pheromone.setup do |config|
config.background_processor.name = :custom
config.background_processor.klass = 'CustomJob'
config.background_processor.custom_processor = -> (klass, msg) do
klass.perform(msg)
end
end
pheromone will invoke the class name specified in the config with the message object. This mode can be used if you don't want to block a request that ends up sending messages to Kafka.
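The hand-off to a custom processor can be sketched in plain Ruby without pheromone or a job framework. Everything here is a stand-in: `FakeMessage` is a hypothetical substitute for the message object, `CustomJob` is a hypothetical job class, and the sketch assumes the proc receives the class itself rather than its name, which the text above does not state.

```ruby
# Hypothetical stand-in for the message object; it only records what it
# would have sent instead of talking to Kafka.
FakeMessage = Struct.new(:topic, :blob) do
  def send!
    "sent #{blob} to #{topic}"
  end
end

# Hypothetical job class, as would be named in background_processor.klass.
class CustomJob
  def self.perform(message)
    message.send!
  end
end

# Same shape as the custom_processor proc in the configuration above.
custom_processor = ->(klass, msg) { klass.perform(msg) }

# Resolve the configured class name to a constant, then hand over the message.
job_class = Object.const_get('CustomJob')
result = custom_processor.call(job_class, FakeMessage.new(:topic_test, 'hello'))
puts result
```

Because the proc decides how the job is invoked, the same pattern would allow enqueueing on any job system whose entry point differs from the Resque and Sidekiq conventions.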
2. Supported events
2.a. To send messages for model create event, add the following lines to your ActiveRecord model
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic1,
metadata: { source: 'application1' },
message: ->(obj) { { name: obj.name } }
}
]
end
2.b. To send messages for model update event, specify update in the event_types array:
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:update],
topic: :topic1,
metadata: { source: 'application1' },
message: ->(obj) { { name: obj.name } }
}
]
end
Messages can be published for multiple event types by defining event_types: [:create, :update].
3. Supported message formats
3.a. Using a proc in message
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic1,
metadata: { source: 'application1' },
message: ->(obj) { { name: obj.name } }
}
]
end
3.b. Using a defined function in message
class PublishableModel < ActiveRecord::Base
  include Pheromone::Publishable
  publish [
    {
      event_types: [:update],
      topic: :topic1,
      metadata: { source: 'application1' },
      message: :message # name of the instance method below (method name assumed)
    }
  ]
  def message
    { name: self.name }
  end
end
3.c. Using a serializer in message
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic1,
metadata: { source: 'application1' },
serializer: Class.new(BaseSerializer) { attributes :name, :type }
}
]
end
4. Sending messages conditionally
4.a. Using a proc in if
class PublishableModel < ActiveRecord::Base
  include Pheromone::Publishable
  publish [
    {
      event_types: [:update],
      topic: :topic1,
      message: :message, # name of the instance method below (method name assumed)
      if: ->(data) { data.condition }
    }
  ]
  def message
    { name: self.name }
  end
end
4.b. Using a defined function in if
class PublishableModel < ActiveRecord::Base
  include Pheromone::Publishable
  publish [
    {
      event_types: [:update],
      topic: :topic1,
      message: :message, # name of the instance method below (method name assumed)
      if: :pre_condition
    }
  ]
  def pre_condition
    name.present?
  end
  def message
    { name: self.name }
  end
end
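As a rough illustration of how an if option given either as a proc or as a method name could be evaluated, here is a plain-Ruby sketch. It is not pheromone's implementation: `publish?` and the `Record` struct are hypothetical, and treating a missing condition as "always publish" is an assumption.

```ruby
# Hypothetical stand-in for a model instance.
Record = Struct.new(:name) do
  def pre_condition
    !name.nil? && !name.empty?
  end
end

# Hypothetical helper: decides whether a record should be published.
def publish?(record, condition)
  case condition
  when nil    then true                                  # no condition given
  when Proc   then !!condition.call(record)              # proc or lambda
  when Symbol then !!record.public_send(condition)       # method on the record
  else raise ArgumentError, "unsupported condition: #{condition.inspect}"
  end
end

puts publish?(Record.new('Ada'), :pre_condition)
puts publish?(Record.new(''), :pre_condition)
puts publish?(Record.new('Ada'), ->(r) { r.name.start_with?('A') })
```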
5. Specifying the topic
The kafka topic can be specified in the topic option to publish. To publish to topic_test, use the following:
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic_test,
message: ->(obj) { { name: obj.name } }
}
]
end
6. Specifying producer options
ruby-kafka accepts options that change the behaviour of the Kafka producer.
These can be supplied by passing producer_options to the publish method:
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic_test,
message: ->(obj) { { name: obj.name } },
producer_options: {
key: 'key',
partition: 1,
partition_key: 'key'
}
}
]
end
7. Specifying message metadata
Pheromone makes it easier to standardise the format of messages sent to Kafka while affording the flexibility to add other custom fields using the metadata option.
By default, messages sent to Kafka have an event_type and a timestamp attached to them, as shown below:
{
'event_type' => 'create',
'timestamp' => '2015-07-14T10:10:00.000+08:00',
'blob' => {
'message_text' => 'test'
}
}.to_json
By specifying metadata to the publish method:
class PublishableModel < ActiveRecord::Base
include Pheromone::Publishable
publish [
{
event_types: [:create],
topic: :topic_test,
message: ->(obj) { { name: obj.name } },
metadata: { source: 'application1' },
producer_options: {
key: 'key',
partition: 1,
partition_key: 'key'
}
}
]
end
The Kafka message will have the original metadata in addition to the new fields:
{
'event_type' => 'create',
'timestamp' => '2015-07-14T10:10:00.000+08:00',
'source' => 'application1',
'blob' => {
'message_text' => 'test'
}
}.to_json
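The merging of default fields and custom metadata shown above can be sketched in plain Ruby. `build_message` is a hypothetical helper, not pheromone's code, and the sketch assumes custom metadata is merged after the defaults, so a custom key with the same name would win; the text does not say which takes precedence.

```ruby
require 'json'
require 'time'

# Hypothetical helper mirroring the flat layout shown above: default
# fields first, then caller metadata, then the payload under 'blob'.
def build_message(event_type:, blob:, metadata: {}, now: Time.now.utc)
  defaults = { 'event_type' => event_type, 'timestamp' => now.iso8601(3) }
  defaults
    .merge(metadata.transform_keys(&:to_s)) # assumption: custom keys override defaults
    .merge('blob' => blob)
end

message = build_message(
  event_type: 'create',
  blob: { 'message_text' => 'test' },
  metadata: { source: 'application1' },
  now: Time.utc(2015, 7, 14, 2, 10, 0) # fixed time so the output is repeatable
)

puts JSON.pretty_generate(message)
```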
8. Sending messages to Kafka directly
pheromone provides a custom message object that sends messages to Kafka in a predefined format, to maintain consistency in the message fields.
Pheromone::Messaging::Message can be initialized with the following arguments:
- topic: name of the topic to which the message is produced
- blob: the actual message itself
- metadata: any additional fields that must be sent along with the message
- options: producer options as described in Section 6
Of these fields, only topic and blob are compulsory; metadata and options are optional.
Example usage:
Pheromone::Messaging::Message.new(
  topic: 'test_topic',
  blob: { message_text: 'test' },
  metadata: { event_type: 'create' },
  options: { max_retries: 5 }
).send!
This will send a message to test_topic in Kafka in the following format:
{
'event_type' => 'create',
'timestamp' => '2015-07-14T10:10:00.000+08:00',
'blob' => {
'message_text' => 'test'
}
}.to_json
As seen above, timestamp is added automatically to the main attributes along with the message metadata. The actual message is encapsulated inside a key called blob.
9. Encoding messages
pheromone allows encoding messages using custom encoders. An encoder is a proc that encodes the message in any given format.
publish takes encoder and message_format as options to specify the encoding. The encoder is called with the entire message object and performs the encoding on it.
publish(
[
{
topic: :test_topic,
event_types: [:update, :create],
serializer: TestSerializer,
message_format: :with_encoding,
encoder: ->(message) { avro.encode(message, schema_name: 'test') },
if: ->(object) { object.should_publish_status_update? }
}
]
)
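An encoder is just a callable that takes the whole message and returns its encoded form. The sketch below illustrates that contract in plain Ruby with a stand-in encoding (JSON wrapped in Base64) instead of Avro, so it runs without extra gems; the `encoder` and `decoder` lambdas are hypothetical and not part of pheromone.

```ruby
require 'json'

# Stand-in encoder: JSON, then strict Base64 via Array#pack. A real setup
# might call an Avro library here instead.
encoder = ->(message) { [JSON.generate(message)].pack('m0') }

# Matching decoder, as a consumer of the topic would need.
decoder = ->(payload) { JSON.parse(payload.unpack1('m0')) }

message = {
  'metadata' => { 'event' => 'create' },
  'blob' => { 'name' => 'test' }
}

encoded = encoder.call(message)
puts encoded
puts decoder.call(encoded) == message
```

Whatever encoding is chosen, producer and consumer have to agree on it, which is why the sketch pairs the encoder with its decoder.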
Message Contents
In versions before 0.5, the metadata keys were present at the top-most level, and the message itself was embedded inside the key blob.
The message published to Kafka looked like this:
{
event: 'create',
entity: 'KlassName',
timestamp: '2015-07-14T02:10:00.000Z',
blob: {
message_contents: 'message_contents'
}
}
From 0.5 version onwards, the message format looks like this:
{
metadata: {
event: 'create',
entity: 'KlassName',
timestamp: '2015-07-14T02:10:00.000Z'
},
blob: {
message_contents: 'message_contents'
}
}
event, entity, and timestamp are determined and added by pheromone. timestamp is in UTC by default; otherwise it uses the timezone_format value specified in config/initializers/pheromone.rb. Contents of metadata can be modified as described in Section 7 (Specifying message metadata). Message contents are placed under the key blob.
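Consumers that read topics containing messages in both layouts may want to normalise them. The following is a minimal consumer-side sketch in plain Ruby, assuming messages have already been decoded into hashes with string keys; `split_message` is a hypothetical helper, and it would misclassify an old-layout message that happens to carry a custom key named metadata.

```ruby
# Hypothetical consumer-side helper: returns [metadata, blob] for a
# decoded message, whichever of the two layouts it arrived in.
def split_message(message)
  if message.key?('metadata')
    # 0.5 onwards: metadata and blob are sibling keys
    [message['metadata'], message['blob']]
  else
    # before 0.5: everything except 'blob' is metadata
    [message.reject { |key, _| key == 'blob' }, message['blob']]
  end
end

old_layout = {
  'event' => 'create', 'entity' => 'KlassName',
  'timestamp' => '2015-07-14T02:10:00.000Z',
  'blob' => { 'message_contents' => 'message_contents' }
}
new_layout = {
  'metadata' => {
    'event' => 'create', 'entity' => 'KlassName',
    'timestamp' => '2015-07-14T02:10:00.000Z'
  },
  'blob' => { 'message_contents' => 'message_contents' }
}

puts split_message(old_layout) == split_message(new_layout)
```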
In order to use the format with blob embedded inside metadata, specify option embed_blob as true inside publish options like this:
publish(
[
{
topic: :test_topic,
event_types: [:update, :create],
serializer: TestSerializer,
message_format: :with_encoding,
embed_blob: true,
encoder: ->(message) { avro.encode(message, schema_name: 'test') },
if: ->(object) { object.should_publish_status_update? }
}
]
)
Testing
RSpec
pheromone makes it easy to control when it is enabled during testing with pheromone/frameworks/rspec:
# spec/rails_helper.rb
require 'rspec/rails'
# ...
require 'pheromone/frameworks/rspec'
With this helper, publishable is disabled by default. To turn it on, pass publishable: true to the spec block like this:
describe 'PublishableModel' do
  before do
    # count producer calls instead of sending anything to Kafka
    @invocation_count = 0
    allow(WaterDrop::SyncProducer).to receive(:call) do
      @invocation_count += 1
    end
  end
  it 'sends messages to kafka', publishable: true do
    PublishableModel.create
    expect(@invocation_count).to eq(1)
  end
end
Alternatively, Pheromone.enabled? can be used to check whether pheromone is enabled.
Development
After checking out the repo, run bin/setup to install dependencies. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.
Contributing
This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
License
The gem is available as open source under the terms of the MIT License.