Module: Karafka::Backends::Sidekiq

Defined in:
lib/karafka/backends/sidekiq.rb

Overview

Sidekiq backend that enqueues message processing to a Sidekiq worker for delayed execution.

Constant Summary

VERSION = '1.4.7'

Karafka Sidekiq backend version

Instance Method Summary

Instance Method Details

#process ⇒ Object

Note:

Each worker must provide a class-level perform_async method that accepts the parameters we pass to it. We always pass the topic id as the first argument and this request's encoded params_batch as the second (the topic is passed so that the consumer can be rebuilt in the worker).

Enqueues the execution of the perform method into a worker.
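
The worker contract described in the note can be sketched as follows. This is a minimal illustration only: FakeWorker is a hypothetical stand-in that does not depend on Sidekiq, and the optional third argument and the JSON round trip are assumptions made to mimic how job arguments are serialized before a worker picks them up.

```ruby
require 'json'

# Hypothetical stand-in for a Sidekiq worker (assumption: a real worker
# would get .perform_async from Sidekiq itself rather than define it).
class FakeWorker
  @jobs = []

  class << self
    attr_reader :jobs

    # Mimics the class-level enqueue call. Arguments are stored as JSON,
    # so they must be JSON-safe (strings, numbers, hashes, arrays, nil).
    def perform_async(topic_id, params_batch, batch_metadata = nil)
      @jobs << JSON.generate([topic_id, params_batch, batch_metadata])
    end
  end

  # Receives the same arguments back after deserialization. The topic id
  # comes first so that the consumer could be looked up from it.
  def perform(topic_id, params_batch, batch_metadata = nil)
    { topic_id: topic_id, size: params_batch.size, metadata: batch_metadata }
  end
end

# Enqueue one job, then simulate the worker side picking it up.
FakeWorker.perform_async('group_events', [{ 'payload' => 'a' }], nil)
args = JSON.parse(FakeWorker.jobs.first)
result = FakeWorker.new.perform(*args)
```

Passing only JSON-safe values keeps the job arguments intact across the serialization boundary, which is why the topic is sent as an id rather than as an object.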



# File 'lib/karafka/backends/sidekiq.rb', line 16

def process
  Karafka.monitor.instrument('backends.sidekiq.process', caller: self) do
    # We add batch metadata only for batch worker
    batch_metadata_hash = if respond_to?(:batch_metadata)
                            # We remove deserializer as it's not safe to convert it to json
                            # and we can rebuild it anyhow based on the routing data in the
                            # worker
                            batch_metadata
                              .to_h
                              .transform_keys(&:to_s)
                              .tap { |h| h.delete('deserializer') }
                          end

    topic.worker.perform_async(
      topic.id,
      topic.interchanger.encode(params_batch),
      batch_metadata_hash
    )
    )
  end
end
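
The metadata clean-up step in the method above can be tried in isolation. The sketch below is an assumption-laden illustration: BatchMetadata here is a hypothetical Struct, not the real metadata class, and the lambda stands in for any deserializer object that cannot be safely converted to JSON.

```ruby
require 'json'

# Hypothetical metadata object; the real batch metadata class is not shown here.
BatchMetadata = Struct.new(:topic, :partition, :deserializer, keyword_init: true)

metadata = BatchMetadata.new(
  topic: 'events',
  partition: 0,
  deserializer: ->(raw) { raw } # a callable cannot survive a JSON round trip
)

# Same chain as in the method: hash with string keys, minus the deserializer.
sanitized = metadata
            .to_h
            .transform_keys(&:to_s)
            .tap { |h| h.delete('deserializer') }

json = JSON.generate(sanitized)
```

Using tap keeps the whole expression returning the hash itself, since Hash#delete returns the deleted value rather than the hash.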