Module: Karafka::Params::Dsl::ClassMethods

Defined in:
lib/karafka/params/dsl.rb

Overview

Class methods required by params to work

Instance Method Summary

Instance Method Details

#build(message, parser) ⇒ Karafka::Params::Params

Returns a Karafka params object that has not yet used its parser to process the data received from Kafka.

Examples:

Build params instance from a hash

Karafka::Params::Params.build({ key: 'value' }, parser) #=> params object

Build params instance from a Kafka::FetchedMessage object

Karafka::Params::Params.build(message, parser) #=> params object

Parameters:

  • message (Kafka::FetchedMessage, Hash)

    message fetched from Kafka when params are built inside the main Karafka process (in Karafka::Connection::Consumer), or a hash when the data has already been parsed

  • parser (Class)

    parser class that will be used to parse the data

Returns:

  • (Karafka::Params::Params)

    Karafka params object that has not yet used its parser to process the data received from Kafka
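
The description above implies that the parser is only stored at build time and applied later. A minimal sketch of that deferred-parsing idea follows. The names LazyParams, SketchJsonParser and parse! are hypothetical and exist only for illustration; they are not Karafka's API, and the real params object may defer parsing differently.

```ruby
require 'json'

# Hypothetical stand-in for a parser class; not part of Karafka.
class SketchJsonParser
  def self.parse(raw)
    JSON.parse(raw)
  end
end

# Hypothetical hash-like params object: it stores the parser next to the
# raw value and parses only when explicitly asked to.
class LazyParams < Hash
  def self.build(message, parser)
    instance = new
    instance['parser'] = parser
    instance.merge!(message)
    instance
  end

  # Parses the raw 'value' at most once and merges the result into the hash.
  def parse!
    return self if self['parsed']

    self['parsed'] = true
    merge!(self['parser'].parse(delete('value')))
  end
end

params = LazyParams.build({ 'value' => '{"id":1}' }, SketchJsonParser)
params['value'] # still the raw JSON string at this point
params.parse!
params['id']    # parsed data is available only after parse!
```

Keeping the raw value until it is needed means a consumer that never reads the payload never pays the parsing cost.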



# File 'lib/karafka/params/dsl.rb', line 60

def build(message, parser)
  instance = new
  instance['parser'] = parser

  # Non kafka fetched message can happen when we interchange data with an
  # additional backend
  if message.is_a?(Kafka::FetchedMessage)
    instance.send(
      :merge!,
      'value' => message.value,
      'partition' => message.partition,
      'offset' => message.offset,
      'key' => message.key,
      'create_time' => message.create_time,
      'receive_time' => Time.now,
      # When we get raw messages, they might have a topic, that was modified by a
      # topic mapper. We need to "reverse" this change and map back to the non-modified
      # format, so our internal flow is not corrupted with the mapping
      'topic' => Karafka::App.config.topic_mapper.incoming(message.topic)
    )
  else
    instance.send(:merge!, message)
  end

  instance
end
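
The two branches of the listing above can be tried without a running Kafka cluster. The sketch below mirrors that logic under stated assumptions: FakeFetchedMessage stands in for Kafka::FetchedMessage, PrefixTopicMapper stands in for the configured topic mapper, and SketchParams stands in for the params class. All three names are hypothetical and not part of Karafka or ruby-kafka.

```ruby
# Stand-in for Kafka::FetchedMessage (assumption: only the readers that
# the listing calls are modelled).
FakeFetchedMessage = Struct.new(:value, :partition, :offset, :key, :create_time, :topic)

# Hypothetical topic mapper exposing the same `incoming` hook the listing uses.
# It strips a prefix that is assumed to have been added on the outgoing side.
class PrefixTopicMapper
  PREFIX = 'prod.'

  def incoming(topic)
    topic.delete_prefix(PREFIX)
  end
end

# Hypothetical hash-like params class mirroring the build logic.
class SketchParams < Hash
  def self.build(message, parser, mapper: PrefixTopicMapper.new)
    instance = new
    instance['parser'] = parser

    if message.is_a?(FakeFetchedMessage)
      # Fetched message: copy its attributes and map the topic back.
      instance.merge!(
        'value' => message.value,
        'partition' => message.partition,
        'offset' => message.offset,
        'key' => message.key,
        'create_time' => message.create_time,
        'receive_time' => Time.now,
        'topic' => mapper.incoming(message.topic)
      )
    else
      # Plain hash: merge it as-is.
      instance.merge!(message)
    end

    instance
  end
end

# :json_parser is a placeholder; build only stores the parser.
raw = FakeFetchedMessage.new('{"id":1}', 0, 42, nil, Time.now, 'prod.events')
from_kafka = SketchParams.build(raw, :json_parser)
from_hash = SketchParams.build({ 'value' => '{"id":1}', 'topic' => 'events' }, :json_parser)
```

Both calls produce a hash-like object with the same string keys, so downstream code does not need to know whether the data came straight from Kafka or from an additional backend.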