Module: Karafka::Testing::RSpec::Helpers
Defined in: lib/karafka/testing/rspec/helpers.rb
Overview
RSpec helpers module that needs to be included in an RSpec example group to make the consumer testing helpers available.
Class Method Summary
- .included(base) ⇒ Object
  Adds all the needed extra functionality to the RSpec group.
Instance Method Summary
- #karafka_consumer_for(requested_topic) ⇒ Object
  Creates a consumer instance for the given topic.
- #publish_for_karafka(raw_payload, opts = {}) ⇒ Object
  Adds a new Karafka params instance with the given payload and options to an internal buffer that is used to simulate message delivery to the consumer.
Class Method Details
.included(base) ⇒ Object
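Adds all the needed extra functionality to the RSpec group: a per-example buffer for messages waiting to be consumed, and an after hook that clears the buffer so nothing leaks between specs.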
# File 'lib/karafka/testing/rspec/helpers.rb', line 15

def included(base)
  # This is an internal buffer for keeping "to be sent" messages before
  # we run the consume
  base.let(:_karafka_raw_data) { [] }

  # Clear the messages buffer after each spec, so nothing will leak
  # in between them
  base.after { _karafka_raw_data.clear }
end
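The hook pattern above can be tried outside of RSpec. The following is a minimal sketch, assuming a hypothetical `FakeExampleGroup` class that imitates RSpec's `let` and `after`, and a hypothetical `BufferHelpers` module standing in for the real helpers. None of these names belong to Karafka or RSpec.

```ruby
# Hypothetical stand-in for an RSpec example group. Real RSpec provides
# `let` and `after`; this class only imitates them for illustration.
class FakeExampleGroup
  class << self
    def after_hooks
      @after_hooks ||= []
    end

    # Defines a lazily evaluated, memoized reader, similar in spirit to `let`.
    def let(name, &block)
      define_method(name) do
        @memoized ||= {}
        @memoized.fetch(name) { @memoized[name] = instance_exec(&block) }
      end
    end

    # Registers a block to run after an example.
    def after(&block)
      after_hooks << block
    end
  end

  def run_after_hooks
    self.class.after_hooks.each { |hook| instance_exec(&hook) }
  end
end

# Hypothetical helpers module using the same `included` hook pattern.
module BufferHelpers
  class << self
    def included(base)
      base.let(:_raw_data) { [] }
      base.after { _raw_data.clear }
    end
  end
end

FakeExampleGroup.include(BufferHelpers)

example = FakeExampleGroup.new
example._raw_data << 'message'
size_before = example._raw_data.size
example.run_after_hooks
size_after = example._raw_data.size
```

Because the buffer is defined on the including group, each example gets its own array, and the after hook empties it once the example finishes.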
Instance Method Details
#karafka_consumer_for(requested_topic) ⇒ Object
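Creates a consumer instance for the given topic. Raises Karafka::Testing::Errors::TopicNotFoundError if no consumer group defines a topic with that name.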
# File 'lib/karafka/testing/rspec/helpers.rb', line 36

def karafka_consumer_for(requested_topic)
  selected_topic = nil

  # @note Remove in 2.0. This won't work without the global state
  ::Karafka::App.consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      selected_topic = topic if topic.name == requested_topic.to_s
    end
  end

  raise Karafka::Testing::Errors::TopicNotFoundError, requested_topic unless selected_topic

  described_class.new(selected_topic)
end
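The lookup above has two properties worth noting: the requested name is compared after `to_s`, so symbols and strings both work, and when several consumer groups define the same topic name, the last match wins. A minimal sketch of that behavior, assuming hypothetical `Topic`, `ConsumerGroup`, and `TopicNotFoundError` stand-ins rather than the real Karafka classes:

```ruby
# Hypothetical stand-ins for Karafka's routing objects.
Topic = Struct.new(:name)
ConsumerGroup = Struct.new(:topics)

# Hypothetical error class mirroring the role of TopicNotFoundError.
class TopicNotFoundError < StandardError; end

# Same nested iteration as the helper, but over an explicit list of groups
# instead of global application state.
def find_topic(consumer_groups, requested_topic)
  selected_topic = nil

  consumer_groups.each do |consumer_group|
    consumer_group.topics.each do |topic|
      selected_topic = topic if topic.name == requested_topic.to_s
    end
  end

  raise TopicNotFoundError, requested_topic.to_s unless selected_topic

  selected_topic
end

groups = [
  ConsumerGroup.new([Topic.new('orders'), Topic.new('payments')]),
  ConsumerGroup.new([Topic.new('orders')])
]

# A symbol works because the comparison uses `to_s`.
found = find_topic(groups, :orders)

missing_raised = begin
  find_topic(groups, :missing)
  false
rescue TopicNotFoundError
  true
end
```

In this sketch `found` is the topic object from the second group, since later matches overwrite earlier ones.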
#publish_for_karafka(raw_payload, opts = {}) ⇒ Object
Adds a new Karafka params instance with the given payload and options to an internal buffer that is used to simulate message delivery to the consumer, then assigns a params batch built from that buffer to the subject.
# File 'lib/karafka/testing/rspec/helpers.rb', line 67

def publish_for_karafka(raw_payload, opts = {})
  metadata = Karafka::Params::Metadata.new(
    **metadata_defaults.merge(opts)
  ).freeze

  _karafka_raw_data << Karafka::Params::Params.new(raw_payload, metadata)
  subject.params_batch = Karafka::Params::ParamsBatch.new(_karafka_raw_data)
end
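The buffering behavior can be illustrated without Karafka: each call merges the caller's options over a set of defaults, appends a message to the buffer, and hands the whole buffer to the consumer again. This is only a sketch under stated assumptions. `Metadata`, `Params`, `ParamsBatch`, `FakeConsumer`, `FakePublisher`, and the default values are all hypothetical stand-ins, not the library's API.

```ruby
# Hypothetical stand-ins for the Karafka params objects.
Metadata = Struct.new(:topic, :partition, :offset, :key, keyword_init: true)
Params = Struct.new(:raw_payload, :metadata)
ParamsBatch = Struct.new(:params_array)

# Hypothetical consumer that only exposes a writable params batch.
class FakeConsumer
  attr_accessor :params_batch
end

# Hypothetical publisher mirroring the merge-append-assign flow.
class FakePublisher
  def initialize(consumer)
    @consumer = consumer
    @buffer = []
  end

  def publish(raw_payload, opts = {})
    # Caller-supplied options override the defaults key by key.
    metadata = Metadata.new(**defaults.merge(opts)).freeze

    @buffer << Params.new(raw_payload, metadata)
    # The consumer always receives a batch built from the full buffer.
    @consumer.params_batch = ParamsBatch.new(@buffer)
  end

  private

  # Assumed defaults, chosen only for this illustration.
  def defaults
    { topic: 'orders', partition: 0, offset: 0, key: nil }
  end
end

consumer = FakeConsumer.new
publisher = FakePublisher.new(consumer)

publisher.publish('{"id":1}')
publisher.publish('{"id":2}', partition: 3)

batch = consumer.params_batch.params_array
```

Publishing twice leaves two messages in the batch, which is why the helper's buffer has to be cleared between specs.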