Class: Karafka::Interchanger

Inherits:
Object
Defined in:
lib/karafka/interchanger.rb

Overview

Interchanger allows us to format/encode/pack data that is being sent to perform_async. It mostly targets data encoding issues like this one: github.com/mperham/sidekiq/issues/197. Each custom interchanger should implement the following methods:

- encode - encodes params before they get stored inside Redis
- decode - decodes params back to a hash format that we can use

This interchanger uses default Sidekiq options to exchange data.
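The encode/decode contract described above can be illustrated with a minimal sketch of a custom interchanger. The class name `Base64JsonInterchanger` and the choice of JSON plus Base64 packing are assumptions made for illustration only; the sketch also works on plain hashes rather than a real Karafka params batch, and it does not show how an interchanger is registered with Karafka.

```ruby
require 'json'

# Hypothetical custom interchanger (not part of Karafka). It packs every
# params hash into an ASCII-only Base64 string, so the data stored in Redis
# is not affected by string encoding problems.
class Base64JsonInterchanger
  # Encodes each hash as JSON and then as strict Base64 (the 'm0' pack format).
  def encode(params_batch)
    params_batch.map { |param| [JSON.generate(param)].pack('m0') }
  end

  # Reverses #encode: Base64 -> UTF-8 JSON text -> hash with string keys.
  def decode(params_batch)
    params_batch.map do |packed|
      JSON.parse(packed.unpack1('m0').force_encoding(Encoding::UTF_8))
    end
  end
end

interchanger = Base64JsonInterchanger.new
batch = [{ 'raw_payload' => 'zażółć', 'metadata' => { 'topic' => 'events' } }]

encoded = interchanger.encode(batch)   # array of ASCII-only strings
decoded = interchanger.decode(encoded) # array of hashes again
```

In this sketch, `decoded` is equal to `batch`, while every element of `encoded` contains only ASCII characters.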

Instance Method Details

#decode(params_batch) ⇒ Array<Hash>

Returns the params fetched from Sidekiq, with the serialized receive_time and create_time metadata values converted back to Time objects.

Parameters:

  • params_batch (Array<Hash>)

    Sidekiq params that are now an array

Returns:

  • (Array<Hash>)

    the params fetched from Sidekiq, with time metadata restored



# File 'lib/karafka/interchanger.rb', line 36

def decode(params_batch)
  params_batch.map do |param|
    metadata = param['metadata']
    # Convert serialized dates back to what they were
    metadata['receive_time'] = Time.at(metadata['receive_time'].to_f).to_time
    metadata['create_time'] = Time.at(metadata['create_time'].to_f).to_time

    param['metadata'] = metadata

    param
  end
end

#encode(params_batch) ⇒ Array<Hash>

Returns an array of hashes built out of the params data.

Parameters:

  • params_batch (Karafka::Params::ParamsBatch)

    Karafka params batch object

Returns:

  • (Array<Hash>)

    an array of hashes built out of the params data



# File 'lib/karafka/interchanger.rb', line 15

def encode(params_batch)
  params_batch.map do |param|
    metadata = param.metadata.to_h
    # All the metadata must have stringified keys in order to serialize safely
    metadata.transform_keys!(&:to_s)
    # This will be taken back from the routing and is not safe for serialization
    metadata.delete('deserializer')

    # Cast times to strings, we will deserialize them back in Sidekiq
    metadata['receive_time'] = metadata['receive_time'].to_f.to_s
    metadata['create_time'] = metadata['create_time'].to_f.to_s

    {
      'raw_payload' => param.raw_payload,
      'metadata' => metadata
    }
  end
end