Class: Deimos::Producer
- Inherits: Object
- Includes: SharedConfig
- Defined in: lib/deimos/producer.rb
Overview
Producer that publishes messages to a given Kafka topic.
Direct Known Subclasses
Constant Summary
- MAX_BATCH_SIZE = 500
Class Method Summary
- .config ⇒ Hash
- .determine_backend_class(sync, force_send) ⇒ Class < Deimos::Backend
  Resolve the backend class to publish with.
- .encoder ⇒ Deimos::SchemaBackends::Base
- .key_encoder ⇒ Deimos::SchemaBackends::Base
- .partition_key(_payload) ⇒ String
  Override the default partition key (which is the payload key).
- .produce_batch(backend, batch) ⇒ Object
  Send a batch to the backend.
- .publish(payload, topic: self.topic) ⇒ Object
  Publish the payload to the topic.
- .publish_list(payloads, sync: nil, force_send: false, topic: self.topic) ⇒ Object
  Publish a list of messages.
- .topic(topic = nil) ⇒ String
  Set the topic, or return the prefixed topic when called without an argument.
- .watched_attributes ⇒ Array<String>
  Override this in active record producers to add non-schema fields to check for updates.
Class Method Details
.config ⇒ Hash
# File 'lib/deimos/producer.rb', line 61

def config
  @config ||= {
    encode_key: true,
    namespace: Deimos.config.producers.schema_namespace
  }
end
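.determine_backend_class(sync, force_send) ⇒ Class < Deimos::Backend
Returns the backend class (Class < Deimos::Backend) to publish with. If `force_send` is true, the configured backend is ignored in favor of `:kafka`. A truthy `sync` then switches `:kafka_async` to `:kafka`, and `sync == false` switches `:kafka` to `:kafka_async`.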
# File 'lib/deimos/producer.rb', line 127

def determine_backend_class(sync, force_send)
  backend = if force_send
              :kafka
            else
              Deimos.config.producers.backend
            end
  if backend == :kafka_async && sync
    backend = :kafka
  elsif backend == :kafka && sync == false
    backend = :kafka_async
  end
  "Deimos::Backends::#{backend.to_s.classify}".constantize
end
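The selection rules in the method above can be traced without loading Deimos. The following is a minimal sketch, not the library's code: `resolve_backend` is a hypothetical helper that takes the configured backend as an argument (standing in for `Deimos.config.producers.backend`) and returns the resolved symbol instead of constantizing it. `:other` is a placeholder for any backend that is neither `:kafka` nor `:kafka_async`.

```ruby
# Hypothetical helper mirroring the branching in determine_backend_class.
# `configured` stands in for Deimos.config.producers.backend.
def resolve_backend(configured, sync, force_send)
  # force_send ignores the configured backend and starts from :kafka
  backend = force_send ? :kafka : configured
  if backend == :kafka_async && sync
    :kafka        # caller asked for synchronous publishing
  elsif backend == :kafka && sync == false
    :kafka_async  # caller explicitly asked for asynchronous publishing
  else
    backend       # sync is nil, or the backend has no sync/async variant
  end
end

resolve_backend(:kafka_async, true, false)  # => :kafka
resolve_backend(:kafka, false, false)       # => :kafka_async
resolve_backend(:kafka, nil, false)         # => :kafka
resolve_backend(:other, true, false)        # => :other
resolve_backend(:other, nil, true)          # => :kafka
```

Note that `sync` is compared against `false` explicitly, so leaving it at its default of `nil` never changes the backend.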
.encoder ⇒ Deimos::SchemaBackends::Base
# File 'lib/deimos/producer.rb', line 149

def encoder
  @encoder ||= Deimos.schema_backend(schema: config[:schema],
                                     namespace: config[:namespace])
end
.key_encoder ⇒ Deimos::SchemaBackends::Base
# File 'lib/deimos/producer.rb', line 155

def key_encoder
  @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema],
                                         namespace: config[:namespace])
end
.partition_key(_payload) ⇒ String
Override the default partition key (which is the payload key). Will include `payload_key` if it is part of the original payload.
# File 'lib/deimos/producer.rb', line 84

def partition_key(_payload)
  nil
end
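As an illustration of the override point, here is a minimal sketch of a subclass that supplies its own partition key. `SketchBaseProducer` is a hypothetical stand-in for `Deimos::Producer` so the example runs on its own, and the `'user_id'` field is an assumed payload field, not something the source defines.

```ruby
# Hypothetical stand-in for Deimos::Producer: the default returns nil,
# as in the method shown above.
class SketchBaseProducer
  def self.partition_key(_payload)
    nil
  end
end

# Subclass that derives the partition key from an assumed 'user_id' field.
class UserEventProducer < SketchBaseProducer
  def self.partition_key(payload)
    payload['user_id'].to_s
  end
end

SketchBaseProducer.partition_key('user_id' => 42) # => nil
UserEventProducer.partition_key('user_id' => 42)  # => "42"
```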
.produce_batch(backend, batch) ⇒ Object
Send a batch to the backend.
# File 'lib/deimos/producer.rb', line 144

def produce_batch(backend, batch)
  backend.publish(producer_class: self, messages: batch)
end
.publish(payload, topic: self.topic) ⇒ Object
Publish the payload to the topic.
# File 'lib/deimos/producer.rb', line 91

def publish(payload, topic: self.topic)
  publish_list([payload], topic: topic)
end
.publish_list(payloads, sync: nil, force_send: false, topic: self.topic) ⇒ Object
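Publish a list of messages. If `sync` is given, it overrides the default setting of whether to publish synchronously. If `force_send` is true, the configured backend is ignored and the messages are sent immediately to Kafka.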
# File 'lib/deimos/producer.rb', line 102

def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
  return if Deimos.config.kafka.seed_brokers.blank? ||
            Deimos.config.producers.disabled ||
            Deimos.producers_disabled?(self)

  raise 'Topic not specified. Please specify the topic.' if topic.blank?

  backend_class = determine_backend_class(sync, force_send)
  Deimos.instrument(
    'encode_messages',
    producer: self,
    topic: topic,
    payloads: payloads
  ) do
    = Array(payloads).map { |p| Deimos::Message.new(p, self) }
    .each { |m| (m, topic) }
    .in_groups_of(MAX_BATCH_SIZE, false) do |batch|
      self.produce_batch(backend_class, batch)
    end
  end
end
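The batching step at the end of the method can be sketched in plain Ruby. This is an illustration under stated assumptions rather than the library's code: `Enumerable#each_slice` is swapped in for ActiveSupport's `in_groups_of(MAX_BATCH_SIZE, false)` (both leave the final group short instead of padding it), and `RecordingBackend` and `publish_in_batches` are hypothetical names that only record batch sizes.

```ruby
MAX_BATCH_SIZE = 500

# Hypothetical backend stand-in: remembers how many messages each batch held.
class RecordingBackend
  attr_reader :batch_sizes

  def initialize
    @batch_sizes = []
  end

  def publish(messages:)
    @batch_sizes << messages.size
  end
end

# Split the payload list into groups of at most MAX_BATCH_SIZE and hand
# each group to the backend, one call per batch.
def publish_in_batches(payloads, backend)
  Array(payloads).each_slice(MAX_BATCH_SIZE) do |batch|
    backend.publish(messages: batch)
  end
end

backend = RecordingBackend.new
publish_in_batches((1..1200).to_a, backend)
backend.batch_sizes # => [500, 500, 200]
```

With 1200 payloads the backend is called three times, which is the behavior `MAX_BATCH_SIZE` is there to enforce.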
.topic(topic = nil) ⇒ String
Set the topic when called with an argument. When called without one, return the configured topic prefixed with `Deimos.config.producers.topic_prefix`.
# File 'lib/deimos/producer.rb', line 71

def topic(topic=nil)
  if topic
    config[:topic] = topic
    return
  end
  # accessor
  "#{Deimos.config.producers.topic_prefix}#{config[:topic]}"
end
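The combined setter and accessor can be tried in isolation. The sketch below assumes a hypothetical `SketchProducer` class whose `topic_prefix` attribute stands in for `Deimos.config.producers.topic_prefix`; the `topic` method body follows the one shown above.

```ruby
class SketchProducer
  class << self
    # Hypothetical stand-in for Deimos.config.producers.topic_prefix.
    attr_accessor :topic_prefix

    def config
      @config ||= {}
    end

    # With an argument: store the topic and return nil.
    # Without one: return the stored topic with the prefix prepended.
    def topic(topic = nil)
      if topic
        config[:topic] = topic
        return
      end
      "#{topic_prefix}#{config[:topic]}"
    end
  end
end

SketchProducer.topic_prefix = 'staging.'
SketchProducer.topic('orders') # => nil (setter form)
SketchProducer.topic           # => "staging.orders"
```

Because the prefix is applied on read, changing the prefix later affects every subsequent call to the accessor without re-setting the topic.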
.watched_attributes ⇒ Array<String>
Override this in active record producers to add non-schema fields to check for updates.
# File 'lib/deimos/producer.rb', line 163

def watched_attributes
  self.encoder.schema_fields.map(&:name)
end
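One way to add non-schema fields is to extend the default list with `super`. This is a sketch with hypothetical names: `SketchSchemaProducer` stands in for a producer whose schema has the fields `id` and `name`, and `updated_by` is an assumed extra column.

```ruby
# Hypothetical stand-in: returns a fixed list in place of
# encoder.schema_fields.map(&:name).
class SketchSchemaProducer
  def self.watched_attributes
    %w(id name)
  end
end

# Subclass that also watches a column that is not part of the schema.
class WidgetProducer < SketchSchemaProducer
  def self.watched_attributes
    super + %w(updated_by)
  end
end

WidgetProducer.watched_attributes # => ["id", "name", "updated_by"]
```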