Class: LogStash::Inputs::Elasticsearch

Inherits:
Base
  • Object
show all
Extended by:
PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
Includes:
PluginMixins::CATrustedFingerprintSupport, PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::Scheduler
Defined in:
lib/logstash/inputs/elasticsearch.rb

Overview

.Compatibility Note

NOTE

Starting with Elasticsearch 5.3, there’s an refmodules-http.html[HTTP setting] called ‘http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.

Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see ‘schedule` setting).

Example:

source,ruby

input {

# Read all documents from Elasticsearch matching the given query
elasticsearch {
  hosts => "localhost"
  query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
}

}

This would create an Elasticsearch query with the following format:

source,json

curl ‘localhost:9200/logstash-*/_search?&scroll=1m&size=1000’ -d ‘{

"query": {
  "match": {
    "statuscode": 200
  }
},
"sort": [ "_doc" ]

}‘

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support ).

Examples:

|========================================================== | ‘* 5 * 1-3 *` | will execute every minute of 5am every day of January through March. | `0 * * * *` | will execute on the 0th minute of every hour every day. | `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day. |==========================================================

Further documentation describing this syntax can be found github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].

Defined Under Namespace

Modules: PositiveWholeNumberValidator, URIOrEmptyValidator

Instance Method Summary collapse

Methods included from URIOrEmptyValidator

validate_value

Methods included from PositiveWholeNumberValidator

validate_value

Constructor Details

#initialize(params = {}) ⇒ Elasticsearch

Returns a new instance of Elasticsearch.



202
203
204
205
206
207
208
# File 'lib/logstash/inputs/elasticsearch.rb', line 202

def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end

Instance Method Details

#registerObject



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/logstash/inputs/elasticsearch.rb', line 210

def register
  require "rufus/scheduler"

  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  validate_authentication
  fill_user_password_from_cloud_auth
  fill_hosts_from_cloud_id


  transport_options = {:headers => {}}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout]  = @socket_timeout_seconds  unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client = Elasticsearch::Client.new(
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
    :ssl => ssl_options
  )
  test_connection!
  @client
end

#run(output_queue) ⇒ Object



255
256
257
258
259
260
261
262
# File 'lib/logstash/inputs/elasticsearch.rb', line 255

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule) { do_run(output_queue) }
    scheduler.join
  else
    do_run(output_queue)
  end
end