Class: Hermann::Provider::JavaProducer

Inherits:
Object
  • Object
show all
Defined in:
lib/hermann/provider/java_producer.rb

Overview

This class simulates the kafka producer class within a java environment. If the producer throw an exception within the Promise a call to .value! will raise the exception and the rejected flag will be set to true

Constant Summary collapse

DEFAULTS =

default kafka Producer options

{
  'partitioner.class'     => 'kafka.producer.DefaultPartitioner',
  'request.required.acks' => '1',
  'message.send.max.retries' => '0'
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(brokers, opts = {}) ⇒ JavaProducer

Instantiate JavaProducer

Examples

JavaProducer.new(‘0:9092’, => ‘1’)



33
34
35
36
# File 'lib/hermann/provider/java_producer.rb', line 33

def initialize(brokers, opts={})
  config      = create_config(brokers, opts)
  @producer   = JavaApiUtil::Producer.new(config)
end

Instance Attribute Details

#producerObject

Returns the value of attribute producer.



12
13
14
# File 'lib/hermann/provider/java_producer.rb', line 12

def producer
  @producer
end

Instance Method Details

#connect(timeout = 0) ⇒ Object

No-op for now



73
74
75
# File 'lib/hermann/provider/java_producer.rb', line 73

def connect(timeout=0)
  nil
end

#connected?Boolean

No-op for now

Returns:

  • (Boolean)


63
64
65
# File 'lib/hermann/provider/java_producer.rb', line 63

def connected?
  return false
end

#errored?Boolean

No-op for now

Returns:

  • (Boolean)


68
69
70
# File 'lib/hermann/provider/java_producer.rb', line 68

def errored?
  return false
end

#push_single(msg, topic, key, _) ⇒ Object

Push a value onto the Kafka topic passed to this Producer

Parameters:

  • value (Object)

    A single object to push

  • topic (String)

    to push message to

Returns:

  • Concurrent::Promise Representa a promise to send the data to the kafka broker. Upon execution the Promise’s status will be set



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/hermann/provider/java_producer.rb', line 46

def push_single(msg, topic, key, _)
  key = key && key.to_java
  Concurrent::Promise.execute {
    data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
    begin
      @producer.send(data)
    rescue Java::KafkaCommon::FailedToSendMessageException => jexc
      raise Hermann::Errors::ConnectivityError.new(jexc.message,
                                                   :java_exception => jexc)
    rescue => e
      raise Hermann::Errors::GeneralError.new(e.message,
                                                   :java_exception => e)
    end
  }
end