Module: Rimless::KafkaHelpers

Extended by:
ActiveSupport::Concern
Included in:
Rimless
Defined in:
lib/rimless/kafka_helpers.rb

Overview

The top-level Apache Kafka helpers.

Class Method Summary collapse

Class Method Details

.async_message(data:, schema:, topic:, **args) ⇒ Object

Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is send is an asynchronous, non-blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • schema (String, Symbol)

    the Apache Avro schema to use

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



68
69
70
71
# File 'lib/rimless/kafka_helpers.rb', line 68

def async_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  async_raw_message(data: encoded, topic: topic, **args)
end

.async_raw_message(data:, topic:, **args) ⇒ Object

Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is send is an asynchronous, non-blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



97
98
99
100
# File 'lib/rimless/kafka_helpers.rb', line 97

def async_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::AsyncProducer.call(data, **args)
end

.sync_message(data:, schema:, topic:, **args) ⇒ Object Also known as: message

Send a single message to Apache Kafka. The data is encoded according to the given Apache Avro schema. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is send is a synchronous, blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • schema (String, Symbol)

    the Apache Avro schema to use

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



52
53
54
55
# File 'lib/rimless/kafka_helpers.rb', line 52

def sync_message(data:, schema:, topic:, **args)
  encoded = Rimless.encode(data, schema: schema)
  sync_raw_message(data: encoded, topic: topic, **args)
end

.sync_raw_message(data:, topic:, **args) ⇒ Object Also known as: raw_message

Send a single message to Apache Kafka. The data is not touched, so you need to encode it yourself before you pass it in. The destination Kafka topic may be a relative name, or a hash which is passed to the .topic method to manipulate the application details. The message is send is a synchronous, blocking way.

Parameters:

  • data (Hash{Symbol => Mixed})

    the raw data, unencoded

  • topic (String, Symbol, Hash{Symbol => Mixed})

    the destination Apache Kafka topic



82
83
84
85
# File 'lib/rimless/kafka_helpers.rb', line 82

def sync_raw_message(data:, topic:, **args)
  args = args.merge(topic: topic(topic))
  WaterDrop::SyncProducer.call(data, **args)
end

.topic(*args) ⇒ String

Generate a common topic name for Apache Kafka while taking care of configured prefixes.

rubocop:disable Metrics/AbcSize because of the usage flexibility

Examples:

Name only

Rimless.topic(:users)

Name with app

Rimless.topic(:users, app: 'test-api')

Mix and match

Rimless.topic(name: 'test', app: :fancy_app)

Parameters:

  • args (Array<Mixed>)

    the relative topic name

Returns:

  • (String)

    the complete topic name

Raises:

  • (ArgumentError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rimless/kafka_helpers.rb', line 24

def topic(*args)
  opts = args.last
  name = args.first if [String, Symbol].member?(args.first.class)

  if opts.is_a?(Hash)
    name = opts[:name] if opts.key?(:name)
    app = opts[:app] if opts.key?(:app)
  end

  name ||= nil
  app ||= Rimless.configuration.app_name

  raise ArgumentError, 'No name given' if name.nil?

  "#{Rimless.topic_prefix(app)}#{name}".tr('_', '-')
end