Module: Karafka::Testing::RSpec::Helpers

Defined in:
lib/karafka/testing/rspec/helpers.rb

Overview

RSpec helpers module that needs to be included in the RSpec example groups used to test Karafka consumers

Class Method Summary

Instance Method Summary

Class Method Details

.included(base) ⇒ Object

Adds the extra functionality the RSpec example group needs: a per-example message buffer and an after hook that clears it

Parameters:

  • base (Class)

    RSpec example group we want to extend



# File 'lib/karafka/testing/rspec/helpers.rb', line 15

def included(base)
  # This is an internal buffer for keeping "to be sent" messages before
  # we run the consume
  base.let(:_karafka_raw_data) { [] }
  # Clear the messages buffer after each spec, so nothing will leak
  # in between them
  base.after { _karafka_raw_data.clear }
end
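
The hook above relies on the example group responding to `let` and `after`. The following is a minimal sketch of that pattern outside of RSpec, assuming a hypothetical `FakeExampleGroup` stand-in and a hypothetical `BufferHelpers` module; neither is part of Karafka or RSpec, and the real `let` and `after` do considerably more.

```ruby
# Minimal stand-in for an RSpec example group (hypothetical, NOT RSpec itself).
# It only supports a memoized `let` and `after` hooks.
class FakeExampleGroup
  class << self
    def after_hooks
      @after_hooks ||= []
    end

    # Defines a memoized helper method, loosely mimicking RSpec's `let`
    def let(name, &block)
      define_method(name) do
        @memoized ||= {}
        @memoized.fetch(name) { @memoized[name] = instance_exec(&block) }
      end
    end

    # Registers a block to run after each example, loosely mimicking `after`
    def after(&block)
      after_hooks << block
    end
  end

  # Runs one "example" and then every registered after hook
  def run
    yield self
  ensure
    self.class.after_hooks.each { |hook| instance_exec(&hook) }
  end
end

# Hypothetical helpers module with the same hook shape as the method above
module BufferHelpers
  def self.included(base)
    base.let(:_raw_data) { [] }
    base.after { _raw_data.clear }
  end
end

class FakeExampleGroup
  include BufferHelpers
end

group = FakeExampleGroup.new
buffer = group._raw_data
size_during_example = group.run do |g|
  g._raw_data << 'message'
  g._raw_data.size
end
```

In this sketch the buffer holds one message while the example runs and is empty once the after hook has fired, which is the leak protection the comments in the method describe.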

Instance Method Details

#karafka_consumer_for(requested_topic) ⇒ Object

Creates a consumer instance for the given topic

Examples:

Creates a MyConsumer consumer instance with settings for 'my_requested_topic'

RSpec.describe MyConsumer do
  subject(:consumer) { karafka_consumer_for(:my_requested_topic) }
end

Parameters:

  • requested_topic (String, Symbol)

    name of the topic for which we want to create a consumer instance

Returns:

  • (Object)

    described_class instance

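Raises:

  • (Karafka::Testing::Errors::TopicNotFoundError)

    when the requested topic is not defined in any consumer group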

# File 'lib/karafka/testing/rspec/helpers.rb', line 36

def karafka_consumer_for(requested_topic)
  selected_topic = nil

  # @note Remove in 2.0. This won't work without the global state
  ::Karafka::App.consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      selected_topic = topic if topic.name == requested_topic.to_s
    end
  end

  raise Karafka::Testing::Errors::TopicNotFoundError, requested_topic unless selected_topic

  described_class.new(selected_topic)
end
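
The lookup in the method above can be sketched in isolation as follows. `Topic`, `ConsumerGroup`, `TopicNotFoundError` and `find_topic` are hypothetical stand-ins introduced only for this illustration; they are not Karafka's routing classes.

```ruby
# Hypothetical stand-ins for Karafka's routing objects (NOT the real classes)
Topic = Struct.new(:name)
ConsumerGroup = Struct.new(:topics)

# Hypothetical error class playing the role of TopicNotFoundError
class TopicNotFoundError < StandardError; end

# Scans every consumer group for a topic with the requested name.
# As in the method above, a later match overwrites an earlier one.
def find_topic(consumer_groups, requested_topic)
  selected_topic = nil

  consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      # Topic names are compared as strings, so symbols are accepted too
      selected_topic = topic if topic.name == requested_topic.to_s
    end
  end

  raise TopicNotFoundError, requested_topic.to_s unless selected_topic

  selected_topic
end

groups = [
  ConsumerGroup.new([Topic.new('events'), Topic.new('users')]),
  ConsumerGroup.new([Topic.new('orders')])
]

found = find_topic(groups, :orders)

missing_error =
  begin
    find_topic(groups, :unknown)
    nil
  rescue TopicNotFoundError => e
    e
  end
```

Because the name is normalized with `to_s`, both `:orders` and `'orders'` resolve to the same topic, and an unknown name raises instead of returning `nil`.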

#publish_for_karafka(raw_payload, opts = {}) ⇒ Object

Adds a new Karafka params instance, built from the given payload and options, to an internal buffer. The buffer is used to simulate message delivery to the consumer.

Examples:

Send a JSON message to the consumer

before do
  publish_for_karafka({ 'hello' => 'world' }.to_json)
end

Send a JSON message to the consumer and simulate that it comes from partition 6

before do
  publish_for_karafka({ 'hello' => 'world' }.to_json, 'partition' => 6)
end

Parameters:

  • raw_payload (String)

    anything you want to send

  • opts (Hash) (defaults to: {})

    additional options with which you want to overwrite the message defaults (key, offset, etc)



# File 'lib/karafka/testing/rspec/helpers.rb', line 67

def publish_for_karafka(raw_payload, opts = {})
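  # NOTE: two identifiers were lost when this page was extracted. The local
  # variable name `metadata` and the helper name `metadata_defaults` (assumed
  # to return the default message metadata) are reconstructions.
  metadata = Karafka::Params::Metadata.new(
    **metadata_defaults.merge(opts)
  ).freeze

  _karafka_raw_data << Karafka::Params::Params.new(raw_payload, metadata)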
  subject.params_batch = Karafka::Params::ParamsBatch.new(_karafka_raw_data)
end
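
The buffering step above (merge the defaults with `opts`, freeze the result, append to the buffer) can be sketched without Karafka as follows. `FakeMetadata`, `FakeParams`, `DEFAULTS` and `buffer_message` are hypothetical names, and the default values are assumptions, since the real defaults are not shown on this page. The sketch uses symbol keys for the overrides.

```ruby
require 'json'

# Hypothetical stand-ins (NOT Karafka's real classes) for metadata and params
FakeMetadata = Struct.new(:key, :offset, :partition, keyword_init: true)
FakeParams = Struct.new(:raw_payload, :metadata)

# Assumed default values, used only for this illustration
DEFAULTS = { key: nil, offset: 0, partition: 0 }.freeze

# Builds frozen metadata from the defaults overridden by opts, then appends
# the payload together with that metadata to the buffer
def buffer_message(buffer, raw_payload, opts = {})
  metadata = FakeMetadata.new(**DEFAULTS.merge(opts)).freeze
  buffer << FakeParams.new(raw_payload, metadata)
  buffer
end

buffer = []
buffer_message(buffer, { 'hello' => 'world' }.to_json)
buffer_message(buffer, { 'hello' => 'again' }.to_json, partition: 6)
```

Each call appends one entry, so after two calls the buffer holds two messages: the first with the default partition and the second with partition 6, while options that were not overridden keep their defaults.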