YAKC: Yet another Kafka (0.8) consumer

YAKC is a generic Kavka 0.8 consumer based on the now-dead Poseidon (i know, i know). It will listen to as many topics as you specify and hand them off via a handler to consumer classes.


Add this line to your application's Gemfile:

gem 'yakc'

And then execute:

$ bundle

Or install it yourself as:

$ gem install yakc


There are 2 main componets:

Message Handler

This is the bit of code that handles what to do with the messages once they are received. There are 2 stages to this process:

  1. The message is parsed using your message parser(inherited from the YAKC::Message class) that does the parsing and validity checking.
  2. The parsed message payload is broadcast using ActiveSupport::Notifications 3: It will broadcast the message with the key: "topic::event"

To set it up:

  handler = YAKC::MessageBroadcaster.new message_parser: MyMessageClass

And now you're ready to init the reader

Message Interface

The message parser needs to implement:

  1. parse( raw_message ) : This converts the raw Kafka data to the format of your choice
  2. broadcastable? : This determines whether the message is valid and shoud be broadcast. 3: event : The name of the picked up event. This is the name that gets broadcast

For example if your messages are encoded in Avro and look loosely like:

{ "event": {"name":"myEventName",
  // etc

Your message parser class would look something like

class AvroMessage < YAKC::Message

  def broadcastable?
    # an event is probably okay to transmit if we can extract its name

  def event
    @payload["event"] || {}


  def parse( message )
    data = StringIO.new(message.value)
    msg = Avro::DataFile::Reader.new(data, Avro::IO::DatumReader.new)

  rescue Avro::DataFile::DataFileError => e
    Rails.logger.error e


The reader does(surprise) the reading and pushes the read rad messages to the handler, which you have to specify.

It implements:

  • read : an infinite loop that consumes messages on all the specified topics (see setup below) and sends them to the handler

Here's how you would use it:

  handler = YAKC::MessageBroadcaster.new message_parser: AvroMessage
  reader = YAKC::Reader.new message_handler: handler



And now for the full setup. You will need to specify the zookeepers; the Kafka brokers; the app and suffix, which are used to generate the consumer group name; the topic list; and a logger.

There are 2 ways of doing this. You can either set those up as ENV vars ("ZOOKEEPERS"(comma separated list), "BROKERS"(comma separated list), "APP", "SUFFIX", "TOPICS") and set up the logger by hand, or, you can do it in ruby, like:

  YAKC.configure do |config|
    config.logger     = Rails.logger
    config.zookeepers = ["localhost:9092"]
    config.brokers    = ["localhost:2181"]
    config.app        = "MyApp"
    config.suffix     = Rails.env
    config.topics     = ["clickstream", "logs", "exceptions"] # whatever you're listening for


Here's what a full experience would look like:

The reader would look like

  # in your initializer
  YAKC.configure do |config|
    # we'll assume the rest are set up in the env
    config.logger     = Rails.logger

In your reader job

  handler = YAKC::MessageBroadcaster.new message_parser: AvroMessage
  reader = YAKC::Reader.new message_handler: handler


And the consumers would listen to the events

Let's say you have an app that listens to exceptions that we pass around in kafka. It then stores them in a DB and passesthem off to Honeybadger. Your Exception model could do something like

class Exception < ActiveRecord::Base
  include Yeller::Subscribable

  # we don't care about the event type, so we subscribe to "exception::.*"
  subscribe with: :from_kafka_event, to: "exception::.*"

  def self.from_kafka_event( message )
    create message
    Honeybadger.notify message


After checking out the repo, run bin/setup to install dependencies. Then, run rake test to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.


Bug reports and pull requests are welcome on GitHub at https://github.com/gaorlov/yakc.


The gem is available as open source under the terms of the MIT License.