Class: Karafka::BaseResponder

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/base_responder.rb

Overview

Base responder from which all Karafka responders should inherit Similar to Rails responders concept. It allows us to design flow from one app to another by isolating what responses should be sent (and where) based on a given action It differs from Rails responders in the way it works: in std http request we can have one response, here we can have unlimited number of them

It has a simple API for defining where should we respond (and if it is required)

Examples:

Basic usage (each registered topic is required to be used by default)

class Responder < BaseResponder
  topic :new_action

  def respond(data)
    respond_to :new_action, data
  end
end

Responding to a topic with extra options

class Responder < BaseResponder
  topic :new_action

  def respond(data)
    respond_to :new_action, data, partition_key: 'thing'
  end
end

Marking topic as not required (we won’t have to use it)

class Responder < BaseResponder
  topic :required_topic
  topic :new_action, required: false

  def respond(data)
    respond_to :required_topic, data
  end
end

Multiple times used topic

class Responder < BaseResponder
  topic :required_topic

  def respond(data)
    data.each do |subset|
      respond_to :required_topic, subset
    end
  end
end

Specify serializer for a topic

class Responder < BaseResponder
  topic :xml_topic, serializer: MyXMLSerializer

  def respond(data)
    data.each do |subset|
      respond_to :xml_topic, subset
    end
  end
end

Accept multiple arguments to a respond method

class Responder < BaseResponder
  topic :users_actions
  topic :articles_viewed

  def respond(user, article)
    respond_to :users_actions, user
    respond_to :articles_viewed, article
  end
end

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeKarafka::BaseResponder

Creates a responder object



115
116
117
# File 'lib/karafka/base_responder.rb', line 115

def initialize
  @messages_buffer = {}
end

Class Attribute Details

.options_contractObject

Contract that we can use to control and/or require some additional details upon options that are being passed to the producer. This can be in particular useful if we want to make sure that for example partition_key is always present.



84
85
86
# File 'lib/karafka/base_responder.rb', line 84

def options_contract
  @options_contract
end

.topicsObject

Definitions of all topics that we want to be able to use in this responder should go here



80
81
82
# File 'lib/karafka/base_responder.rb', line 80

def topics
  @topics
end

Instance Attribute Details

#messages_bufferObject (readonly)

Returns the value of attribute messages_buffer.



111
112
113
# File 'lib/karafka/base_responder.rb', line 111

def messages_buffer
  @messages_buffer
end

Class Method Details

.call(*data) ⇒ Object

A simple alias for easier standalone responder usage. Instead of building it with new.call it allows (in case of using JSON serializer) to just run it directly from the class level

Examples:

Send user data with a responder

UsersCreatedResponder.call(@created_user)

Parameters:

  • data

    Anything that we want to respond with



103
104
105
106
107
108
# File 'lib/karafka/base_responder.rb', line 103

def call(*data)
  # Just in case there were no topics defined for a responder, we initialize with
  # empty hash not to handle a nil case
  self.topics ||= {}
  new.call(*data)
end

.topic(topic_name, options = {}) ⇒ Object

Registers a topic as on to which we will be able to respond

Parameters:

  • topic_name (Symbol, String)

    name of topic to which we want to respond

  • options (Hash) (defaults to: {})

    hash with optional configuration details



89
90
91
92
93
94
95
# File 'lib/karafka/base_responder.rb', line 89

def topic(topic_name, options = {})
  options[:serializer] ||= Karafka::App.config.serializer
  options[:registered] = true
  self.topics ||= {}
  topic_obj = Responders::Topic.new(topic_name, options)
  self.topics[topic_obj.name] = topic_obj
end

Instance Method Details

#call(*data) ⇒ Object

Note:

We know that validators should be executed also before sending data to topics, however the implementation gets way more complicated then, that’s why we check after everything was sent using responder

Performs respond and validates that all the response requirement were met

Examples:

Send user data with a responder

UsersCreatedResponder.new.call(@created_user)

Send user data with a responder using non default Parser

UsersCreatedResponder.new(MyParser).call(@created_user)

Parameters:

  • data

    Anything that we want to respond with



128
129
130
131
132
133
# File 'lib/karafka/base_responder.rb', line 128

def call(*data)
  respond(*data)
  validate_usage!
  validate_options!
  deliver!
end