SimpleKafkaConsumer
Write Kafka consumers in a simple class-based model with retry support.
Installation
Add this line to your application’s Gemfile:
```ruby
gem 'simple_kafka_consumer'
```
And then execute:
```
$ bundle
```
Or install it yourself as:
```
$ gem install simple_kafka_consumer
```
Usage
Write your own consumer class that inherits from `SimpleKafkaConsumer::Consumer` and set its `group_name` and `topic_name`. You'll also need to define the `consume` method, which is the handler called for each message received.
```ruby
require "simple_kafka_consumer"

class MyConsumer < SimpleKafkaConsumer::Consumer
  # the name used for coordinating multiple consumers
  self.group_name = "my-group-name"

  # the kafka topic we're reading from
  self.topic_name = "my-topic-name"

  # handle the messages
  def consume(message)
    puts message
  end
end
```
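Setting `self.group_name` and `self.topic_name` in the class body relies on class-level writers provided by the base class. The gem's own implementation isn't covered here; as a sketch, this is one common Ruby way to provide such settings, using a hypothetical `FakeConsumerBase` and `DemoConsumer` in place of the real classes. Because the accessors live on the singleton class, each subclass keeps its own values.

```ruby
# Hypothetical stand-in for SimpleKafkaConsumer::Consumer; NOT the gem's source.
class FakeConsumerBase
  class << self
    # Class-level accessors: each subclass stores its own group/topic values.
    attr_accessor :group_name, :topic_name
  end

  # Subclasses override this to handle one message.
  def consume(_message)
    raise NotImplementedError, "#{self.class} must implement consume"
  end
end

class DemoConsumer < FakeConsumerBase
  self.group_name = "my-group-name"
  self.topic_name = "my-topic-name"

  def consume(message)
    puts message
  end
end

class OtherDemoConsumer < FakeConsumerBase
  self.group_name = "other-group"
  self.topic_name = "other-topic"
end

puts DemoConsumer.group_name      # prints my-group-name
puts OtherDemoConsumer.group_name # prints other-group
```

The base class itself keeps `nil` for both settings, so nothing leaks between sibling consumers.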
Formatting
You can have the consumer deserialize the message payload by defining a `parse` method; whatever it returns is what `consume` receives. For example, if your messages are JSON:
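```ruby
require "json"

class MyConsumer < SimpleKafkaConsumer::Consumer
  # deserialize each raw message before it is handed to consume
  def parse(message)
    JSON.parse(message)
  end

  # the message you're consuming is now a parsed JSON object
  def consume(json_object)
    puts json_object["name"]
  end
end
```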
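To see how `parse` and `consume` fit together without a running Kafka cluster, here is a sketch built on a hypothetical `FakeConsumerBase` whose `deliver` method (also hypothetical) feeds a raw payload through `parse` and passes the result to `consume`. It assumes the gem calls the two methods in that order, as the comment in the JSON example implies, and that the default `parse` returns the message unchanged.

```ruby
require "json"

# Hypothetical stand-in for SimpleKafkaConsumer::Consumer; NOT the gem's source.
# Assumes each raw message goes through parse, then consume.
class FakeConsumerBase
  # Assumed default: pass the raw message through unchanged.
  def parse(message)
    message
  end

  def consume(_message)
    raise NotImplementedError, "#{self.class} must implement consume"
  end

  # Hypothetical helper simulating delivery of one raw Kafka payload.
  def deliver(raw_message)
    consume(parse(raw_message))
  end
end

class JsonNameConsumer < FakeConsumerBase
  attr_reader :names

  def initialize
    @names = []
  end

  def parse(message)
    JSON.parse(message)
  end

  # Receives the Hash produced by parse, not the raw string.
  def consume(json_object)
    @names << json_object["name"]
  end
end

consumer = JsonNameConsumer.new
consumer.deliver('{"name": "alice", "id": 1}')
consumer.deliver('{"name": "bob", "id": 2}')
p consumer.names # prints ["alice", "bob"]
```

Keeping deserialization in `parse` leaves `consume` free of format concerns, and lets the same handler be tested by calling it directly with a Hash.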
Creating and Running
To create a consumer instance, you'll need to provide an array of Kafka servers and an array of ZooKeeper servers. You can optionally pass a logger with the `logger:` keyword.
```ruby
# create a consumer
kafka_servers = ["localhost:9092"]
zookeeper_servers = ["localhost:2181"]
consumer = MyConsumer.new(kafka_servers, zookeeper_servers, logger: nil)

# run the consumer (loops and blocks)
consumer.run
```
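Because `run` loops and blocks the calling thread, you may want to drive it from a background thread. The gem's loop isn't documented here, so this sketch uses a hypothetical `FakeRunner` that takes the same server arrays and optional `logger:` but reads from an in-memory `Queue` (the `source:` keyword is an assumption for the sketch) instead of Kafka. `run` blocks until the queue is closed and drained.

```ruby
require "logger"

# Hypothetical stand-in illustrating a blocking consume loop; NOT the gem's source.
class FakeRunner
  attr_reader :consumed

  def initialize(kafka_servers, zookeeper_servers, logger: nil, source:)
    @kafka_servers = kafka_servers
    @zookeeper_servers = zookeeper_servers
    @logger = logger || Logger.new(IO::NULL) # silent logger when none is given
    @source = source # stands in for the Kafka topic
    @consumed = []
  end

  # Blocks until the source is closed and empty.
  def run
    @logger.info("kafka=#{@kafka_servers.join(',')} zookeeper=#{@zookeeper_servers.join(',')}")
    # Queue#pop blocks while empty and returns nil once the queue is closed and drained.
    while (message = @source.pop)
      consume(message)
    end
    @logger.info("source closed, stopping")
  end

  def consume(message)
    @consumed << message
  end
end

queue = Queue.new
runner = FakeRunner.new(["localhost:9092"], ["localhost:2181"],
                        logger: Logger.new($stdout), source: queue)

# run blocks, so start it on its own thread.
worker = Thread.new { runner.run }
queue.push("first")
queue.push("second")
queue.close
worker.join
p runner.consumed # prints ["first", "second"]
```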
This gem uses the poseidon_cluster gem, and consumers coordinate via ZooKeeper, so you can run many consumers at once. The `group_name` is what's used to determine which messages the group has already processed.
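One way to picture how a group name tracks progress is a committed offset stored per group, topic, and partition. The sketch below is a conceptual in-memory model only (the `OffsetStore` class and `poll` helper are hypothetical); in the real setup this coordination goes through ZooKeeper, and poseidon_cluster's actual storage layout is not shown here.

```ruby
# Conceptual model of per-group progress tracking; NOT poseidon_cluster's implementation.
class OffsetStore
  def initialize
    # [group, topic, partition] => next offset to read (defaults to 0)
    @offsets = Hash.new(0)
  end

  def next_offset(group, topic, partition)
    @offsets[[group, topic, partition]]
  end

  def commit(group, topic, partition, offset)
    @offsets[[group, topic, partition]] = offset
  end
end

# Read everything past the group's committed offset, then commit the new position.
def poll(store, log, group)
  start = store.next_offset(group, "my-topic-name", 0)
  batch = log.drop(start)
  store.commit(group, "my-topic-name", 0, start + batch.size)
  batch
end

partition_log = %w[m0 m1 m2 m3] # one partition's messages, indexed by offset
store = OffsetStore.new

first_read = poll(store, partition_log, "my-group-name")
partition_log << "m4"
second_read = poll(store, partition_log, "my-group-name") # only the new message
other_group = poll(store, partition_log, "other-group")   # a new group starts from 0
p second_read # prints ["m4"]
p other_group # prints ["m0", "m1", "m2", "m3", "m4"]
```

In this model, reusing a group name resumes where that group left off, while a different group name reads the partition independently.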
Contributing
- Fork it ( https://github.com/chingor13/simple_kafka_consumer/fork )
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create a new Pull Request