Class: Kafka::Producer

Inherits:
Object
  • Object
show all
Extended by:
Gem::Deprecate
Defined in:
lib/jruby-kafka/producer.rb

Overview

noinspection JRubyStringImportInspection

Constant Summary collapse

# Java producer class (kafka.javaapi.producer.Producer) that #connect instantiates.
KAFKA_PRODUCER =
Java::kafka.javaapi.producer.Producer
# Allowed values keyed by option name. Presumably consulted by
# validate_arguments (not visible here) — TODO confirm.
VALIDATIONS =
{
  :'request.required.acks' => %w[ 0 1 -1 ],
  # NOTE(review): 'required.codecs' does not appear in KNOWN, which lists
  # 'compression.codec' instead — confirm this key name is intentional.
  :'required.codecs' => [NoCompressionCodec.name, GZIPCompressionCodec.name, SnappyCompressionCodec.name],
  :'producer.type' => %w[ sync async ]
}
# Option names that must be present. Presumably enforced by
# validate_arguments (not visible here) — TODO confirm.
REQUIRED =
%w[
  metadata.broker.list
]
KNOWN =

List of all available options, extracted from kafka.apache.org/documentation.html#producerconfigs on Apr. 27, 2014. If new options are added, they should just work. Please add them to the list so that we can get handy warnings.

# The final entry, 'broker.list', is an alias: #initialize moves its value to
# 'metadata.broker.list' before validation.
%w[
  metadata.broker.list      request.required.acks         request.timeout.ms
  producer.type             serializer.class              key.serializer.class
  partitioner.class         compression.codec             compressed.topics
  message.send.max.retries  retry.backoff.ms              topic.metadata.refresh.interval.ms
  queue.buffering.max.ms    queue.buffering.max.messages  queue.enqueue.timeout.ms
  batch.num.messages        send.buffer.bytes             client.id
  broker.list
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Producer

Create a Kafka Producer

options: metadata_broker_list: ["localhost:9092"] - REQUIRED: a seed list of Kafka brokers



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/jruby-kafka/producer.rb', line 46

# Create a Kafka Producer.
#
# Option keys may be Symbols or Strings. Each key is converted to a String
# with underscores replaced by dots (:metadata_broker_list becomes
# 'metadata.broker.list'), and entries whose value is nil are dropped.
#
# @param opts [Hash] producer options. 'metadata.broker.list' (or its alias
#   'broker.list') is the seed list of Kafka brokers, e.g.
#   ["localhost:9092"]. It and 'compressed.topics' may be given as an Array,
#   which is joined with commas.
# @note validate_arguments (defined elsewhere in this class) runs after
#   normalization; any error it raises propagates to the caller.
def initialize(opts = {})
  normalized = opts.each_with_object({}) do |(key, value), acc|
    # tr is a plain character swap; no regexp is needed here.
    acc[key.to_s.tr('_', '.')] = value unless value.nil?
  end

  # 'broker.list' is accepted as an alias for 'metadata.broker.list'.
  normalized['metadata.broker.list'] = normalized.delete('broker.list') if normalized['broker.list']

  # List-valued options are stored as comma-separated strings.
  %w[metadata.broker.list compressed.topics].each do |name|
    normalized[name] = normalized[name].join(',') if normalized[name].is_a?(Array)
  end

  @options = normalized
  validate_arguments

  # Placeholder until #connect installs the real sender. Uses `raise` rather
  # than `throw`: throw is catch/throw control flow and would surface as an
  # uncaught-throw error instead of a StandardError carrying this message.
  # Kept as a proc (not a lambda) so it accepts whatever arguments it is
  # called with.
  @send_method = proc { raise StandardError, 'Producer is not connected' }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



40
41
42
# File 'lib/jruby-kafka/producer.rb', line 40

# Producer options as assembled by #initialize: String keys in dotted form
# (e.g. 'metadata.broker.list') mapped to their configured values.
#
# @return [Hash] the options hash held in @options
def options; @options; end

#producer ⇒ Object (readonly)

Returns the value of attribute producer.



40
41
42
# File 'lib/jruby-kafka/producer.rb', line 40

# The underlying Kafka producer object. It is assigned by #connect and is
# not set by #initialize, so it is nil until #connect has been called.
#
# @return [Object, nil] the value held in @producer
def producer; @producer; end

#send_method ⇒ Object (readonly)

Returns the value of attribute send_method.



40
41
42
# File 'lib/jruby-kafka/producer.rb', line 40

# The callable that #send_msg invokes to deliver a message. #initialize
# installs a placeholder proc that fails with 'Producer is not connected';
# #connect replaces it with the producer's bound send method.
#
# @return [#call] the value held in @send_method
def send_method; @send_method; end

Instance Method Details

#close ⇒ Object



81
82
83
# File 'lib/jruby-kafka/producer.rb', line 81

# Close the underlying Kafka producer.
#
# Safe to call before #connect: @producer is only assigned by #connect, so
# until then there is nothing to close and this is a no-op instead of a
# NoMethodError on nil.
#
# @return [Object, nil] the result of the producer's close call, or nil when
#   no producer has been created
def close
  @producer.close if @producer
end

#connect ⇒ Object



66
67
68
69
# File 'lib/jruby-kafka/producer.rb', line 66

# Instantiate the Kafka producer from the current configuration and point
# the sender at it.
#
# Builds a KAFKA_PRODUCER from create_producer_config, stores it in
# @producer, and replaces @send_method with the producer's `send` method
# bound to the KeyedMessage signature.
#
# @return [Object] the newly bound send method
def connect
  kafka_producer = KAFKA_PRODUCER.new(create_producer_config)
  @producer = kafka_producer
  @send_method = kafka_producer.java_method(:send, [KeyedMessage])
end

#send_msg(topic, key, msg) ⇒ Object

Raises FailedToSendMessageException, or StandardError if the producer is not connected.



72
73
74
# File 'lib/jruby-kafka/producer.rb', line 72

# Send a single message through the current sender.
#
# Wraps the arguments in a KeyedMessage and passes it to send_method.
#
# @param topic the topic handed to KeyedMessage
# @param key the key handed to KeyedMessage
# @param msg the payload handed to KeyedMessage
# @return [Object] whatever the sender returns
def send_msg(topic, key, msg)
  sender = send_method
  keyed_message = KeyedMessage.new(topic, key, msg)
  sender.call(keyed_message)
end

#sendMsg(topic, key, msg) ⇒ Object



76
77
78
# File 'lib/jruby-kafka/producer.rb', line 76

# camelCase spelling of #send_msg; forwards its arguments unchanged and
# returns whatever #send_msg returns.
#
# @param topic forwarded to #send_msg
# @param key forwarded to #send_msg
# @param msg forwarded to #send_msg
def sendMsg(topic, key, msg); send_msg(topic, key, msg); end