Class: KafkaEventHub::Producer

Inherits:
Config
  • Object
show all
Defined in:
lib/kafka_event_hub/producer.rb

Constant Summary

Constants inherited from Config

Config::POLL_TIMEOUT

Instance Method Summary collapse

Methods inherited from Config

#consumer, #producer, #topic

Constructor Details

#initialize(topic = nil) ⇒ Producer

Returns a new instance of Producer.



5
6
7
# File 'lib/kafka_event_hub/producer.rb', line 5

def initialize(topic = nil)
  super('producer' => true, 'topic' => topic)
end

Instance Method Details

#produce(topic: nil, key:, payload:, partition: nil) ⇒ Object

Produce any message to Kafka



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/kafka_event_hub/producer.rb', line 10

def produce(topic: nil, key:, payload:, partition: nil)
  real_topic = topic || @topic
  producer   = @kafka.producer
  json       = payload.is_a?(String) ? payload : payload.to_json

  producer.produce(
    topic:     real_topic,
    key:       key,
    payload:   json,
    partition: partition
  ).wait
ensure
  producer&.close
end