Class: ROM::Kafka::Connection::Producer Private

Inherits:
ROM::Kafka::Connection
Defined in:
lib/rom/kafka/connection/producer.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The producer-specific connection to a Kafka cluster

It wraps the `Poseidon::Producer` driver and is responsible for adapting the poseidon API to ROM::Gateway via the [#initialize] and [#publish] methods.

The ROM::Kafka producer deals with tuples, hiding the poseidon-specific implementation of messages from the rest of the gem.
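
As a rough illustration of that boundary, the sketch below converts a plain tuple into a message object. `FakeMessage` is a hypothetical stand-in for `Poseidon::MessageToSend`, `tuple_to_message` is an invented helper name, and the tuple keys `:topic`, `:value` and `:key` are an assumption that this page does not confirm.

```ruby
# Hypothetical stand-in for Poseidon::MessageToSend, so the sketch runs without the gem.
FakeMessage = Struct.new(:topic, :value, :key)

# Builds a driver-level message from a plain tuple.
# Assumed tuple keys: :topic, :value, :key.
def tuple_to_message(tuple)
  FakeMessage.new(*tuple.values_at(:topic, :value, :key))
end

message = tuple_to_message({ topic: "logs", value: "started", key: nil })
puts message.topic
puts message.value
```

Callers only ever see hashes; the message class stays an internal detail of the connection.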

Constant Summary

DRIVER =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The ‘poseidon’ class describing a producer

Returns:

  • (Class)
Poseidon::Producer

MESSAGE =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The ‘poseidon’ class describing a message acceptable by producer

Returns:

  • (Class)
Poseidon::MessageToSend

Constructor Details

#initialize(options) ⇒ Producer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes a producer connection

The initializer is attributes-agnostic. This means it doesn’t validate attributes, but silently skips the unused ones.

Parameters:

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :client_id (#to_s)

    A required unique id used to identify the Kafka client.

  • :brokers (Array<String>)

    A list of seed brokers to find a lead broker to fetch messages from.

  • :partitioner (Proc, nil)

    A proc used to select a partition for a given key.

  • :compression_codec (:gzip, :snappy, nil) — default: nil

    Type of compression to be used.

  • :metadata_refresh_interval_ms (Integer) — default: 600_000

    How frequently the topic metadata should be updated (in milliseconds).

  • :max_send_retries (Integer) — default: 3

    Number of times to retry sending messages to a leader.

  • :retry_backoff_ms (Integer) — default: 100

    An amount of time (in milliseconds) to wait before refreshing the metadata after we are unable to send messages.

  • :required_acks (Integer) — default: 0

    The number of acks required per request.

  • :ack_timeout_ms (Integer) — default: 1_500

    How long the producer waits for acks.

  • :socket_timeout_ms (Integer) — default: 10_000

    How long the producer/consumer socket waits for any reply from server.
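
A minimal sketch of an options hash built from the keys listed above. The broker addresses and client id are placeholders, and the `(key, partition_count)` arity of the partitioner is an assumption about what the driver passes to the proc; verify it against the poseidon documentation before relying on it.

```ruby
# Assumed arity: the driver calls the proc with the message key
# and the number of partitions available for the topic.
PARTITIONER = proc { |key, partition_count| key.to_s.bytes.sum % partition_count }

OPTIONS = {
  client_id: "my_app",                            # placeholder id
  brokers: ["localhost:9092", "localhost:9093"],  # placeholder seed brokers
  partitioner: PARTITIONER,
  compression_codec: :gzip,
  required_acks: 1,
  max_send_retries: 3
}.freeze

# The same key always maps to the same partition index.
puts PARTITIONER.call("user-42", 4)
```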



# File 'lib/rom/kafka/connection/producer.rb', line 72

def initialize(options) # @todo: Refactor using factory method Connection.build_producer
  super # takes declared attributes only, skipping brokers and client_id
  brokers     = options.fetch(:brokers)
  client      = options.fetch(:client_id)
  @connection = DRIVER.new(brokers, client, attributes)
  @mutex      = Mutex.new
end
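
How the constructor treats required and optional keys can be sketched without the real driver. Everything below is illustrative: `FakeDriver` and `SketchProducer` are hypothetical names, and the `KNOWN` list only approximates what `super` does with the declared attributes.

```ruby
# Hypothetical driver that records its arguments instead of opening sockets.
FakeDriver = Struct.new(:brokers, :client_id, :options)

class SketchProducer
  # Assumed subset of declared attributes; the real list lives in the parent class.
  KNOWN = %i[partitioner compression_codec required_acks max_send_retries].freeze

  attr_reader :connection

  def initialize(options)
    # Hash#fetch raises KeyError when a required key is missing.
    brokers = options.fetch(:brokers)
    client  = options.fetch(:client_id)
    # Unknown keys are skipped silently rather than rejected.
    attributes  = options.select { |key, _| KNOWN.include?(key) }
    @connection = FakeDriver.new(brokers, client, attributes)
  end
end

producer = SketchProducer.new({ client_id: "my_app", brokers: ["localhost:9092"], unused: 1 })
p producer.connection.options
```

Because `:brokers` and `:client_id` are read with `fetch`, leaving either one out fails immediately with a `KeyError`, while a misspelled optional key is simply ignored.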

Instance Attribute Details

#connection ⇒ ROM::Kafka::Connections::Producer::DRIVER (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns driver to Kafka.

Returns:

  • (ROM::Kafka::Connections::Producer::DRIVER)

    driver to Kafka



# File 'lib/rom/kafka/connection/producer.rb', line 43

def connection
  @connection
end

Instance Method Details

#publish(*data) ⇒ Array<Hash{Symbol => String, nil}>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sends tuples to the underlying connection

Stringifies non-empty hash values to conform to the ‘poseidon’ API.

Parameters:

  • data (Array<Hash>)

Returns:

  • (Array<Hash{Symbol => String, nil}>)

    The list of published tuples



# File 'lib/rom/kafka/connection/producer.rb', line 89

def publish(*data)
  tuples   = data.flatten.map(&method(:stringify_keys))
  messages = tuples.map(&method(:message))
  @mutex.synchronize { @connection.send_messages messages }
  tuples
end
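
To make the flow of `publish` concrete, here is a self-contained sketch with a recording driver in place of poseidon. `RecordingDriver`, `SketchPublisher`, and the bodies of `stringify` and `message` are assumptions for illustration; the private helpers of the real class are not shown on this page.

```ruby
# Hypothetical driver that stores messages in memory.
class RecordingDriver
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_messages(messages)
    @sent.concat(messages)
    true
  end
end

class SketchPublisher
  def initialize(driver)
    @connection = driver
    @mutex      = Mutex.new
  end

  def publish(*data)
    # Accepts tuples both as separate arguments and as nested arrays.
    tuples   = data.flatten.map { |tuple| stringify(tuple) }
    messages = tuples.map { |tuple| message(tuple) }
    # Serialize access so concurrent callers do not interleave sends.
    @mutex.synchronize { @connection.send_messages(messages) }
    tuples
  end

  private

  # Assumed behavior: non-nil values become strings, nil stays nil.
  def stringify(tuple)
    tuple.transform_values { |value| value&.to_s }
  end

  # Assumed message shape: [topic, value, key].
  def message(tuple)
    tuple.values_at(:topic, :value, :key)
  end
end

driver    = RecordingDriver.new
publisher = SketchPublisher.new(driver)
p publisher.publish({ topic: "logs", value: 1, key: nil }, [{ topic: "logs", value: 2, key: :a }])
```

The method returns the stringified tuples rather than the driver's response, which matches the documented return type of `Array<Hash{Symbol => String, nil}>`.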