KafkaRestClient

A Ruby client for interacting with the Kafka REST Proxy.

Current Version: 0.4.0

Supported Ruby versions: 2.0, 2.1, 2.2

Installation

Add this line to your application's Gemfile:

gem 'kafka_rest_client', git: 'git@github.com:FundingCircle/kafka_rest_client.git', tag: 'v0.3.0'

Usage

Configuration

# Configure global settings
# The client is disabled if kafka_proxy_url and schema_registry_url are not set
KafkaRestClient.configure do |config|
  config.kafka_proxy_url = ENV['KAFKA_PROXY_URL']
  config.schema_registry_url = ENV['SCHEMA_REGISTRY_URL']
  config.timeout = 10
  config.logger = Rails.logger
end
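
Because the client is disabled when the URLs are not set, an application may want to report missing settings at boot rather than discover later that no events were sent. The following is a minimal sketch of such a check. `missing_kafka_settings` and `REQUIRED_KAFKA_SETTINGS` are hypothetical names, not part of kafka_rest_client, and the environment variable names are the ones used in the configuration example above.

```ruby
# Hypothetical helper, not part of kafka_rest_client.
# Lists the environment variables from the configuration example
# that are unset or blank.
REQUIRED_KAFKA_SETTINGS = %w[KAFKA_PROXY_URL SCHEMA_REGISTRY_URL].freeze

def missing_kafka_settings(env = ENV)
  REQUIRED_KAFKA_SETTINGS.select { |name| env[name].to_s.strip.empty? }
end

missing = missing_kafka_settings
warn "Kafka client settings missing: #{missing.join(', ')}" unless missing.empty?
```

Passing a plain Hash instead of `ENV` makes the check easy to exercise in tests.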

Producing events

require 'kafka_rest_client'

producer = KafkaRestClient::AvroProducer.new

# Produce a single event using the topic name and payload
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })

# Produce an event using the topic name, payload and a partition key
# The partition key should be the name of a field in the message
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, { key: :temperature })

# This would post a request to the REST Proxy, e.g.:
# {"id": 1, "temperature": 35, "unit": "celsius"}

# Produce multiple events
# The schema_id will be fetched from the schema registry by looking up a schema
# with the "#{topic}-value" name
producer.produce('ice-cream-melted', [{ temperature: 35, unit: 'celsius' }])
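
The "#{topic}-value" lookup described above can be illustrated with a small sketch that builds the Schema Registry URL for the latest schema registered under that subject. `latest_value_schema_uri` is a hypothetical helper, the registry address `http://localhost:8081` is an assumption, and whether kafka_rest_client uses this exact endpoint internally is not confirmed by this README.

```ruby
require 'uri'

# Hypothetical helper, not part of kafka_rest_client.
# Builds the Schema Registry URL for the latest schema version registered
# under the "#{topic}-value" subject.
def latest_value_schema_uri(schema_registry_url, topic)
  subject = "#{topic}-value"
  URI("#{schema_registry_url.chomp('/')}/subjects/#{subject}/versions/latest")
end

uri = latest_value_schema_uri('http://localhost:8081', 'ice-cream-melted')
# uri.to_s => "http://localhost:8081/subjects/ice-cream-melted-value/versions/latest"
```

A GET request to that URL returns a JSON document whose "id" field is the schema id, which is the value that can also be passed explicitly as value_schema_id.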

# Produce an event providing the schema id
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema_id: 1)

# Produce an event providing the full schema as a JSON string
# (field names and types must match the payload)
require 'json' # provides #to_json outside Rails
schema = {
  type: 'record',
  name: 'IceCreamMelted',
  fields: [{ name: 'temperature', type: 'int' }, { name: 'unit', type: 'string' }]
}
producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' }, value_schema: schema.to_json)

# Produce event objects by relying on #to_json and #as_json when using Rails ActiveSupport
class IceCreamMeltedEvent
  attr_reader :temperature, :unit
  def initialize(temperature, unit)
    @temperature = temperature
    @unit = unit
  end

  def as_json(_options = nil)
    { temperature: temperature, unit: unit }
  end
end

producer.produce('ice-cream-melted', [IceCreamMeltedEvent.new(35, 'celsius')])

# All of the above examples expect events to have union types explicitly defined. 
# For example, if you have a nullable string field, the type annotation needs to be: 
# {"field_name": { "string": "field_value"}}
# You can provide the following option to automatically annotate nullable fields.
# ⚠️ This does not currently support nested documents
producer.produce('ice-cream-melted',
                 { temperature: 35, unit: 'celsius' },
                 annotate_optional_fields: true,
                 value_schema: {
                   namespace: 'com.fundingcircle',
                   type: 'record',
                   name: 'IceCreamMelted',
                   fields: [
                     { name: 'id', type: 'string' },
                     { name: 'temperature', type: 'int' },
                     { name: 'unit', type: ['null', 'string'] }
                   ]
                 })
# This would post a request to the REST Proxy with the correct type annotation, e.g.:
# {"id": 1, "temperature": 35, "unit": { "string": "celsius"}}
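
To make the annotation rule concrete, here is a rough sketch of what annotating nullable fields could look like. It is not the gem's implementation. `annotate_nullable_fields` is a hypothetical helper that assumes a flat record, symbol keys in the payload, and unions of the form ["null", T] with a single primitive type T. Like the option described above, it does not handle nested documents.

```ruby
# Illustrative only: a hypothetical helper, not the gem's implementation.
# Wraps non-null values of nullable union fields (["null", T]) as { T => value }.
# Payload keys that are not declared in the schema are dropped.
def annotate_nullable_fields(payload, schema)
  schema.fetch(:fields).each_with_object({}) do |field, annotated|
    name = field.fetch(:name).to_sym
    next unless payload.key?(name)

    value = payload[name]
    type = field.fetch(:type)
    if type.is_a?(Array) && type.include?('null') && !value.nil?
      branch = (type - ['null']).first
      annotated[name] = { branch => value }
    else
      # Non-union fields and null values are passed through unchanged
      annotated[name] = value
    end
  end
end

example_schema = {
  type: 'record',
  name: 'IceCreamMelted',
  fields: [
    { name: 'temperature', type: 'int' },
    { name: 'unit', type: ['null', 'string'] }
  ]
}

annotated = annotate_nullable_fields({ temperature: 35, unit: 'celsius' }, example_schema)
# annotated => { temperature: 35, unit: { 'string' => 'celsius' } }
```

A nil value for a nullable field is left as nil, since a null union value needs no type wrapper.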

# Exception handling
begin
  producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })
rescue KafkaRestClient::SchemaNotFoundError => e
  # Schema has not been registered in the schema registry
rescue KafkaRestClient::TopicNotFoundError => e
  # Topic does not exist in Kafka, create it with confluent tools
rescue Net::ReadTimeout => e
  # Rescue read timeout errors
end
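
Read timeouts are often transient, so one possible way to handle them is to retry a few times before giving up. The sketch below assumes that retrying is acceptable for the events in question. `with_timeout_retries` is a hypothetical helper, not part of kafka_rest_client, and its defaults (3 attempts, 0.5 second base delay) are arbitrary.

```ruby
require 'net/http' # defines Net::ReadTimeout

# Hypothetical helper, not part of kafka_rest_client.
# Runs the block and retries on Net::ReadTimeout, pausing a little longer
# before each new attempt. Re-raises once the attempts are used up.
def with_timeout_retries(attempts = 3, base_delay = 0.5)
  tries = 0
  begin
    tries += 1
    yield
  rescue Net::ReadTimeout
    raise if tries >= attempts
    sleep(base_delay * tries)
    retry
  end
end

# Usage, assuming a configured producer as in the examples above:
# with_timeout_retries do
#   producer.produce('ice-cream-melted', { temperature: 35, unit: 'celsius' })
# end
```

Note that a timed-out request may still have been processed by the proxy, so retrying can produce duplicate events. Consumers should tolerate duplicates if this pattern is used.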

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/fundingcircle/kafka_rest_client.