KafkaRestClient

A Ruby client to interact with Kafka REST Proxy

Current Version: 0.2.1

Supported Ruby versions: 2.0, 2.1, 2.2

Installation

Add this line to your application's Gemfile:

gem 'kafka_rest_client', git: '[email protected]:FundingCircle/kafka_rest_client.git', tag: 'v0.2.1'

Usage

Configuration

# Configure global settings
# The client is disabled if kafka_proxy_url and schema_registry_url are not set
KafkaRestClient.configure do |config|
  config.kafka_proxy_url = ENV['KAFKA_PROXY_URL']
  config.schema_registry_url = ENV['SCHEMA_REGISTRY_URL']
  config.timeout = 10
  config.logger = Rails.logger
end

Producing events

require 'kafka_rest_client'

producer = KafkaRestClient::AvroProducer.new

# Produce a single event using the topic name and payload
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })

# Produce multiple events
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', [{ temperature: 35, unit: 'celsius' }])

# Produce an event providing the schema id
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema_id: 1)

# Produce an event providing the full schema as a JSON string
schema = {
  type: 'record',
  name: 'IceCreamMelted',
  fields: [{ name: 'Temperature', type: 'string' }, { name: 'Unit', type: 'string' }]
}
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema: schema.to_json)

# Produce event objects by relying on #to_json and #as_json when using Rails ActiveSupport
class IceCreamMeltedEvent
  attr_reader :temperature, :unit
  def initialize(temperature, unit)
    @temperature = temperature
    @unit = unit
  end

  def as_json(_options = nil)
    { temperature: temperature, unit: unit }
  end
end

producer.produce('ice-cream-melted', [IceCreamMeltedEvent.new(35, 'celsius')])

# Exception handling
begin
  producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })
rescue KafkaRestClient::SchemaNotFoundError => e
 # Schema has not been registered in the schema registry
rescue KafkaRestClient::TopicNotFoundError => e
 # Topic does not exist in Kafka, create it with confluent tools
rescue Net::ReadTimeout => e
 # Rescue read timeout errors
end

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/fundingcircle/kafka_rest_client.