Class: Deimos::Backends::Db

Inherits:
Base
  • Object
Defined in:
lib/deimos/backends/db.rb

Overview

Backend which saves messages to the database instead of immediately sending them.

Class Method Summary

Methods inherited from Base

publish

Class Method Details

.execute(producer_class:, messages:) ⇒ Object

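Builds one Deimos::KafkaMessage record per message, bulk-inserts the records, and increments the db_producer.insert metric by the number of records saved.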



# File 'lib/deimos/backends/db.rb', line 12

def execute(producer_class:, messages:)
  records = messages.map do |m|
    message = Deimos::KafkaMessage.new(
      message: m.encoded_payload ? m.encoded_payload.to_s.b : nil,
      topic: m.topic,
      partition_key: partition_key_for(m)
    )
    message.key = m.encoded_key.to_s.b unless producer_class.config[:no_keys]
    message
  end
  Deimos::KafkaMessage.import(records)
  Deimos.config.metrics&.increment(
    'db_producer.insert',
    tags: %W(topic:#{producer_class.topic}),
    by: records.size
  )
end
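The mapping step in the listing above can be tried in isolation. The following is a minimal sketch, not the library's implementation: `SketchOutgoing`, `SketchRow`, and `sketch_build_rows` are hypothetical stand-ins for the outgoing message, the `Deimos::KafkaMessage` record, and the body of the `map` block. The partition key and the database import are left out. It only illustrates how payloads and keys are converted to binary strings, how a nil payload is preserved, and how the `no_keys` setting skips the key.

```ruby
# Hypothetical stand-ins; the real backend builds Deimos::KafkaMessage records.
SketchOutgoing = Struct.new(:encoded_payload, :encoded_key, :topic, keyword_init: true)
SketchRow = Struct.new(:message, :key, :topic, keyword_init: true)

# Mirrors the map block: one row per outgoing message.
def sketch_build_rows(messages, no_keys: false)
  messages.map do |m|
    row = SketchRow.new(
      # A nil payload stays nil; anything else becomes a binary (ASCII-8BIT) string.
      message: m.encoded_payload ? m.encoded_payload.to_s.b : nil,
      topic: m.topic
    )
    # The key is stored only when the producer is not configured with no_keys.
    row.key = m.encoded_key.to_s.b unless no_keys
    row
  end
end

outgoing = [
  SketchOutgoing.new(encoded_payload: "h\u00E9llo", encoded_key: 'k1', topic: 'my-topic'),
  SketchOutgoing.new(encoded_payload: nil, encoded_key: 'k2', topic: 'my-topic')
]
rows = sketch_build_rows(outgoing)
keyless_rows = sketch_build_rows(outgoing, no_keys: true)
```

Calling `String#b` returns a copy tagged as binary, so the bytes are stored unchanged regardless of the payload's original encoding.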

.partition_key_for(message) ⇒ String

Returns the partition key to use for this message.

Parameters:

  • message (Deimos::Message)

Returns:

  • (String)

    the partition key to use for this message



# File 'lib/deimos/backends/db.rb', line 32

def partition_key_for(message)
  return message.partition_key if message.partition_key.present?
  return message.key unless message.key.is_a?(Hash)

  message.key.to_yaml
end
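The fallback order in this method is easiest to see with concrete inputs. Below is a minimal sketch that runs without Deimos or ActiveSupport: `SketchMessage` and `sketch_partition_key_for` are hypothetical names, and the `present?` check is only approximated for nil and blank strings.

```ruby
require 'yaml'

# Hypothetical stand-in exposing only the two fields the method reads.
SketchMessage = Struct.new(:partition_key, :key, keyword_init: true)

# Same order as the listing: explicit partition key first, then a non-Hash
# key as-is, then a Hash key serialized to YAML.
def sketch_partition_key_for(message)
  pk = message.partition_key
  # Approximates ActiveSupport's `present?` for nil and blank strings only.
  return pk unless pk.nil? || pk.to_s.strip.empty?
  return message.key unless message.key.is_a?(Hash)

  message.key.to_yaml
end

explicit = sketch_partition_key_for(SketchMessage.new(partition_key: 'user-1', key: 'abc'))
scalar   = sketch_partition_key_for(SketchMessage.new(partition_key: nil, key: 'abc'))
hashed   = sketch_partition_key_for(SketchMessage.new(partition_key: '', key: { 'id' => 5 }))
```

Here `explicit` is the explicit partition key, `scalar` falls back to the plain key, and `hashed` is the YAML form of the Hash key. A non-Hash key is returned unchanged, so the result is a String only when the key itself is one.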