Class: Kafka::KafkaProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/jruby-kafka/kafka-producer.rb

Overview

noinspection JRubyStringImportInspection

Constant Summary collapse

# Java producer class from the Kafka client library, reached through JRuby's
# Java:: namespace. #connect instantiates it with the producer configuration.
KAFKA_PRODUCER =
Java::org.apache.kafka.clients.producer.KafkaProducer
# Lists of permitted values, keyed by name. How they are applied is decided by
# the validation code, which is not part of this listing.
# Frozen so the shared lookup table cannot be mutated at runtime.
VALIDATIONS = {
  :'required.codecs' => %w[none gzip snappy lz4].freeze
}.freeze

# Option names listed as required (dotted Kafka property names).
REQUIRED = %w[
  bootstrap.servers key.serializer
].freeze

# Option names listed as known producer properties (dotted Kafka property names).
KNOWN = %w[
  acks                      batch.size                block.on.buffer.full
  bootstrap.servers         buffer.memory             client.id
  compression.type          key.serializer            linger.ms
  max.in.flight.requests.per.connection               max.request.size
  metadata.fetch.timeout.ms metadata.max.age.ms       metric.reporters
  metrics.num.samples       metrics.sample.window.ms  receive.buffer.bytes
  reconnect.backoff.ms      retries                   retry.backoff.ms
  send.buffer.bytes         timeout.ms                value.serializer
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KafkaProducer

Returns a new instance of KafkaProducer.



32
33
34
35
36
37
38
39
40
41
# File 'lib/jruby-kafka/kafka-producer.rb', line 32

# Builds the producer configuration from +opts+; does not connect.
#
# Each key is converted to a String with underscores replaced by dots
# (e.g. :bootstrap_servers becomes 'bootstrap.servers'). Entries whose value
# is nil are dropped. The result is stored in @options and then checked by
# validate_arguments.
#
# @param opts [Hash] producer settings, keyed by Symbol or String
def initialize(opts = {})
  @options = opts.each_with_object({}) do |(key, value), normalized|
    next if value.nil?

    normalized[key.to_s.tr('_', '.')] = value
  end
  validate_arguments
  # Placeholder sender used until #connect installs the real one. It must use
  # `raise`, not `throw`: `throw` is the catch/throw control-flow primitive and
  # would surface as UncaughtThrowError instead of this StandardError.
  @send_method = proc { raise StandardError, 'Producer is not connected' }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



30
31
32
# File 'lib/jruby-kafka/kafka-producer.rb', line 30

# Reader for @options, the configuration hash assigned in #initialize
# (string keys with underscores replaced by dots, nil-valued entries removed).
#
# @return [Hash] the stored options
def options
  @options
end

#producer ⇒ Object (readonly)

Returns the value of attribute producer.



30
31
32
# File 'lib/jruby-kafka/kafka-producer.rb', line 30

# Reader for @producer, the KAFKA_PRODUCER instance created by #connect.
# The variable is not assigned in #initialize, so this returns nil until
# #connect has run.
#
# @return [Object, nil] the underlying producer
def producer
  @producer
end

#send_method ⇒ Object (readonly)

Returns the value of attribute send_method.



30
31
32
# File 'lib/jruby-kafka/kafka-producer.rb', line 30

# Reader for @send_method, the callable that #send_msg invokes.
# #initialize sets it to a placeholder proc that fails with
# 'Producer is not connected'; #connect replaces it with the producer's
# `send` method obtained via java_method.
#
# @return [#call] the current sender
def send_method
  @send_method
end

Instance Method Details

#close ⇒ Object



53
54
55
# File 'lib/jruby-kafka/kafka-producer.rb', line 53

# Closes the underlying producer, if one exists.
#
# @producer is only assigned by #connect, so calling this on a producer that
# was never connected does nothing instead of raising NoMethodError on nil.
#
# @return [Object, nil] the result of the producer's close, or nil when there
#   is no producer
def close
  @producer.close if @producer
end

#connect ⇒ Object



43
44
45
46
# File 'lib/jruby-kafka/kafka-producer.rb', line 43

# Creates the underlying producer and installs the real sender.
#
# A KAFKA_PRODUCER is instantiated with the result of create_producer_config
# and stored in @producer. @send_method is then replaced with that producer's
# `send` method, selected by its ProducerRecord signature through java_method.
#
# @return [Object] the newly installed send method
def connect
  config = create_producer_config
  @producer = KAFKA_PRODUCER.new(config)
  @send_method = producer.java_method(:send, [ProducerRecord])
end

#send_msg(topic, partition, key, value) ⇒ Object

Throws FailedToSendMessageException, or StandardError if the producer is not connected.



49
50
51
# File 'lib/jruby-kafka/kafka-producer.rb', line 49

# Wraps the arguments in a ProducerRecord and passes it to the current
# send_method.
#
# @param topic     passed unchanged to ProducerRecord.new
# @param partition passed unchanged to ProducerRecord.new
# @param key       passed unchanged to ProducerRecord.new
# @param value     passed unchanged to ProducerRecord.new
# @return [Object] whatever send_method.call returns
def send_msg(topic, partition, key, value)
  record = ProducerRecord.new(topic, partition, key, value)
  send_method.call(record)
end