Module: LogStashHelper
- Defined in:
- lib/logstash/devutils/rspec/logstash_helpers.rb
Constant Summary collapse
- TestPipeline =
LogStash::TestPipeline
- DEFAULT_NUMBER_OF_TRY =
5
- DEFAULT_EXCEPTIONS_FOR_TRY =
[RSpec::Expectations::ExpectationNotMetError]
{ :integration => true, :redis => true, :socket => true, :performance => true, :couchdb => true, :elasticsearch => true, :elasticsearch_secure => true, :export_cypher => true }
Class Method Summary collapse
Instance Method Summary collapse
- #agent(&block) ⇒ Object
- #config(configstr) ⇒ Object
- #input(config_string, test_sink: {}, &block) ⇒ Object
- #new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) ⇒ Object
- #new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) ⇒ Object
- #plugin_input(plugin, &block) ⇒ Object (deprecated)
-
#sample(sample_event, validation_source: :output) { ... } ⇒ void
Creates a single-example example group in the current rspec context to validate the results of filter operations.
- #tags(*tags) ⇒ Object
- #try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) ⇒ Object
-
#type(default_type) ⇒ Object
def config.
Class Method Details
.excluded_tags ⇒ Object
30 31 32 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 30
# Returns the value of the `@@excluded_tags` class variable.
#
# @return [Object] whatever is currently stored in `@@excluded_tags`
def self.excluded_tags
  @@excluded_tags
end
Instance Method Details
#agent(&block) ⇒ Object
144 145 146 147 148 149 150 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 144
# Declares an example that builds a pipeline from the group's `config`,
# runs it, and then invokes the given block.
#
# The example description embeds the caller's location (the first entry of
# `caller`, truncated at the first space).
#
# @yield called after `pipeline.run` has returned
def agent(&block)
  call_site = caller[0].gsub(/ .*/, "")
  it("agent(#{call_site}) runs") do
    new_pipeline_from_string(config).run
    block.call
  end
end
#config(configstr) ⇒ Object
43 44 45 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 43
# Registers the given configuration string as the `config` helper (via
# `let`) for the current example group.
#
# @param configstr [Object] value that `config` will return inside examples
def config(configstr)
  let(:config) do
    configstr
  end
end
#input(config_string, test_sink: {}, &block) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 118
# Builds a pipeline from `config_string` plus a test_sink output, starts it,
# and yields the pipeline together with the sink's event store to the block.
#
# The pipeline is shut down (and its start thread joined) even when the block
# raises, so a failing expectation does not leave a pipeline running.
#
# @param config_string [String] pipeline configuration passed to `config_source`
# @param test_sink [Hash] options forwarded to `test_sink_output_source`
# @yieldparam pipeline the started pipeline
# @yieldparam queue the collection returned by the sink's `init_event_store`
# @return [Object] the value returned by the block
def input(config_string, test_sink: {}, &block)
  # Loaded lazily so the test_sink output is only required when actually used.
  require 'logstash/outputs/test_sink'

  config_parts = [config_source(config_string), test_sink_output_source(**test_sink)]
  pipeline = new_pipeline(config_parts)

  output_delegator = pipeline.outputs.last # LogStash::OutputDelegator
  fail('test_sink output expected') unless output_delegator.config_name.eql?('test_sink')
  test_sink_output = output_delegator.strategy.to_java.output
  queue = test_sink_output.init_event_store

  start_thread = pipeline.start_and_wait
  begin
    # NOTE: we used to pass a Queue here, now its a Java List/Queue collection
    block.call(pipeline, queue)
  ensure
    pipeline.shutdown
    start_thread.join if start_thread.alive?
  end
end
#new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) ⇒ Object
170 171 172 173 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 170
# Wraps the given config parts in a `LogStash::Config::PipelineConfig`
# (with `LogStash::Config::Source::Local` as its source) and returns a new
# `TestPipeline` built from it.
#
# @param config_parts [Array] configuration parts for the pipeline
# @param pipeline_id [Symbol] pipeline identifier, `:main` by default
# @param settings [Object] settings object; defaults to a clone of `::LogStash::SETTINGS`
# @return [TestPipeline]
def new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone)
  TestPipeline.new(
    LogStash::Config::PipelineConfig.new(
      LogStash::Config::Source::Local,
      pipeline_id,
      config_parts,
      settings
    )
  )
end
#new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 152
# Builds a test pipeline from a configuration string.
#
# When `config_string` does not match `OUTPUT_BLOCK_RE`, a test_sink output is
# appended; otherwise any non-empty `test_sink` options are reported via
# `warn` because they will not be used.
#
# @param config_string [String] pipeline configuration
# @param pipeline_id [Symbol] pipeline identifier, `:main` by default
# @param test_sink [Hash] options forwarded to `test_sink_output_source`
# @return [Object] the pipeline returned by `new_pipeline`
def new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {})
  config_parts = [config_source(config_string)]

  # include a default test_sink output if no outputs given -> we're using it to track processed events
  # NOTE: a output is required with the JavaPipeline otherwise no processing happen (despite filters being defined)
  if !OUTPUT_BLOCK_RE.match(config_string)
    config_parts << test_sink_output_source(**test_sink)
  elsif test_sink && !test_sink.empty?
    warn "#{__method__} test_sink: #{test_sink.inspect} options have no effect as config_string has an output"
  end

  # NOTE: currently using manual batch to push events down the pipe, so an input isn't required
  # (no check for an input block is performed here).
  new_pipeline(config_parts, pipeline_id)
end
#plugin_input(plugin, &block) ⇒ Object
Deprecated.
140 141 142 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 140
# @deprecated No longer supported; always raises.
#
# @param plugin [Object] ignored
# @raise [NotImplementedError] unconditionally
def plugin_input(plugin, &block)
  raise NotImplementedError, "#{__method__} no longer supported; please refactor"
end
#sample(sample_event, validation_source: :output) { ... } ⇒ void
This method returns an undefined value.
Creates a single-example example group in the current RSpec context to validate the results of filter operations. The current group is expected to use `LogStashHelper#config` to provide a pipeline configuration.
The provided block has access to an [Array] `results`, and also sets an unnamed `subject` that is either a single `LogStash::Event` or an `[Array]` containing at least two entries.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 67
# Creates a single-example example group that pushes `sample_event` through a
# pipeline built from the group's `config` and runs the given block as the
# example body.
#
# Inside the block, `results` holds the collected events and `subject` is
# `results` itself when it has more than one entry, otherwise `results.first`.
#
# @param sample_event [String, Object, Array] a single sample or an Array of
#   samples; String samples are wrapped as `{ "message" => sample }` before
#   being passed to `LogStash::Event.new`
# @param validation_source [Symbol] where results are read from: `:output`
#   (the test_sink's event store) or `:queue` (the pipeline's filter queue client)
# @yield the body of the "processes events as specified" example
# @raise [ArgumentError] if `validation_source` is neither `:output` nor `:queue`
# @return [void]
def sample(sample_event, validation_source: :output, &block)
  validation_sources = [:output, :queue].freeze
  unless validation_sources.include?(validation_source)
    fail(ArgumentError, "Unexpected source `#{validation_source.inspect}`, expected one of #{validation_sources.inspect}")
  end

  # The group description is the sample itself (JSON-dumped when not a String),
  # truncated to keep the description short.
  name = sample_event.is_a?(String) ? sample_event : LogStash::Json.dump(sample_event)
  name = name[0..50] + "..." if name.length > 50

  describe name.inspect do
    if validation_source == :output
      let(:test_sink) { LogStash::Outputs::TestSink.new("release_on_close" => false, "store_events" => true) }
      # Make the pipeline instantiate our sink so its stored events can be inspected.
      before { expect(LogStash::Outputs::TestSink).to receive(:new).and_return(test_sink) }
    end

    let(:pipeline) { new_pipeline_from_string(config) }
    let(:event) do
      sample_event = [sample_event] unless sample_event.is_a?(Array)
      next sample_event.collect do |e|
        e = { "message" => e } if e.is_a?(String)
        next LogStash::Event.new(e)
      end
    end

    let(:results) do
      # Java pipeline (since 6.1) registers filters from start_workers
      pipeline.run_with(event)

      # flush makes sure to empty any buffered events in the filter
      # NOTE(review): this block refers to `results` from inside its own `let`
      # definition — confirm flush_filters never yields here.
      pipeline.flush_filters(:final => true) { |flushed_event| results << flushed_event }

      case validation_source
      when :queue then pipeline.filter_queue_client.processed_events # legacy?
      when :output then test_sink.event_store.to_a
      else fail NotImplementedError, "validation source `#{validation_source}` not implemented."
      end
    end

    # starting at logstash-core 5.3 an initialized pipeline need to be closed
    after do
      pipeline.close if pipeline.respond_to?(:close)
    end

    subject { results.length > 1 ? results : results.first }

    it("processes events as specified", &block)
  end
end
#tags(*tags) ⇒ Object
51 52 53 54 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 51
# Defines `default_tags` (via `let`) for the current example group and emits
# a deprecation notice, since `default_tags` is not used.
#
# @param tags [Array] tags to expose through `default_tags`
def tags(*tags)
  let(:default_tags) { tags }
  deprecated "tags(#{tags.inspect}) - let(:default_tags) are not used"
end
#try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) ⇒ Object
39 40 41 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 39
# Delegates to `Stud.try`, passing `number_of_try.times`,
# `DEFAULT_EXCEPTIONS_FOR_TRY` and the given block.
#
# @param number_of_try [Integer] defaults to `DEFAULT_NUMBER_OF_TRY`
# @yield the block handed to `Stud.try`
# @return [Object] whatever `Stud.try` returns
def try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block)
  attempts = number_of_try.times
  Stud.try(attempts, DEFAULT_EXCEPTIONS_FOR_TRY, &block)
end
#type(default_type) ⇒ Object
def config
47 48 49 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 47
# Only emits a deprecation notice; the given type is not stored or used.
#
# @param default_type [Object] included (inspected) in the deprecation message
def type(default_type)
  notice = "type(#{default_type.inspect}) no longer has any effect"
  deprecated(notice)
end