Module: LogStashHelper

Defined in:
lib/logstash/devutils/rspec/logstash_helpers.rb

Constant Summary collapse

# Short alias for LogStash::TestPipeline; #new_pipeline instantiates it.
TestPipeline =
LogStash::TestPipeline
# Default number of attempts for #try when the caller passes none.
DEFAULT_NUMBER_OF_TRY =
5
# Exception classes that #try hands to Stud.try.
DEFAULT_EXCEPTIONS_FOR_TRY =
[RSpec::Expectations::ExpectationNotMetError]
# Tag => true map exposed through .excluded_tags.
# NOTE(review): presumably these are RSpec tags filtered out by default -
# confirm against the code that consumes .excluded_tags.
@@excluded_tags =
{
    :integration => true,
    :redis => true,
    :socket => true,
    :performance => true,
    :couchdb => true,
    :elasticsearch => true,
    :elasticsearch_secure => true,
    :export_cypher => true
}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.excluded_tags ⇒ Object



30
31
32
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 30

# Returns the module-level @@excluded_tags object itself (not a copy), so
# any mutation by the caller is visible to every other reader.
#
# @return [Hash] the tag => true map
def self.excluded_tags
  @@excluded_tags
end

Instance Method Details

#agent(&block) ⇒ Object



144
145
146
147
148
149
150
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 144

# Registers an example, named after the location this helper was called
# from, that builds a pipeline from the group's `config`, runs it, and then
# invokes the given block.
#
# @yield invoked with no arguments once `pipeline.run` has returned
def agent(&block)
  # Keep only the text before the first space of the caller entry.
  call_site = caller[0].gsub(/ .*/, "")

  it("agent(#{call_site}) runs") do
    runner = new_pipeline_from_string(config)
    runner.run
    block.call
  end
end

#config(configstr) ⇒ Object



43
44
45
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 43

# Declares an RSpec `let(:config)` in the current group whose value is the
# given argument.
#
# @param configstr [Object] value exposed to examples as `config`; #agent and
#   #sample pass it to #new_pipeline_from_string as the config string
def config(configstr)
  let(:config) { configstr }
end

#input(config_string, test_sink: {}, &block) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 118

# Builds a pipeline from `config_string` plus a test_sink output, starts it,
# and yields the pipeline together with the sink's event store.
#
# The pipeline is shut down (and the start thread joined) even when the block
# raises, so a failing expectation does not leave a pipeline running.
#
# @param config_string [String] pipeline configuration, passed to config_source
# @param test_sink [Hash] options forwarded to test_sink_output_source
# @yieldparam pipeline the started pipeline
# @yieldparam queue the event store returned by the sink's init_event_store
# @return [Object] whatever the block returns
# @raise [RuntimeError] when the pipeline's last output is not a test_sink
def input(config_string, test_sink: {}, &block)
  # Required at call time rather than at file load.
  require 'logstash/outputs/test_sink'

  config_parts = [config_source(config_string), test_sink_output_source(**test_sink)]
  pipeline = new_pipeline(config_parts)

  output_delegator = pipeline.outputs.last # LogStash::OutputDelegator
  fail('test_sink output expected') unless output_delegator.config_name.eql?('test_sink')
  test_sink_output = output_delegator.strategy.to_java.output
  queue = test_sink_output.init_event_store

  start_thread = pipeline.start_and_wait

  begin
    # NOTE: we used to pass a Queue here, now it's a Java List/Queue collection
    block.call(pipeline, queue)
  ensure
    pipeline.shutdown
    start_thread.join if start_thread.alive?
  end
end

#new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone) ⇒ Object



170
171
172
173
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 170

# Wraps the given config parts in a PipelineConfig (with
# LogStash::Config::Source::Local as its source) and returns a new
# TestPipeline built from it.
#
# @param config_parts [Array] config parts handed to PipelineConfig
# @param pipeline_id [Symbol] pipeline identifier (defaults to :main)
# @param settings [Object] settings object (defaults to a clone of LogStash::SETTINGS)
# @return [TestPipeline]
def new_pipeline(config_parts, pipeline_id = :main, settings = ::LogStash::SETTINGS.clone)
  source = LogStash::Config::Source::Local
  TestPipeline.new(
    LogStash::Config::PipelineConfig.new(source, pipeline_id, config_parts, settings)
  )
end

#new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {}) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 152

# Builds a pipeline from a configuration string, appending a default
# test_sink output when the string does not match OUTPUT_BLOCK_RE.
#
# @param config_string [String] pipeline configuration
# @param pipeline_id [Symbol] pipeline identifier (defaults to :main)
# @param test_sink [Hash] options for the default test_sink output; ignored
#   (with a warning) when config_string already has an output
# @return [Object] the pipeline returned by #new_pipeline
def new_pipeline_from_string(config_string, pipeline_id: :main, test_sink: {})
  config_parts = [config_source(config_string)]

  if OUTPUT_BLOCK_RE.match(config_string)
    if test_sink && !test_sink.empty?
      warn "#{__method__} test_sink: #{test_sink.inspect} options have no effect as config_string has an output"
    end
  else
    # include a default test_sink output if no outputs given -> we're using it to track processed events
    # NOTE: an output is required with the JavaPipeline otherwise no processing happens (despite filters being defined)
    config_parts << test_sink_output_source(**test_sink)
  end

  # NOTE: currently using manual batch to push events down the pipe, so an
  # input block in config_string isn't required (and is not checked for).

  new_pipeline(config_parts, pipeline_id)
end

#plugin_input(plugin, &block) ⇒ Object

Deprecated.

Raises:

  • (NotImplementedError)


140
141
142
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 140

# Removed helper kept only so callers get an explicit error.
#
# @deprecated no longer supported
# @raise [NotImplementedError] always; neither argument is used
def plugin_input(plugin, &block)
  raise NotImplementedError, "#{__method__} no longer supported; please refactor"
end

#sample(sample_event, validation_source: :output) { ... } ⇒ void

This method returns an undefined value.

Creates a single-example example group in the current RSpec context to validate the results of filter operations. The current group is expected to use `LogStashHelper#config` to provide a pipeline configuration.

The provided block has access to an [Array] `results`; the helper also sets an unnamed `subject` that is either a single `LogStash::Event` or an [Array] containing at least two entries.

Parameters:

  • sample_event (String, #to_json)
  • validation_source (:output, :queue) (defaults to: :output)

Yields:

  • creates an example with expectations provided by the block



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 67

# Creates a single-example example group in the current RSpec context to
# validate the results of filter operations. The enclosing group is expected
# to provide the pipeline configuration as `config`.
#
# Inside the example, `results` holds the processed events and the unnamed
# `subject` is `results` when it has more than one entry, else `results.first`.
#
# @param sample_event [String, #to_json, Array] one event or a list of events;
#   a String becomes an event with that string as its "message"
# @param validation_source [:output, :queue] where `results` are read from
# @yield body of the generated example
# @raise [ArgumentError] when validation_source is not :output or :queue
# @return [void]
def sample(sample_event, validation_source: :output, &block)
  validation_sources = [:output, :queue].freeze
  unless validation_sources.include?(validation_source)
    fail(ArgumentError, "Unexpected source `#{validation_source.inspect}`, expected one of #{validation_sources.inspect}")
  end

  # The group name is the (possibly truncated) textual form of the sample.
  name = sample_event.is_a?(String) ? sample_event : LogStash::Json.dump(sample_event)
  name = name[0..50] + "..." if name.length > 50

  describe name.inspect do
    if validation_source == :output
      # Force the pipeline to use a sink we hold a reference to, so its stored
      # events can be read back in `results`.
      let(:test_sink) { LogStash::Outputs::TestSink.new("release_on_close" => false, "store_events" => true) }
      before { expect(LogStash::Outputs::TestSink).to receive(:new).and_return(test_sink) }
    end

    let(:pipeline) { new_pipeline_from_string(config) }

    let(:event) do
      # Use a block-local list instead of reassigning the captured method
      # argument, so the closure never mutates state shared with the caller.
      samples = sample_event.is_a?(Array) ? sample_event : [sample_event]
      samples.map do |entry|
        LogStash::Event.new(entry.is_a?(String) ? { "message" => entry } : entry)
      end
    end

    let(:results) do
      # Java pipeline (since 6.1) registers filters from start_workers
      pipeline.run_with(event)

      # flush makes sure to empty any buffered events in the filter
      # NOTE(review): this block appends to `results` while `results` is still
      # being computed; if flush_filters ever yields, it re-enters this let.
      # Confirm where flushed events are meant to be collected.
      pipeline.flush_filters(:final => true) { |flushed_event| results << flushed_event }

      case validation_source
      when :queue  then pipeline.filter_queue_client.processed_events # legacy?
      when :output then test_sink.event_store.to_a
      else              fail NotImplementedError, "validation source `#{validation_source}` not implemented."
      end
    end

    # starting at logstash-core 5.3 an initialized pipeline need to be closed
    after do
      pipeline.close if pipeline.respond_to?(:close)
    end

    subject { results.length > 1 ? results : results.first }

    it("processes events as specified", &block)
  end
end

#tags(*tags) ⇒ Object



51
52
53
54
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 51

# Declares `let(:default_tags)` holding the given tags, then emits a
# deprecation notice stating that this let is not used.
#
# @param tags [Array] tags to expose as `default_tags`
def tags(*tags)
  let(:default_tags) do
    tags
  end

  notice = "tags(#{tags.inspect}) - let(:default_tags) are not used"
  deprecated(notice)
end

#try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block) ⇒ Object



39
40
41
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 39

# Delegates to Stud.try with an enumerator of `number_of_try` attempts and
# the exception classes in DEFAULT_EXCEPTIONS_FOR_TRY.
#
# @param number_of_try [Integer] number of attempts handed to Stud.try
# @yield the block forwarded to Stud.try
# @return [Object] whatever Stud.try returns
def try(number_of_try = DEFAULT_NUMBER_OF_TRY, &block)
  attempts = number_of_try.times
  Stud.try(attempts, DEFAULT_EXCEPTIONS_FOR_TRY, &block)
end

#type(default_type) ⇒ Object

Deprecated: calling this no longer has any effect.



47
48
49
# File 'lib/logstash/devutils/rspec/logstash_helpers.rb', line 47

# Emits a deprecation notice and does nothing else.
#
# @param default_type [Object] only echoed (via #inspect) in the notice
def type(default_type)
  notice = "type(#{default_type.inspect}) no longer has any effect"
  deprecated(notice)
end