Class: ROM::Kafka::Dataset Private

Inherits:
Object
  • Object
show all
Extended by:
AttributesDSL
Includes:
Enumerable
Defined in:
lib/rom/kafka/dataset.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The dataset describes Kafka topic

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(gateway, topic, options = {}) ⇒ Dataset

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Initializes the dataset with a gateway and topic name

Attributes are set by default from a gateway. Later you can create new dataset for the same gateway and topic, but with attributes, updated via [#using] method.

Parameters:

  • gateway (ROM::Kafka::Gateway)
  • topic (String)
  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :partition (Integer) — default: 0

    A partition number to fetch messages from

  • :offset (Integer) — default: 0

    An initial offset to start fetching from.

  • :min_bytes (Integer) — default: 1

    A smallest amount of data the server should send. (By default send us data as soon as it is ready).

  • :max_bytes (Integer) — default: 1_048_576

    A maximum number of bytes to fetch by consumer (1MB by default).

  • :max_wait_ms (Integer) — default: 100

    How long to block until the server sends data. NOTE: This is only enforced if min_bytes is > 0.



70
71
72
73
74
75
76
# File 'lib/rom/kafka/dataset.rb', line 70

def initialize(gateway, topic, options = {})
  super gateway.attributes.merge(options)
  @topic    = topic.to_s
  @gateway  = gateway
  @producer = gateway.producer
  @consumer = prepare_consumer # @todo: refactor using a factory
end

Instance Attribute Details

#consumerROM::Kafka::Connection::Consumer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The consumer connection to Kafka brokers, used to fetch messages via [#each] method call.

Returns:



46
47
48
# File 'lib/rom/kafka/dataset.rb', line 46

def consumer
  @consumer
end

#gatewayROM::Kafka::Gateway (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The back reference to the gateway, that provided the dataset.

Returns:



24
25
26
# File 'lib/rom/kafka/dataset.rb', line 24

def gateway
  @gateway
end

#producerROM::Kafka::Connection::Producer (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The producer connection to Kafka brokers, defined by a gateway. It is stored for being used by a ‘Create` command.

Returns:



38
39
40
# File 'lib/rom/kafka/dataset.rb', line 38

def producer
  @producer
end

#topicString (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The name of the topic, described by the dataset.

Returns:

  • (String)

    The name of the topic, described by the dataset



30
31
32
# File 'lib/rom/kafka/dataset.rb', line 30

def topic
  @topic
end

Instance Method Details

#each(&block) {|tuple| ... } ⇒ Enumerator<Hash{Symbol => String, Integer}>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the enumerator to iterate via tuples, fetched from a [#consumer].

If a ‘limit` of messages is set, iterator stops after achieving it.

Parameters:

  • block (Proc)

Yield Parameters:

  • tuple (Hash)

Returns:

  • (Enumerator<Hash{Symbol => String, Integer}>)


98
99
100
101
# File 'lib/rom/kafka/dataset.rb', line 98

def each(&block)
  return to_enum unless block_given?
  limit.equal?(0) ? unlimited_each(&block) : limited_each(&block)
end

#using(options) ⇒ ROM::Kafka::Dataset

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new dataset with updated consumer attributes

Parameters:

  • options (Hash)

    The part of attributes to be updated

Returns:



84
85
86
# File 'lib/rom/kafka/dataset.rb', line 84

def using(options)
  self.class.new(gateway, topic, attributes.merge(options))
end