Class: Eventsourcer::Brokers::KafkaBroker

Inherits:
Base
  • Object
show all
Defined in:
lib/eventsourcer/brokers/kafka_broker.rb

Class Method Summary collapse

Class Method Details

.publish(table_name:, previous_changes:) ⇒ Object


10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/eventsourcer/brokers/kafka_broker.rb', line 10

def self.publish(table_name:, previous_changes:)
  producer = client.async_producer(max_queue_size: Eventsourcer.configuration.kafka_max_queue_size || 10000)
  begin
    produce_to_kafka(producer, table_name, previous_changes.to_json)
  rescue Kafka::BufferOverflow
    producer.deliver_messages
    produce_to_kafka(producer, table_name, previous_changes.to_json)
  end

  producer.deliver_messages
  producer.shutdown
end