Class: Deimos::Producer
- Inherits:
-
Object
- Object
- Deimos::Producer
- Includes:
- SharedConfig
- Defined in:
- lib/deimos/producer.rb
Overview
Producer to publish messages to a given kafka topic.
Direct Known Subclasses
Constant Summary collapse
- MAX_BATCH_SIZE =
500
Class Method Summary collapse
-
.determine_backend_class(sync = false, force_send = false) ⇒ Class<Deimos::Backends::Base>
rubocop:disable Style/OptionalBooleanParameter.
- .karafka_config ⇒ Object
-
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key).
-
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
-
.produce_batch(backend, batch) ⇒ void
Send a batch to the backend.
-
.publish(payload, topic: self.topic, headers: nil) ⇒ void
Publish the payload to the topic.
-
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
Publish a list of messages.
- .topic ⇒ Object
Class Method Details
.determine_backend_class(sync = false, force_send = false) ⇒ Class<Deimos::Backends::Base>
rubocop:disable Style/OptionalBooleanParameter
148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/deimos/producer.rb', line 148 def determine_backend_class(sync=false, force_send=false) backend = if force_send :kafka else Deimos.config.producers.backend end if backend == :kafka_async && sync backend = :kafka elsif backend == :kafka && sync == false backend = :kafka_async end "Deimos::Backends::#{backend.to_s.classify}".constantize end |
.karafka_config ⇒ Object
134 135 136 137 138 |
# File 'lib/deimos/producer.rb', line 134 def karafka_config Deimos.karafka_configs.find do |t| t.producer_classes&.any? { |k| k&.name == self.name } end end |
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). Will include ‘payload_key` if it is part of the original payload.
73 74 75 |
# File 'lib/deimos/producer.rb', line 73 def partition_key(_payload) nil end |
.produce(messages, backend: determine_backend_class) ⇒ Object
Produce a list of messages in WaterDrop message hash format.
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/deimos/producer.rb', line 89 def produce(, backend: determine_backend_class) return if Deimos.producers_disabled?(self) .each do |m| m[:label] = m m[:partition_key] ||= self.partition_key(m[:payload]) end .in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend, batch) end end |
.produce_batch(backend, batch) ⇒ void
This method returns an undefined value.
Send a batch to the backend.
167 168 169 |
# File 'lib/deimos/producer.rb', line 167 def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end |
.publish(payload, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish the payload to the topic.
82 83 84 |
# File 'lib/deimos/producer.rb', line 82 def publish(payload, topic: self.topic, headers: nil) produce([{ payload: payload, topic: topic, headers: headers }]) end |
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) ⇒ void
This method returns an undefined value.
Publish a list of messages. whether to publish synchronously. and send immediately to Kafka.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/deimos/producer.rb', line 110 def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) backend = determine_backend_class(sync, force_send) = Array(payloads).map do |p| payload = p payload = payload.to_h if p.is_a?(SchemaClass::Record) m = { payload: payload, headers: headers, topic: topic, partition_key: self.partition_key(p) } if m.dig(:payload, :key).present? && m.dig(:payload, :message).present? m[:key] = m[:payload][:key] m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(SchemaClass::Record) m[:payload] = m[:payload][:message] m[:payload] = m[:payload].to_h if m[:payload].nil? || m[:payload].is_a?(SchemaClass::Record) end m end self.produce(, backend: backend) end |
.topic ⇒ Object
140 141 142 |
# File 'lib/deimos/producer.rb', line 140 def topic karafka_config&.name end |