Class: LogStash::Inputs::Elasticsearch

Inherits:
Base
  • Object
Extended by:
PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
Includes:
PluginMixins::CATrustedFingerprintSupport, PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::NormalizeConfigSupport, PluginMixins::Scheduler
Defined in:
lib/logstash/inputs/elasticsearch.rb,
lib/logstash/inputs/elasticsearch/aggregation.rb,
lib/logstash/inputs/elasticsearch/paginated_search.rb

Overview

Compatibility Note

NOTE: Starting with Elasticsearch 5.3, there is an HTTP setting called `http.content_type.required` (see modules-http.html in the Elasticsearch reference). If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.

Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).

Example:

source,ruby

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}

This would create an Elasticsearch query with the following format:

source,json

curl 'localhost:9200/logstash-*/_search?scroll=1m&size=1000' -d '{
  "query": {
    "match": {
      "statuscode": 200
    }
  },
  "sort": [ "_doc" ]
}'

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by rufus-scheduler (github.com/jmettraux/rufus-scheduler). The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).

Examples:

|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================

Further documentation describing this syntax can be found at github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings.
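
For example, a minimal sketch of a scheduled input (the index name and query are placeholders) that re-runs the query on the 0th minute of every hour:

source,ruby

input {
  elasticsearch {
    hosts    => "localhost"
    index    => "logstash-*"
    query    => '{ "query": { "match_all": {} } }'
    # cron-like rufus-scheduler expression: top of every hour
    schedule => "0 * * * *"
  }
}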

Defined Under Namespace

Modules: PositiveWholeNumberValidator, URIOrEmptyValidator Classes: Aggregation, PaginatedSearch, Scroll, SearchAfter

Constant Summary

BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER =
{ 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

Instance Attribute Summary

Instance Method Summary

Methods included from URIOrEmptyValidator

validate_value

Methods included from PositiveWholeNumberValidator

validate_value

Constructor Details

#initialize(params = {}) ⇒ Elasticsearch

Returns a new instance of Elasticsearch.



# File 'lib/logstash/inputs/elasticsearch.rb', line 270

def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
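
In effect, when `docinfo_target` is not set, document metadata defaults to `[@metadata]` with ECS compatibility disabled and to `[@metadata][input][elasticsearch]` under ECS v1. A sketch of overriding the default explicitly (the target field name here is a placeholder):

source,ruby

input {
  elasticsearch {
    hosts          => "localhost"
    docinfo        => true
    # explicit target; without this, the ECS-dependent default above applies
    docinfo_target => "[@metadata][doc]"
  }
}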

Instance Attribute Details

#pipeline_idObject (readonly)

Returns the value of attribute pipeline_id.



# File 'lib/logstash/inputs/elasticsearch.rb', line 264

def pipeline_id
  @pipeline_id
end

Instance Method Details

#event_from_hit(hit, root_field) ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 352

def event_from_hit(hit, root_field)
  event = targeted_event_factory.new_event hit[root_field]
  set_docinfo_fields(hit, event) if @docinfo

  event
rescue => e
  serialized_hit = hit.to_json
  logger.warn("Event creation error, original data now in [event][original] field", message: e.message, exception: e.class, data: serialized_hit)
  return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end
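
When deserialization of a hit fails, the raw hit is preserved as JSON in `[event][original]` and the event is tagged `_elasticsearch_input_failure`. A sketch of routing such events aside downstream (the index names are placeholders):

source,ruby

output {
  if "_elasticsearch_input_failure" in [tags] {
    # the undeserializable hit survives as JSON in [event][original]
    elasticsearch { index => "es-input-failures" }
  } else {
    elasticsearch { index => "replayed-events" }
  }
}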

#push_hit(hit, output_queue, root_field = '_source') ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 346

def push_hit(hit, output_queue, root_field = '_source')
  event = event_from_hit(hit, root_field)
  decorate(event)
  output_queue << event
end

#registerObject



# File 'lib/logstash/inputs/elasticsearch.rb', line 278

def register
  require "rufus/scheduler"

  @pipeline_id = execution_context&.pipeline_id || 'main'

  fill_hosts_from_cloud_id
  setup_ssl_params!

  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

  validate_authentication
  fill_user_password_from_cloud_auth

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(INTERNAL_ORIGIN_HEADER)
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:headers].merge!(@custom_headers) unless @custom_headers.empty?
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout]  = @socket_timeout_seconds  unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_client_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client_options = {
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => get_transport_client_class,
    :ssl => ssl_options
  }

  @client = Elasticsearch::Client.new(@client_options)

  test_connection!

  setup_serverless

  setup_search_api

  setup_query_executor

  @client
end
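
As the validations above show, `register` rejects a `query` that pins a specific `slice` when the `slices` option is set (the plugin issues the parallel sliced requests itself), and likewise rejects a non-positive `slices` value and a negative `retries` value. A valid sliced-configuration sketch:

source,ruby

input {
  elasticsearch {
    hosts   => "localhost"
    query   => '{ "query": { "match_all": {} }, "sort": [ "_doc" ] }'
    # let the plugin manage slicing; do not add a "slice" clause to `query`
    slices  => 4
    retries => 2
  }
}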

#run(output_queue) ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 334

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
    scheduler.join
  else
    @query_executor.do_run(output_queue)
  end
end

#set_docinfo_fields(hit, event) ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 363

def set_docinfo_fields(hit, event)
  # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
  docinfo_target = event.get(@docinfo_target) || {}

  unless docinfo_target.is_a?(Hash)
    # expect error to be handled by `#event_from_hit`
    fail RuntimeError, "Incompatible event; unable to merge docinfo fields into docinfo_target=`#{@docinfo_target}`"
  end

  @docinfo_fields.each do |field|
    docinfo_target[field] = hit[field]
  end

  event.set(@docinfo_target, docinfo_target)
end
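
With the ECS-disabled default target of `[@metadata]`, the copied fields (by default `_index`, `_type`, and `_id`) can be referenced downstream; a sketch that reindexes documents under their original IDs:

source,ruby

input {
  elasticsearch {
    hosts   => "localhost"
    docinfo => true
  }
}
output {
  elasticsearch {
    # reuse the coordinates captured by set_docinfo_fields
    index       => "copy-of-%{[@metadata][_index]}"
    document_id => "%{[@metadata][_id]}"
  }
}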