Class: Karafka::ActiveJob::Deserializer

Inherits:
Object
Defined in:
lib/karafka/active_job/deserializer.rb

Overview

Note:

Despite the name, this class handles both serialization (job to Kafka payload) and deserialization (Kafka message to job). It’s called “Deserializer” to align with Karafka’s naming conventions where message consumption is the primary concern.

Default deserializer for ActiveJob jobs

Subclass it and override #serialize and #deserialize to support custom payload formats (e.g., Avro, Protobuf, MessagePack).

Examples:

Wrapping jobs in a custom envelope with metadata

class EnvelopedJobDeserializer < Karafka::ActiveJob::Deserializer
  def serialize(job)
    # Wrap the job in an envelope with additional metadata
    envelope = {
      version: 1,
      produced_at: Time.now.iso8601,
      producer: 'my-app',
      payload: job.serialize
    }
    ::ActiveSupport::JSON.encode(envelope)
  end

  def deserialize(message)
    # Extract the job from the envelope
    envelope = ::ActiveSupport::JSON.decode(message.raw_payload)

    # Could validate envelope version, log metadata, etc.
    raise 'Unsupported version' if envelope['version'] != 1

    # Return the actual job data
    envelope['payload']
  end
end

# Configure in Karafka
Karafka::App.config.internal.active_job.deserializer = EnvelopedJobDeserializer.new
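
Round-tripping a custom payload format (illustrative sketch)

The same override pattern applies to a non-JSON wire format. The sketch below assumes gzip-compressed JSON as the format and uses stdlib JSON rather than ::ActiveSupport::JSON so that it runs without Karafka or Rails loaded. CompressedJobDeserializer, DemoJob, and DemoMessage are hypothetical names; in an application the class would inherit from Karafka::ActiveJob::Deserializer, and the message would be a real Kafka message.

```ruby
require 'json'
require 'zlib'

# Hypothetical deserializer storing jobs as gzip-compressed JSON.
# Left standalone here; in an app it would subclass
# Karafka::ActiveJob::Deserializer.
class CompressedJobDeserializer
  # job must respond to #serialize and return a Hash of job attributes
  def serialize(job)
    Zlib.gzip(JSON.generate(job.serialize))
  end

  # message must respond to #raw_payload
  def deserialize(message)
    JSON.parse(Zlib.gunzip(message.raw_payload))
  end
end

# Stand-ins for an ActiveJob job and a Kafka message (assumptions for the demo)
DemoJob = Struct.new(:attributes) do
  def serialize
    attributes
  end
end
DemoMessage = Struct.new(:raw_payload)

deserializer = CompressedJobDeserializer.new
payload = deserializer.serialize(
  DemoJob.new({ 'job_class' => 'WelcomeJob', 'arguments' => [42] })
)
job_hash = deserializer.deserialize(DemoMessage.new(payload))
```

Whatever format is chosen, #serialize and #deserialize need to stay symmetric: the hash returned by #deserialize should match what the job's own #serialize produced.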

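Instance Method Summary

  • #deserialize(message) ⇒ Hash

    Deserializes a Kafka message payload into an ActiveJob job hash

  • #serialize(job) ⇒ String

    Serializes an ActiveJob job into a string payload for Kafka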

Instance Method Details

#deserialize(message) ⇒ Hash

Deserializes a Kafka message payload into an ActiveJob job hash

Parameters:

  • message (Karafka::Messages::Message)

    Kafka message whose raw payload holds the serialized job

Returns:

  • (Hash)

    deserialized job hash



# File 'lib/karafka/active_job/deserializer.rb', line 56

def deserialize(message)
  ::ActiveSupport::JSON.decode(message.raw_payload)
end
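
A minimal sketch of what this method returns, assuming a stand-in message object and stdlib JSON in place of ::ActiveSupport::JSON (both decode a JSON object into a Hash with string keys). RawMessage is a hypothetical Struct that only mimics #raw_payload.

```ruby
require 'json'

# Stand-in for a Kafka message; only #raw_payload is needed here
RawMessage = Struct.new(:raw_payload)

# Mirrors the method body above, using stdlib JSON (an assumption made
# so the sketch runs without ActiveSupport)
def deserialize(message)
  JSON.parse(message.raw_payload)
end

message = RawMessage.new('{"job_class":"WelcomeJob","arguments":[42]}')
job_hash = deserialize(message)

job_hash['job_class'] # keys come back as strings
job_hash[:job_class]  # symbol lookup finds nothing
```

Because the result uses string keys, code that consumes the job hash should read 'job_class' rather than :job_class.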

#serialize(job) ⇒ String

Serializes an ActiveJob job into a string payload for Kafka

Parameters:

  • job (ActiveJob::Base, #serialize)

    job to serialize. The job must respond to #serialize which returns a Hash of job attributes. When CurrentAttributes are used, this may be a JobWrapper instance instead of the original ::ActiveJob::Base.

Returns:

  • (String)

    serialized job payload



# File 'lib/karafka/active_job/deserializer.rb', line 48

def serialize(job)
  ::ActiveSupport::JSON.encode(job.serialize)
end
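
The job parameter is duck-typed: anything that responds to #serialize and returns a Hash works. The sketch below illustrates this with a hypothetical StubJobWrapper that is not an ::ActiveJob::Base. It uses stdlib JSON instead of ::ActiveSupport::JSON so it runs on its own, and the 'extra_context' key is invented for illustration; it is not the format Karafka uses for CurrentAttributes.

```ruby
require 'json'

# Hypothetical wrapper: not an ActiveJob::Base, but it responds to #serialize
class StubJobWrapper
  def initialize(job_hash)
    @job_hash = job_hash
  end

  # Returns the wrapped job attributes plus an illustrative extra key
  def serialize
    @job_hash.merge('extra_context' => { 'tenant' => 'acme' })
  end
end

# Mirrors the method body above, using stdlib JSON (an assumption)
def serialize(job)
  JSON.generate(job.serialize)
end

payload = serialize(
  StubJobWrapper.new({ 'job_class' => 'WelcomeJob', 'arguments' => [] })
)
```

The resulting payload is a plain JSON string, so a matching #deserialize can recover both the job attributes and whatever the wrapper added.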