Class: Karafka::Pro::ActiveJob::Dispatcher

Inherits:
ActiveJob::Dispatcher
Defined in:
lib/karafka/pro/active_job/dispatcher.rb

Overview

Pro dispatcher that sends each ActiveJob job to the topic matching its queue name and allows additional options to be injected into the producer, giving more granular control over the dispatch and consumption process.

Instance Method Details

#dispatch(job) ⇒ Object

Parameters:

  • job (ActiveJob::Base)

    the job to dispatch



# File 'lib/karafka/pro/active_job/dispatcher.rb', line 37

def dispatch(job)
  ::Karafka.producer.public_send(
    fetch_option(job, :dispatch_method, DEFAULTS),
    dispatch_details(job).merge!(
      topic: job.queue_name,
      payload: ::ActiveSupport::JSON.encode(serialize_job(job))
    )
  )
end
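
The flow above can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than Karafka code: `FakeProducer` replaces `::Karafka.producer`, `FakeJob` replaces an `ActiveJob::Base` instance, `SKETCH_DEFAULTS` stands in for `DEFAULTS` (its value here is an assumption), and the standard library `JSON` is used in place of `::ActiveSupport::JSON`. It only shows the general pattern: pick the producer method by name, merge the extra options with the topic and payload, and call the method via `public_send`.

```ruby
require "json"

# Hypothetical stand-in for the producer; it only records what it was asked to do.
class FakeProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce_async(message)
    @calls << [:produce_async, message]
  end

  def produce_sync(message)
    @calls << [:produce_sync, message]
  end
end

# Hypothetical job shape: queue name, serializable data, and per-job options.
FakeJob = Struct.new(:queue_name, :data, :options)

# Assumed default, standing in for what DEFAULTS supplies in the real dispatcher.
SKETCH_DEFAULTS = { dispatch_method: :produce_async }.freeze

def sketch_dispatch(producer, job)
  # A per-job option wins; otherwise the default dispatch method is used.
  method_name = job.options.fetch(:dispatch_method, SKETCH_DEFAULTS[:dispatch_method])

  # Extra producer options (only :key in this sketch) are merged with topic and payload.
  details = job.options.slice(:key)
  message = details.merge(topic: job.queue_name, payload: JSON.generate(job.data))

  producer.public_send(method_name, message)
end

producer = FakeProducer.new
sketch_dispatch(producer, FakeJob.new("default", { "id" => 1 }, {}))
sketch_dispatch(
  producer,
  FakeJob.new("critical", { "id" => 2 }, { dispatch_method: :produce_sync, key: "user-2" })
)
```

Because the method name is resolved per job, two jobs sent through the same dispatcher can end up on different producer methods, which is what makes the per-job options useful.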

#dispatch_many(jobs) ⇒ Object

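Dispatches multiple jobs in bulk using the Rails 7.1+ API. Jobs are grouped by their configured bulk dispatch method, and each group is sent to the producer in a single call.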

Parameters:

  • jobs (Array<ActiveJob::Base>)

    jobs we want to dispatch



# File 'lib/karafka/pro/active_job/dispatcher.rb', line 49

def dispatch_many(jobs)
  dispatches = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)

    dispatches[d_method] << dispatch_details(job).merge!(
      topic: job.queue_name,
      payload: ::ActiveSupport::JSON.encode(serialize_job(job))
    )
  end

  dispatches.each do |type, messages|
    ::Karafka.producer.public_send(
      type,
      messages
    )
  end
end
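
The grouping step in `dispatch_many` can be sketched in isolation. As before, the names are hypothetical stand-ins and not part of Karafka: `RecordingProducer` replaces `::Karafka.producer`, `BulkJob` replaces an `ActiveJob::Base` instance, the `:produce_many_async` fallback is an assumed default, and the standard library `JSON` replaces `::ActiveSupport::JSON`. The sketch shows how a `Hash` with a default block collects messages per dispatch method so that each method is invoked once with its whole batch.

```ruby
require "json"

# Hypothetical stand-in for the producer; it records each bulk call it receives.
class RecordingProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce_many_async(messages)
    @calls << [:produce_many_async, messages]
  end

  def produce_many_sync(messages)
    @calls << [:produce_many_sync, messages]
  end
end

# Hypothetical job shape: queue name, serializable data, optional bulk dispatch method.
BulkJob = Struct.new(:queue_name, :data, :dispatch_many_method)

def sketch_dispatch_many(producer, jobs)
  # Each unseen key starts with its own empty array of messages.
  batches = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    # Assumed default when the job does not specify a bulk dispatch method.
    d_method = job.dispatch_many_method || :produce_many_async
    batches[d_method] << { topic: job.queue_name, payload: JSON.generate(job.data) }
  end

  # One producer call per dispatch method, carrying all messages in that group.
  batches.each { |type, messages| producer.public_send(type, messages) }
end

bulk_producer = RecordingProducer.new
sketch_dispatch_many(
  bulk_producer,
  [
    BulkJob.new("emails", { "n" => 1 }, nil),
    BulkJob.new("billing", { "n" => 2 }, :produce_many_sync),
    BulkJob.new("emails", { "n" => 3 }, nil)
  ]
)
```

Grouping first means three jobs with two distinct dispatch methods result in two producer calls rather than three, while the order of messages inside each group follows the order of the input jobs.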