Class: ROM::Kafka::Gateway

Inherits:
Gateway
  • Object
show all
Extended by:
AttributesDSL
Defined in:
lib/rom/kafka/gateway.rb

Overview

Describes the gateway to Kafka

The gateway has 3 responsibilities:

  • registers the datasets describing various topics and partitions

  • instantiates the producer connection to Kafka brokers that doesn’t depend on a specific topic/partition settings

  • stores settings for the consumer connections to Kafka, that depends on a specific topic/partition/offset

Every dataset uses the same producer connection (defined by gateway) and individual consumer’s one. The consumer’s connection is reloaded every time the topic, partition or current offset is changed by a relation.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*addresses) ⇒ Gateway

Initializes the gateway to Kafka broker(s).

The initializer is attributes-agnostic. This means it doesn’t validate attributes, but skips unused.

Examples:

Initialize a producer’s gateway to Kafka

gateway = Gateway.new(
  hosts: ["127.0.0.1", "127.0.0.2:9093"],
  port: 9092,
  client_id: :my_user,
  compression-codec: :gzip
)
gateway.brokers # => ["127.0.0.1:9092", "127.0.0.2:9093"]

Alternative syntax

gateway = Gateway.new(
  "127.0.0.1:9092",
  "127.0.0.2:9093",
  client_id: :my_user,
  compression-codec: :gzip
)
gateway.brokers # => ["127.0.0.1:9092", "127.0.0.2:9093"]

Mixed syntax

gateway = Gateway.new(
  "127.0.0.1:9092",
  hosts: ["127.0.0.2"]
  port: 9093,
  client_id: :my_user,
  min_bytes: 1024 # wait until 1Kb of messages is prepared
)
gateway.brokers # => ["127.0.0.1:9092", "127.0.0.2:9093"]

Parameters:

  • addresses (nil, String, Array<String>)

    The address(es) of broker(s) to connect (optional). Brokers can be alternatively set with ‘:hosts` and `:port` options.

  • options (Hash)

    a customizable set of options



112
113
114
115
116
117
118
119
# File 'lib/rom/kafka/gateway.rb', line 112

def initialize(*addresses) # @todo: refactor the fat initializer
  options = Hash[addresses.pop]
  brokers = Brokers.new(addresses, options).to_a   # @todo: refactor using a factory
  super options.merge(brokers: brokers) # prepares #attributes

  @producer = Connection::Producer.new(attributes) # @todo: refactor using a factory
  @datasets = {}
end

Instance Attribute Details

#producerROM::Kafka::Producer (readonly)

Returns the producer’s connection to Kafka brockers.

Returns:

  • (ROM::Kafka::Producer)

    the producer’s connection to Kafka brockers



125
126
127
# File 'lib/rom/kafka/gateway.rb', line 125

def producer
  @producer
end

Instance Method Details

#[](topic) ⇒ ROM::Kafka::Dataset

Returns the registered dataset by topic

Parameters:

  • topic (#to_sym)

Returns:



133
134
135
# File 'lib/rom/kafka/gateway.rb', line 133

def [](topic)
  @datasets[topic.to_sym]
end

#dataset(topic) ⇒ self

Registers the dataset by topic

By default the dataset is registered with 0 partition and 0 offset. That settings can be changed from either relation of a command.

Parameters:

  • topic (#to_sym)

Returns:

  • (self)

    itself



146
147
148
# File 'lib/rom/kafka/gateway.rb', line 146

def dataset(topic)
  @datasets[topic.to_sym] ||= Dataset.new(self, topic)
end

#dataset?(topic) ⇒ Boolean

Checks whether a dataset is registered by topic

Parameters:

  • topic (#to_sym)

Returns:

  • (Boolean)


156
157
158
# File 'lib/rom/kafka/gateway.rb', line 156

def dataset?(topic)
  self[topic] ? true : false
end