Module: Deimos::ProducerMiddleware
- Defined in:
- lib/deimos/ext/producer_middleware.rb
Class Method Summary collapse
- ._assign_message(deimos_message, message) ⇒ Object
- ._encode_key(key, config) ⇒ String|Object
- ._process_message(message, karafka_message, config) ⇒ Object
- ._retrieve_key(payload, key_transcoder) ⇒ String
- .allowed_classes ⇒ Object
- .call(message) ⇒ Object
- .validate_key_config(config, message) ⇒ Object
Class Method Details
._assign_message(deimos_message, message) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/deimos/ext/producer_middleware.rb', line 43 def (, ) [:payload] = .encoded_payload [:label] = { original_payload: .payload, original_key: .key } [:key] = .encoded_key [:partition_key] = if .partition_key .partition_key.to_s elsif .key .key.to_s else nil end end |
._encode_key(key, config) ⇒ String|Object
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/deimos/ext/producer_middleware.rb', line 93 def _encode_key(key, config) return nil if key.nil? if config.deserializers[:key].respond_to?(:encode_key) config.deserializers[:key].encode_key(key) elsif key config.deserializers[:payload].encode(key) else key end end |
._process_message(message, karafka_message, config) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/deimos/ext/producer_middleware.rb', line 68 def (, , config) encoder = config.deserializers[:payload].backend key_transcoder = config.deserializers[:key] # this violates the Law of Demeter but it has to happen in a very # specific order and requires a bunch of methods on the producer # to work correctly. .add_fields(encoder.schema_fields.map(&:name)) .key = [:key] || _retrieve_key(.payload, key_transcoder) # need to do this before _coerce_fields because that might result # in an empty payload which is an *error* whereas this is intended. .payload = nil if .payload.blank? .coerce_fields(encoder) .encoded_key = _encode_key(.key, config) .topic = config.name .encoded_payload = if .payload.nil? nil else encoder.encode(.payload, topic: "#{Deimos.config.producers.topic_prefix}#{config.name}-value") end end |
._retrieve_key(payload, key_transcoder) ⇒ String
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/deimos/ext/producer_middleware.rb', line 108 def _retrieve_key(payload, key_transcoder) key = payload.try(:delete, :payload_key) return key if key || !key_transcoder.respond_to?(:key_field) if key_transcoder.key_field key = key_transcoder.key_field.to_s.split('.') current = payload key.each do |k| current = current[k] if current end current else nil end end |
.allowed_classes ⇒ Object
7 8 9 10 11 12 13 |
# File 'lib/deimos/ext/producer_middleware.rb', line 7 def allowed_classes arr = [Hash, SchemaClass::Record] if defined?(Google::Protobuf) arr.push(Google::Protobuf.const_get(:AbstractMessage)) end @allowed_classes ||= arr.freeze end |
.call(message) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/deimos/ext/producer_middleware.rb', line 15 def call() Karafka.monitor.instrument( 'deimos.encode_message', producer: self, message: ) do return if .delete(:already_encoded) config = Deimos.karafka_config_for(topic: [:topic]) return if config.nil? || config.schema.nil? return if [:payload] && self.allowed_classes.none? { |k| [:payload].is_a?(k) } payload = [:payload] payload = payload.to_h if payload.nil? || payload.is_a?(SchemaClass::Record) m = Deimos::Message.new(payload, headers: [:headers], partition_key: [:partition_key]) (m, , config) (m, ) [:topic] = "#{Deimos.config.producers.topic_prefix}#{config.name}" validate_key_config(config, ) end end |
.validate_key_config(config, message) ⇒ Object
59 60 61 62 63 |
# File 'lib/deimos/ext/producer_middleware.rb', line 59 def validate_key_config(config, ) if [:key].nil? && config.deserializers[:key].is_a?(Deimos::Transcoder) raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.' end end |