Class: Karafka::Interchanger
- Inherits:
-
Object
- Object
- Karafka::Interchanger
- Defined in:
- lib/karafka/interchanger.rb
Overview
Interchanger allows us to format/encode/pack data that is being send to perform_async This is meant to target mostly issues with data encoding like this one: github.com/mperham/sidekiq/issues/197 Each custom interchanger should implement following methods:
- encode - it is meant to encode params before they get stored inside Redis
- decode - decoded params back to a hash format that we can use
This interchanger uses default Sidekiq options to exchange data
Instance Method Summary collapse
-
#decode(params_batch) ⇒ Array<Hash>
Exactly what we’ve fetched from Sidekiq.
-
#encode(params_batch) ⇒ Array<Hash>
Array with hash built out of params data.
Instance Method Details
#decode(params_batch) ⇒ Array<Hash>
Returns exactly what we’ve fetched from Sidekiq.
36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/karafka/interchanger.rb', line 36 def decode(params_batch) params_batch.map do |param| = param['metadata'] # Covert serialized dates back to what they were ['receive_time'] = Time.at(['receive_time'].to_f).to_time ['create_time'] = Time.at(['create_time'].to_f).to_time param['metadata'] = param end end |
#encode(params_batch) ⇒ Array<Hash>
Returns Array with hash built out of params data.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/karafka/interchanger.rb', line 15 def encode(params_batch) params_batch.map do |param| = param..to_h # All the metadata must have stringified keys in order to safe serialize .transform_keys!(&:to_s) # This will be taken back from the routing and is not safe for serialization .delete('deserializer') # Cast times to strings, we will de-serialize it back in Sidekiq ['receive_time'] = ['receive_time'].to_f.to_s ['create_time'] = ['create_time'].to_f.to_s { 'raw_payload' => param.raw_payload, 'metadata' => } end end |