Class: Kafka::Producer
- Inherits:
- Object
- Extended by:
- Gem::Deprecate
- Defined in:
- lib/jruby-kafka/producer.rb
Constant Summary
- KAFKA_PRODUCER =
Java::kafka.javaapi.producer.Producer
- VALIDATIONS =
{ :'request.required.acks' => %w[ 0 1 -1 ], :'required.codecs' => [NoCompressionCodec.name, GZIPCompressionCodec.name, SnappyCompressionCodec.name], :'producer.type' => %w[ sync async ] }
- REQUIRED =
%w[ metadata.broker.list ]
- KNOWN =
List of all available options, extracted from kafka.apache.org/documentation.html#producerconfigs on Apr. 27, 2014. If new options are added, they should just work. Please add them to the list so that we can get handy warnings.
%w[ metadata.broker.list request.required.acks request.timeout.ms producer.type serializer.class key.serializer.class partitioner.class compression.codec compressed.topics message.send.max.retries retry.backoff.ms topic.metadata.refresh.interval.ms queue.buffering.max.ms queue.buffering.max.messages queue.enqueue.timeout.ms batch.num.messages send.buffer.bytes client.id broker.list ]
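The source listings on this page do not include validate_arguments, so how these constants are consumed is not documented here. The following is a minimal sketch under stated assumptions: keys in REQUIRED must be present, VALIDATIONS restricts the allowed values, and keys outside KNOWN only produce a warning. The helper name check_options and the shortened constant lists are hypothetical, and the sketch uses string keys where the documented VALIDATIONS hash uses symbol keys.

```ruby
# Hypothetical, shortened stand-ins for the constants documented above.
# String keys are used here for simplicity.
VALIDATIONS = {
  'request.required.acks' => %w[0 1 -1],
  'producer.type'         => %w[sync async]
}.freeze
REQUIRED = %w[metadata.broker.list].freeze
KNOWN = %w[metadata.broker.list request.required.acks producer.type client.id].freeze

# Assumed behavior, not the library's actual validate_arguments:
# raise on missing or invalid values, return warnings for unknown keys.
def check_options(options)
  missing = REQUIRED.reject { |key| options.key?(key) }
  unless missing.empty?
    raise ArgumentError, "missing required option(s): #{missing.join(', ')}"
  end

  VALIDATIONS.each do |key, allowed|
    next unless options.key?(key)

    value = options[key].to_s
    unless allowed.include?(value)
      raise ArgumentError, "#{key} must be one of #{allowed.join(', ')}, got #{value}"
    end
  end

  # Unknown keys are reported but do not stop the caller.
  (options.keys - KNOWN).map { |key| "unknown option: #{key}" }
end

warnings = check_options('metadata.broker.list' => 'localhost:9092',
                         'producer.type' => 'async',
                         'some.new.option' => 'x')
warnings.each { |line| puts line }
```

Returning the warnings instead of printing them inside the helper keeps the check easy to test; the real class may report them differently.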
Instance Attribute Summary
- #options ⇒ Object (readonly)
Returns the value of attribute options.
- #producer ⇒ Object (readonly)
Returns the value of attribute producer.
- #send_method ⇒ Object (readonly)
Returns the value of attribute send_method.
Instance Method Summary
- #close ⇒ Object
- #connect ⇒ Object
- #initialize(opts = {}) ⇒ Producer (constructor)
Create a Kafka Producer.
- #send_msg(topic, key, msg) ⇒ Object
Raises FailedToSendMessageException if the send fails, or StandardError if the producer is not connected.
- #sendMsg(topic, key, msg) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Producer
Create a Kafka Producer
options:
metadata_broker_list: ["localhost:9092"] - REQUIRED: a seed list of Kafka brokers
# File 'lib/jruby-kafka/producer.rb', line 46
def initialize(opts = {})
  @options = opts.reduce({}) do |opts_array, (k, v)|
    unless v.nil?
      opts_array[k.to_s.gsub(/_/, '.')] = v
    end
    opts_array
  end
  if options['broker.list']
    options['metadata.broker.list'] = options.delete 'broker.list'
  end
  if options['metadata.broker.list'].is_a? Array
    options['metadata.broker.list'] = options['metadata.broker.list'].join(',')
  end
  if options['compressed.topics'].is_a? Array
    options['compressed.topics'] = options['compressed.topics'].join(',')
  end
  validate_arguments
  @send_method = proc { throw StandardError.new 'Producer is not connected' }
end
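The option handling in the constructor can be tried without JRuby or a Kafka broker. The sketch below mirrors the three steps visible in the listing (nil values dropped and underscores turned into dots, broker.list treated as an alias, Array values joined with commas). The helper name normalize_options is hypothetical and is not part of jruby-kafka.

```ruby
# Stand-alone sketch of the key/value normalization done in the constructor.
# normalize_options is a hypothetical helper name used only for illustration.
def normalize_options(opts = {})
  # Drop nil values and turn underscored keys into dotted string keys.
  options = opts.each_with_object({}) do |(key, value), acc|
    acc[key.to_s.tr('_', '.')] = value unless value.nil?
  end

  # 'broker.list' is accepted as an alias for 'metadata.broker.list'.
  if options['broker.list']
    options['metadata.broker.list'] = options.delete('broker.list')
  end

  # Array values are joined into comma-separated strings.
  %w[metadata.broker.list compressed.topics].each do |key|
    options[key] = options[key].join(',') if options[key].is_a?(Array)
  end

  options
end

result = normalize_options(metadata_broker_list: %w[localhost:9092 localhost:9093],
                           producer_type: 'async',
                           client_id: nil)
result.each { |key, value| puts "#{key} = #{value}" }
```

In this call the broker list ends up as the single string localhost:9092,localhost:9093 under the key metadata.broker.list, and the nil client_id is dropped.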
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/jruby-kafka/producer.rb', line 40
def options
  @options
end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
# File 'lib/jruby-kafka/producer.rb', line 40
def producer
  @producer
end
#send_method ⇒ Object (readonly)
Returns the value of attribute send_method.
# File 'lib/jruby-kafka/producer.rb', line 40
def send_method
  @send_method
end
Instance Method Details
#close ⇒ Object
# File 'lib/jruby-kafka/producer.rb', line 81
def close
  @producer.close
end
#connect ⇒ Object
# File 'lib/jruby-kafka/producer.rb', line 66
def connect
  @producer = KAFKA_PRODUCER.new(create_producer_config)
  @send_method = producer.java_method :send, [KeyedMessage]
end
#send_msg(topic, key, msg) ⇒ Object
Raises FailedToSendMessageException if the send fails, or StandardError if the producer is not connected.
# File 'lib/jruby-kafka/producer.rb', line 72
def send_msg(topic, key, msg)
  send_method.call(KeyedMessage.new(topic, key, msg))
end
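The listings for initialize, connect and send_msg together show a swap pattern: send_method starts out as a proc that fails, and connect replaces it with the real send call. The sketch below reproduces that pattern in plain Ruby. FakeProducer and its in-memory sent list are hypothetical stand-ins, and the sketch uses raise for the not-connected error where the listing calls throw.

```ruby
# Stand-in for the real class. FakeProducer and @sent are hypothetical and
# only illustrate how send_method is swapped when connect is called.
class FakeProducer
  attr_reader :send_method, :sent

  def initialize
    @sent = []
    # Until connect is called, any send attempt fails loudly.
    @send_method = proc { raise StandardError, 'Producer is not connected' }
  end

  def connect
    # The real class binds the Java producer's send method here instead.
    @send_method = proc { |message| @sent << message }
  end

  def send_msg(topic, key, msg)
    send_method.call([topic, key, msg])
  end
end

producer = FakeProducer.new
begin
  producer.send_msg('events', 'k1', 'hello')
rescue StandardError => e
  puts "before connect: #{e.message}"
end

producer.connect
producer.send_msg('events', 'k1', 'hello')
puts "after connect: #{producer.sent.length} message(s) recorded"
```

With the real class under JRuby, the same order applies: build the producer, call connect, then send_msg, and close when done.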
#sendMsg(topic, key, msg) ⇒ Object
# File 'lib/jruby-kafka/producer.rb', line 76
def sendMsg(topic, key, msg)
  send_msg(topic, key, msg)
end
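The class is extended by Gem::Deprecate and sendMsg only forwards to send_msg, which suggests the camel-case name is kept for backward compatibility. This page does not show a deprecate call, so the sketch below is an assumption about how such an alias could be marked with RubyGems' Gem::Deprecate. The class LegacyNames and the year and month values are hypothetical.

```ruby
require 'rubygems'

# Hypothetical class; only Gem::Deprecate and its deprecate method are real.
class LegacyNames
  extend Gem::Deprecate

  def send_msg(topic, key, msg)
    [topic, key, msg]
  end

  def sendMsg(topic, key, msg)
    send_msg(topic, key, msg)
  end
  # Wrap sendMsg so that calling it prints a note pointing at send_msg.
  # The year and month below are illustrative values.
  deprecate :sendMsg, :send_msg, 2015, 1
end

# The old name still works; the deprecation note goes to standard error.
LegacyNames.new.sendMsg('events', 'k1', 'hello')
```

Callers of the old name keep working while being told which method to move to.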