Module: LogStashHelper
- Defined in:
- lib/logstash/devutils/rspec/logstash_helpers.rb
Constant Summary collapse
- TestPipeline =
LogStash::TestPipeline
- DEFAULT_NUMBER_OF_TRY =
5
- DEFAULT_EXCEPTIONS_FOR_TRY =
[RSpec::Expectations::ExpectationNotMetError]
{ :integration => true, :redis => true, :socket => true, :performance => true, :couchdb => true, :elasticsearch => true, :elasticsearch_secure => true, :export_cypher => true }
Class Method Summary collapse
Instance Method Summary collapse
- #agent(&block) ⇒ Object
- #config(configstr) ⇒ Object
- #input(config_string, test_sink: {}, &block) ⇒ Object
- #new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) ⇒ Object
- #new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) ⇒ Object
- #plugin_input(plugin, &block) ⇒ Object deprecated Deprecated.
- #sample(sample_event, &block) ⇒ Object
- #tags(*tags) ⇒ Object
- #try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) ⇒ Object
-
#type(default_type) ⇒ Object
def config.
Class Method Details
.excluded_tags ⇒ Object
29 30 31 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 29 def self. @@excluded_tags end |
Instance Method Details
#agent(&block) ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 120 def agent(&block) it("agent(#{caller[0].gsub(/ .*/, "")}) runs") do pipeline = new_pipeline_from_string(config) pipeline.run block.call end end |
#config(configstr) ⇒ Object
42 43 44 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 42 def config(configstr) let(:config) { configstr } end |
#input(config_string, test_sink: {}, &block) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 94 def input(config_string, test_sink: {}, &block); require 'logstash/outputs/test_sink' config_parts = [ config_source(config_string), test_sink_output_source(**test_sink) ] pipeline = new_pipeline(config_parts) output_delegator = pipeline.outputs.last # LogStash::OutputDelegator fail('test_sink output expected') unless output_delegator.config_name.eql?('test_sink') test_sink_output = output_delegator.strategy.to_java.output queue = test_sink_output.init_event_store start_thread = pipeline.start_and_wait # NOTE: we used to pass a Queue here, now its a Java List/Queue collection result = block.call(pipeline, queue) pipeline.shutdown start_thread.join if start_thread.alive? result end |
#new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) ⇒ Object
146 147 148 149 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 146 def new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) pipeline_config = LogStash::Config::PipelineConfig.new(LogStash::Config::Source::Local, pipeline_id, config_parts, settings) TestPipeline.new(pipeline_config) end |
#new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 128 def new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) config_parts = [ config_source(config_string) ] # include a default test_sink output if no outputs given -> we're using it to track processed events # NOTE: a output is required with the JavaPipeline otherwise no processing happen (despite filters being defined) if !OUTPUT_BLOCK_RE.match(config_string) config_parts << test_sink_output_source(**test_sink) elsif test_sink && !test_sink.empty? warn "#{__method__} test_sink: #{test_sink.inspect} options have no effect as config_string has an output" end if !INPUT_BLOCK_RE.match(config_string) # NOTE: currently using manual batch to push events down the pipe, so an input isn't required end new_pipeline(config_parts, pipeline_id) end |
#plugin_input(plugin, &block) ⇒ Object
Deprecated.
116 117 118 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 116 def plugin_input(plugin, &block) raise NotImplementedError.new("#{__method__} no longer supported; please refactor") end |
#sample(sample_event, &block) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 55 def sample(sample_event, &block) name = sample_event.is_a?(String) ? sample_event : LogStash::Json.dump(sample_event) name = name[0..50] + "..." if name.length > 50 describe "\"#{name}\"" do let(:pipeline) { new_pipeline_from_string(config) } let(:event) do sample_event = [sample_event] unless sample_event.is_a?(Array) next sample_event.collect do |e| e = { "message" => e } if e.is_a?(String) next LogStash::Event.new(e) end end let(:results) do # Java pipeline (since 6.1) registers filters from start_workers pipeline.run_with(event) # flush makes sure to empty any buffered events in the filter pipeline.flush_filters(:final => true) { |flushed_event| results << flushed_event } pipeline.filter_queue_client.processed_events end # starting at logstash-core 5.3 an initialized pipeline need to be closed after do pipeline.close if pipeline.respond_to?(:close) end subject { results.length > 1 ? results : results.first } it("when processed", &block) end end |
#tags(*tags) ⇒ Object
50 51 52 53 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 50 def (*) let(:default_tags) { } deprecated "tags(#{.inspect}) - let(:default_tags) are not used" end |
#try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) ⇒ Object
38 39 40 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 38 def try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) Stud.try(number_of_try.times, DEFAULT_EXCEPTIONS_FOR_TRY, &block) end |
#type(default_type) ⇒ Object
def config
46 47 48 |
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 46 def type(default_type) deprecated "type(#{default_type.inspect}) no longer has any effect" end |