Class: Hermann::Provider::JavaProducer

Inherits:
Object
Defined in:
lib/hermann/provider/java_producer.rb

Overview

This class simulates the Kafka producer class within a Java environment. If the producer throws an exception within the Promise, a call to .value! will raise that exception and the rejected flag will be set to true.
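
The failure behaviour described above can be illustrated without Kafka. The sketch below assumes nothing about Hermann's internals: `SketchPromise` is a hypothetical stand-in built on `Thread`, not `Concurrent::Promise`, and it mimics only the two observable effects named in the overview (`value!` re-raises, the rejected flag becomes true).

```ruby
# Hypothetical stand-in that mimics only the failure behaviour described
# in the overview; it is NOT Concurrent::Promise and not part of Hermann.
class SketchPromise
  def self.execute(&block)
    new(&block)
  end

  def initialize(&block)
    @value  = nil
    @reason = nil
    @thread = Thread.new do
      begin
        @value = block.call
      rescue StandardError => e
        @reason = e # remember the failure instead of crashing the thread
      end
    end
  end

  # True if the block finished by raising. For simplicity this sketch
  # waits for the block to finish before answering.
  def rejected?
    @thread.join
    !@reason.nil?
  end

  # Returns the block's result, or re-raises the stored exception.
  def value!
    @thread.join
    raise @reason if @reason
    @value
  end
end

promise = SketchPromise.execute { raise IOError, 'broker unreachable' }

begin
  promise.value!
rescue IOError => e
  message = e.message # the original exception surfaces at the call site
end
```

The point for callers is that a failed send is silent until the result is inspected, so code that needs delivery guarantees should either call `value!` or check the rejected flag.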

Constant Summary

Default Kafka Producer options

DEFAULTS = {
  'partitioner.class'        => 'kafka.producer.DefaultPartitioner',
  'request.required.acks'    => '1',
  'message.send.max.retries' => '0'
}.freeze

Constructor Details

#initialize(brokers, opts = {}) ⇒ JavaProducer

Instantiate JavaProducer

Examples

JavaProducer.new('0:9092', {'request.required.acks' => '1'})



# File 'lib/hermann/provider/java_producer.rb', line 34

def initialize(brokers, opts={})
  config      = create_config(brokers, opts)
  @producer   = JavaApiUtil::Producer.new(config)
end
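
The `create_config` helper is not shown on this page. As a rough sketch of how the broker list and caller options could be layered over `DEFAULTS`, the example below assumes a plain `Hash#merge` and assumes the broker list is stored under Kafka's `metadata.broker.list` key. The helper name `build_options` is hypothetical, and the real method may differ.

```ruby
# Hypothetical stand-in for the option-merging step; the real
# create_config is not shown on this page and may behave differently.
DEFAULTS = {
  'partitioner.class'        => 'kafka.producer.DefaultPartitioner',
  'request.required.acks'    => '1',
  'message.send.max.retries' => '0'
}.freeze

# Assumption: later merges win, so caller-supplied opts override defaults.
def build_options(brokers, opts = {})
  DEFAULTS.merge('metadata.broker.list' => brokers).merge(opts)
end

options = build_options('localhost:9092', 'request.required.acks' => '0')
```

Because `merge` returns a new Hash, the frozen `DEFAULTS` constant is never mutated, and any key the caller omits keeps its default value.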

Instance Attribute Details

#producer ⇒ Object

Returns the value of attribute producer.



# File 'lib/hermann/provider/java_producer.rb', line 13

def producer
  @producer
end

Instance Method Details

#connect(timeout = 0) ⇒ Object

No-op for now



# File 'lib/hermann/provider/java_producer.rb', line 73

def connect(timeout=0)
  nil
end

#connected? ⇒ Boolean

No-op for now; always returns false



# File 'lib/hermann/provider/java_producer.rb', line 63

def connected?
  return false
end

#errored? ⇒ Boolean

No-op for now; always returns false



# File 'lib/hermann/provider/java_producer.rb', line 68

def errored?
  return false
end

#push_single(msg, topic, key) ⇒ Object

Push a single message onto the given Kafka topic. The send runs inside a Concurrent::Promise, which is returned to the caller.



# File 'lib/hermann/provider/java_producer.rb', line 47

def push_single(msg, topic, key)
  Concurrent::Promise.execute {
    data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
    begin
      @producer.send(data)
    rescue Java::KafkaCommon::FailedToSendMessageException => jexc
      raise Hermann::Errors::ConnectivityError.new(jexc.message,
                                                   :java_exception => jexc)
    rescue => e
      raise Hermann::Errors::GeneralError.new(e.message,
                                              :java_exception => e)
    end
  }
end
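
The rescue clauses in `push_single` translate low-level failures into Hermann's own error classes while keeping the original exception attached under `:java_exception`. Here is a sketch of that pattern in plain Ruby, using stand-in classes: `SketchSendFailure`, `SketchError`, `SketchConnectivityError` and the helper `translate_errors` are all hypothetical and are not Hermann's or Kafka's classes.

```ruby
# Hypothetical stand-ins; none of these classes belong to Hermann or Kafka.
class SketchSendFailure < StandardError; end # plays the role of the Java exception

class SketchError < StandardError
  attr_reader :java_exception

  # Mirrors the call shape seen in push_single: (message, :java_exception => e)
  def initialize(message, opts = {})
    super(message)
    @java_exception = opts[:java_exception]
  end
end

class SketchConnectivityError < SketchError; end

# Runs the block and re-raises any failure as a library-level error.
# The specific rescue comes first; the catch-all handles everything else.
def translate_errors
  yield
rescue SketchSendFailure => e
  raise SketchConnectivityError.new(e.message, :java_exception => e)
rescue StandardError => e
  raise SketchError.new(e.message, :java_exception => e)
end

begin
  translate_errors { raise SketchSendFailure, 'no brokers available' }
rescue SketchError => e
  wrapped = e # callers only need to know about the library's error classes
end
```

An exception raised inside a rescue clause is not caught by the sibling clauses of the same block, so the connectivity error is not re-wrapped by the catch-all.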