Module: Deimos::ProducerMiddleware
- Defined in:
- lib/deimos/ext/producer_middleware.rb
Class Method Summary collapse
- ._encode_key(key, config) ⇒ String|Object
- ._process_message(message, karafka_message, config) ⇒ Object
- ._retrieve_key(payload, key_transcoder) ⇒ String
- .allowed_classes ⇒ Object
- .call(message) ⇒ Object
- .validate_key_config(config, message) ⇒ Object
Class Method Details
._encode_key(key, config) ⇒ String|Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/deimos/ext/producer_middleware.rb', line 89 def _encode_key(key, config) return nil if key.nil? if config.deserializers[:key].respond_to?(:encode_key) config.deserializers[:key].encode_key(key) elsif key config.deserializers[:payload].encode(key) else key end end |
._process_message(message, karafka_message, config) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/deimos/ext/producer_middleware.rb', line 64 def (, , config) encoder = config.deserializers[:payload].backend key_transcoder = config.deserializers[:key] # this violates the Law of Demeter but it has to happen in a very # specific order and requires a bunch of methods on the producer # to work correctly. .add_fields(encoder.schema_fields.map(&:name)) .key = [:key] || _retrieve_key(.payload, key_transcoder) # need to do this before _coerce_fields because that might result # in an empty payload which is an *error* whereas this is intended. .payload = nil if .payload.blank? .coerce_fields(encoder) .encoded_key = _encode_key(.key, config) .topic = config.name .encoded_payload = if .payload.nil? nil else encoder.encode(.payload, topic: "#{Deimos.config.producers.topic_prefix}#{config.name}-value") end end |
._retrieve_key(payload, key_transcoder) ⇒ String
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/deimos/ext/producer_middleware.rb', line 104 def _retrieve_key(payload, key_transcoder) key = payload.try(:delete, :payload_key) return key if key || !key_transcoder.respond_to?(:key_field) if key_transcoder.key_field key = key_transcoder.key_field.to_s.split('.') current = payload key.each do |k| current = current[k] if current end current else nil end end |
.allowed_classes ⇒ Object
7 8 9 10 11 12 13 |
# File 'lib/deimos/ext/producer_middleware.rb', line 7 def allowed_classes arr = [Hash, SchemaClass::Record] if defined?(Google::Protobuf) arr.push(Google::Protobuf.const_get(:AbstractMessage)) end @allowed_classes ||= arr.freeze end |
.call(message) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/deimos/ext/producer_middleware.rb', line 15 def call() Karafka.monitor.instrument( 'deimos.encode_message', producer: self, message: ) do return if .delete(:already_encoded) config = Deimos.karafka_config_for(topic: [:topic]) return if config.nil? || config.schema.nil? return if [:payload] && self.allowed_classes.none? { |k| [:payload].is_a?(k) } payload = [:payload] payload = payload.to_h if payload.nil? || payload.is_a?(SchemaClass::Record) m = Deimos::Message.new(payload, headers: [:headers], partition_key: [:partition_key]) (m, , config) [:payload] = m.encoded_payload [:label] = { original_payload: m.payload, original_key: m.key } [:key] = m.encoded_key [:partition_key] = if m.partition_key m.partition_key.to_s elsif m.key m.key.to_s else nil end [:topic] = "#{Deimos.config.producers.topic_prefix}#{config.name}" validate_key_config(config, ) end end |
.validate_key_config(config, message) ⇒ Object
55 56 57 58 59 |
# File 'lib/deimos/ext/producer_middleware.rb', line 55 def validate_key_config(config, ) if [:key].nil? && config.deserializers[:key].is_a?(Deimos::Transcoder) raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.' end end |