Class: Octo::KafkaBridge

Inherits:
Object
  • Object
show all
Defined in:
lib/octocore-cassandra/kafka_bridge.rb

Overview

The bridge between Kafka and ruby

Constant Summary collapse

CLIENT_ID =

These are hard wired

ENV['KAFKA_CLIENT_ID']
TOPIC =
ENV['KAFKA_TOPIC']
MAX_BUFFER_SIZE =
20_000
MAX_QUEUE_SIZE =
10_000
DELIVERY_INTERVAL =
1
BROKERS =

Changes as per environment

ENV['KAFKA_BROKERS'].try(:split, ',')

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KafkaBridge

Returns a new instance of KafkaBridge.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/octocore-cassandra/kafka_bridge.rb', line 20

def initialize(opts = {})
  opts.deep_symbolize_keys!
  @kafka = ::Kafka.new(seed_brokers: opts.fetch(:brokers, BROKERS),
                     client_id: opts.fetch(:client_id, CLIENT_ID)
  )
  @producer = @kafka.async_producer(
    max_buffer_size: opts.fetch(:max_buffer_size, MAX_BUFFER_SIZE),
    max_queue_size: opts.fetch(:max_queue_size, MAX_QUEUE_SIZE),
    delivery_interval: opts.fetch(:delivery_interval, DELIVERY_INTERVAL),
  )
  if opts.has_key?(:topic)
    @topic = opts[:topic]
  else
    @topic = TOPIC
  end
end

Instance Method Details

#push(params) ⇒ Object



37
38
39
# File 'lib/octocore-cassandra/kafka_bridge.rb', line 37

def push(params)
  create_message params
end

#teardownObject



41
42
43
# File 'lib/octocore-cassandra/kafka_bridge.rb', line 41

def teardown
  @producer.shutdown
end