Module: Deimos::ProducerMiddleware

Defined in:
lib/deimos/ext/producer_middleware.rb

Class Method Summary

Class Method Details

._assign_message(deimos_message, message) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/deimos/ext/producer_middleware.rb', line 43

# Copies the encoded fields of a Deimos message onto the outgoing message hash.
#
# Writes :payload, :label (holding the original payload and key), :key and
# :partition_key, in that order. The partition key is taken from the message's
# own partition key when that is set, otherwise from its unencoded key; the
# chosen value is stringified, and nil is stored when neither is set.
#
# @param deimos_message [Deimos::Message] source of the encoded and original values
# @param message [Hash] outgoing message, mutated in place
# @return [String, nil] the partition key that was stored
def _assign_message(deimos_message, message)
  message[:payload] = deimos_message.encoded_payload
  message[:label] = { original_payload: deimos_message.payload, original_key: deimos_message.key }
  message[:key] = deimos_message.encoded_key

  partition_source = deimos_message.partition_key || deimos_message.key
  message[:partition_key] = partition_source ? partition_source.to_s : nil
end

._encode_key(key, config) ⇒ String|Object

Parameters:

  • key (Object)
  • config (ProducerConfig)

Returns:

  • (String|Object)


93
94
95
96
97
98
99
100
101
102
103
# File 'lib/deimos/ext/producer_middleware.rb', line 93

# Encodes a message key using the producer configuration's deserializers.
#
# A nil key is returned as nil without consulting the config. Otherwise the
# key deserializer's `encode_key` is used when it responds to that method;
# failing that, a truthy key is encoded with the payload deserializer's
# `encode`, and a falsy key is returned unchanged.
#
# @param key [Object]
# @param config [ProducerConfig]
# @return [String|Object]
def _encode_key(key, config)
  return nil if key.nil?

  key_coder = config.deserializers[:key]
  return key_coder.encode_key(key) if key_coder.respond_to?(:encode_key)
  # Only `false` can reach this point with a falsy value, since nil was handled above.
  return key unless key

  config.deserializers[:payload].encode(key)
end

._process_message(message, karafka_message, config) ⇒ Object

Parameters:

  • message (Deimos::Message)
  • karafka_message (Hash)
  • config (Deimos::ProducerConfig)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/deimos/ext/producer_middleware.rb', line 68

# Fills in the given Deimos message step by step: schema fields, key,
# coerced payload, encoded key, topic and finally the encoded payload.
# The statements below are order-dependent (see the inline notes).
#
# @param message [Deimos::Message] mutated in place
# @param karafka_message [Hash] only its :key entry is read here
# @param config [Deimos::ProducerConfig]
# @return [Object, nil] the value assigned to `message.encoded_payload`
def _process_message(message, karafka_message, config)
  encoder = config.deserializers[:payload].backend
  key_transcoder = config.deserializers[:key]
  # this violates the Law of Demeter but it has to happen in a very
  # specific order and requires a bunch of methods on the producer
  # to work correctly.
  message.add_fields(encoder.schema_fields.map(&:name))
  # A key supplied on the Karafka message takes precedence; otherwise the
  # key is looked up from the payload via the key transcoder.
  message.key = karafka_message[:key] || _retrieve_key(message.payload, key_transcoder)
  # need to do this before _coerce_fields because that might result
  # in an empty payload which is an *error* whereas this is intended.
  message.payload = nil if message.payload.blank?
  message.coerce_fields(encoder)
  message.encoded_key = _encode_key(message.key, config)
  message.topic = config.name
  # A nil payload is left unencoded; any other payload is encoded under the
  # prefixed topic name with a "-value" suffix.
  message.encoded_payload = if message.payload.nil?
                              nil
                            else
                              encoder.encode(message.payload,
                                             topic: "#{Deimos.config.producers.topic_prefix}#{config.name}-value")
                            end
end

._retrieve_key(payload, key_transcoder) ⇒ String

Parameters:

  • payload
  • key_transcoder

Returns:

  • (String)


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/deimos/ext/producer_middleware.rb', line 108

# Looks up the message key from the payload.
#
# First calls `delete(:payload_key)` on the payload (through `try`) and
# returns the result when it is truthy. Otherwise, when the transcoder
# responds to `key_field` and that field is set, the field name is split on
# "." and each segment is used to index one level further into the payload,
# stopping the descent as soon as a level is nil or false.
#
# @param payload [Object] NOTE(review): looks like a Hash or nil — confirm against callers
# @param key_transcoder [Object] consulted for `key_field` when it responds to it
# @return [Object, nil] the key found, or nil when none could be located
def _retrieve_key(payload, key_transcoder)
  explicit_key = payload.try(:delete, :payload_key)
  return explicit_key if explicit_key
  return explicit_key unless key_transcoder.respond_to?(:key_field)

  field_path = key_transcoder.key_field
  return nil unless field_path

  field_path.to_s.split('.').reduce(payload) do |node, segment|
    node ? node[segment] : node
  end
end

.allowed_classes ⇒ Object



7
8
9
10
11
12
13
# File 'lib/deimos/ext/producer_middleware.rb', line 7

# Payload classes this middleware accepts: Hash, SchemaClass::Record and,
# when Google::Protobuf is defined at the time of the first call,
# Google::Protobuf::AbstractMessage.
#
# The list is built once and memoized; later calls return the same frozen
# array without rebuilding it or looking the constants up again.
#
# @return [Array<Class>] frozen list of accepted payload classes
def allowed_classes
  @allowed_classes ||= begin
    classes = [Hash, SchemaClass::Record]
    # const_get avoids a hard reference to a constant that may not be loaded.
    classes << Google::Protobuf.const_get(:AbstractMessage) if defined?(Google::Protobuf)
    classes.freeze
  end
end

.call(message) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/deimos/ext/producer_middleware.rb', line 15

# Middleware entry point: encodes an outgoing message in place according to
# the Deimos configuration for its topic, inside a 'deimos.encode_message'
# instrumentation block.
#
# The message is returned untouched when it carries a truthy :already_encoded
# flag (the flag itself is removed), when no config or no schema exists for
# the topic, or when its payload is not an instance of one of
# `allowed_classes`. Otherwise the payload and key are encoded, the topic is
# rewritten to the prefixed configured name, and the key configuration is
# validated.
#
# @param message [Hash] outgoing message, mutated in place
# @return [Hash] the same message hash
# @raise [RuntimeError] from validate_key_config when a required key is missing
def call(message)
  Karafka.monitor.instrument(
    'deimos.encode_message',
    producer: self,
    message: message
  ) do
    return message if message.delete(:already_encoded)

    config = Deimos.karafka_config_for(topic: message[:topic])
    return message if config.nil? || config.schema.nil?
    # Pass unsupported payload classes through unchanged. This guard used to
    # be a bare `return`, handing nil back to the caller instead of the
    # message, unlike the other pass-through guards above.
    return message if message[:payload] &&
                      self.allowed_classes.none? { |k| message[:payload].is_a?(k) }

    payload = message[:payload]
    # nil.to_h is {}, so a missing payload becomes an empty hash here.
    payload = payload.to_h if payload.nil? || payload.is_a?(SchemaClass::Record)
    m = Deimos::Message.new(payload,
                            headers: message[:headers],
                            partition_key: message[:partition_key])
    _process_message(m, message, config)
    _assign_message(m, message)
    message[:topic] = "#{Deimos.config.producers.topic_prefix}#{config.name}"

    validate_key_config(config, message)

    message
  end
end

.validate_key_config(config, message) ⇒ Object



59
60
61
62
63
# File 'lib/deimos/ext/producer_middleware.rb', line 59

# Ensures a key is present whenever the topic's key deserializer is a
# Deimos::Transcoder.
#
# @param config [Object] producer config; its `deserializers[:key]` is inspected
# @param message [Hash] outgoing message; only :key is read
# @return [nil] when the message has a key or the key deserializer is not a transcoder
# @raise [RuntimeError] when :key is nil and the key deserializer is a Deimos::Transcoder
def validate_key_config(config, message)
  return unless message[:key].nil?
  return unless config.deserializers[:key].is_a?(Deimos::Transcoder)

  raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.'
end