Class: Kafka::KafkaProducer
- Inherits:
-
Object
- Object
- Kafka::KafkaProducer
- Defined in:
- lib/jruby-kafka/kafka-producer.rb
Overview
noinspection JRubyStringImportInspection
Constant Summary collapse
- KAFKA_PRODUCER =
Java::org.apache.kafka.clients.producer.KafkaProducer
- VALIDATIONS =
{ :'required.codecs' => %w[ none gzip snappy lz4 ] }
- REQUIRED =
%w[ bootstrap.servers key.serializer ]
- KNOWN =
%w[ acks batch.size block.on.buffer.full bootstrap.servers buffer.memory client.id compression.type key.serializer linger.ms max.in.flight.requests.per.connection max.request.size metadata.fetch.timeout.ms metadata.max.age.ms metric.reporters metrics.num.samples metrics.sample.window.ms receive.buffer.bytes reconnect.backoff.ms retries retry.backoff.ms send.buffer.bytes timeout.ms value.serializer ]
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#producer ⇒ Object
readonly
Returns the value of attribute producer.
-
#send_method ⇒ Object
readonly
Returns the value of attribute send_method.
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
-
#initialize(opts = {}) ⇒ KafkaProducer
constructor
A new instance of KafkaProducer.
-
#send_msg(topic, partition, key, value) ⇒ Object
Raises FailedToSendMessageException, or StandardError if the producer is not connected.
Constructor Details
#initialize(opts = {}) ⇒ KafkaProducer
Returns a new instance of KafkaProducer.
32 33 34 35 36 37 38 39 40 41 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 32
#
# Builds the option table from +opts+, validates it, and installs a
# placeholder sender that fails until #connect replaces it.
#
# Every key is converted to a String with underscores turned into dots
# (e.g. :bootstrap_servers becomes "bootstrap.servers"); entries whose
# value is nil are dropped.
#
# @param opts [Hash] producer settings keyed by Symbol or String
# @return [KafkaProducer] a new instance of KafkaProducer
def initialize(opts = {})
  @options = opts.each_with_object({}) do |(key, value), normalized|
    normalized[key.to_s.tr('_', '.')] = value unless value.nil?
  end
  # validate_arguments is defined elsewhere in this class.
  validate_arguments
  # Use raise, not throw: throw is catch/throw control flow and would surface
  # as UncaughtThrowError with a mangled message instead of this error.
  @send_method = proc { raise StandardError, 'Producer is not connected' }
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
30 31 32 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 30
#
# Returns the value of attribute options.
#
# @return [Object] the current value of @options
def options
  @options
end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
30 31 32 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 30
#
# Returns the value of attribute producer.
#
# @return [Object] the current value of @producer (nil if it was never assigned)
def producer
  @producer
end
#send_method ⇒ Object (readonly)
Returns the value of attribute send_method.
30 31 32 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 30
#
# Returns the value of attribute send_method.
#
# @return [Object] the current value of @send_method
def send_method
  @send_method
end
Instance Method Details
#close ⇒ Object
53 54 55 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 53
#
# Closes the underlying producer, if one has been created.
#
# Calling this before #connect is a no-op rather than a NoMethodError on nil.
#
# @return [Object, nil] whatever the producer's #close returns, or nil when
#   there is no producer
def close
  @producer.close unless @producer.nil?
end
#connect ⇒ Object
43 44 45 46 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 43
#
# Instantiates KAFKA_PRODUCER with the result of create_producer_config and
# replaces the send callable with the producer's :send Java method bound to
# the single-argument (ProducerRecord) signature.
#
# @return [Object] the bound Java method now stored in @send_method
def connect
  config = create_producer_config
  @producer = KAFKA_PRODUCER.new(config)
  @send_method = producer.java_method(:send, [ProducerRecord])
end
#send_msg(topic, partition, key, value) ⇒ Object
Raises FailedToSendMessageException, or StandardError if the producer is not connected.
49 50 51 |
# File 'lib/jruby-kafka/kafka-producer.rb', line 49
#
# Wraps the arguments in a ProducerRecord and passes it to the current
# send_method callable.
#
# @param topic [Object] first argument given to ProducerRecord.new
# @param partition [Object] second argument given to ProducerRecord.new
# @param key [Object] third argument given to ProducerRecord.new
# @param value [Object] fourth argument given to ProducerRecord.new
# @return [Object] whatever send_method.call returns
# @raise [StandardError] per the class documentation, when the producer is
#   not connected; FailedToSendMessageException is also documented as possible
def send_msg(topic, partition, key, value)
  record = ProducerRecord.new(topic, partition, key, value)
  send_method.call(record)
end