Class: LogStash::Inputs::Elasticsearch
- Inherits:
  LogStash::Inputs::Base
  - Object
  - LogStash::Inputs::Base
  - LogStash::Inputs::Elasticsearch
- Extended by:
- PositiveWholeNumberValidator, URIOrEmptyValidator, PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter
- Includes:
- PluginMixins::CATrustedFingerprintSupport, PluginMixins::ECSCompatibilitySupport::TargetCheck, PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::NormalizeConfigSupport, PluginMixins::Scheduler
- Defined in:
- lib/logstash/inputs/elasticsearch.rb,
lib/logstash/inputs/elasticsearch/aggregation.rb,
lib/logstash/inputs/elasticsearch/paginated_search.rb
Overview
.Compatibility Note
NOTE: Starting with Elasticsearch 5.3, there's an {ref}/modules-http.html[HTTP setting] called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.

Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:
[source,ruby]
    input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        hosts => "localhost"
        query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
      }
    }
This would create an Elasticsearch query with the following format:
[source,json]
    curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
      "query": { "match": { "statuscode": 200 } },
      "sort": [ "_doc" ]
    }'
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *`               | will execute every minute of 5am every day of January through March.
| `0 * * * *`                 | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
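For example, a pipeline that re-runs the query at the top of every hour could be configured as below (a sketch; the host and query values are placeholders):

[source,ruby]
    input {
      elasticsearch {
        hosts    => "localhost"
        query    => '{ "query": { "match_all": {} } }'
        # rufus-scheduler cron syntax: minute 0 of every hour
        schedule => "0 * * * *"
      }
    }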
Defined Under Namespace
Modules: PositiveWholeNumberValidator, URIOrEmptyValidator Classes: Aggregation, PaginatedSearch, Scroll, SearchAfter
Constant Summary collapse
- BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
- DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
- INTERNAL_ORIGIN_HEADER =
{ 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze
Instance Attribute Summary collapse
- #pipeline_id ⇒ Object (readonly)
  Returns the value of attribute pipeline_id.
Instance Method Summary collapse
- #event_from_hit(hit, root_field) ⇒ Object
- #initialize(params = {}) ⇒ Elasticsearch (constructor)
  A new instance of Elasticsearch.
- #push_hit(hit, output_queue, root_field = '_source') ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
- #set_docinfo_fields(hit, event) ⇒ Object
Methods included from URIOrEmptyValidator
Methods included from PositiveWholeNumberValidator
Constructor Details
#initialize(params = {}) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
# File 'lib/logstash/inputs/elasticsearch.rb', line 270

def initialize(params={})
  super(params)

  if docinfo_target.nil?
    @docinfo_target = ecs_select[disabled: '@metadata', v1: '[@metadata][input][elasticsearch]']
  end
end
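When `docinfo` is enabled but no `docinfo_target` is configured, the `ecs_select` call above picks the default target per ECS compatibility mode. A pipeline can also set the target explicitly; a sketch, where the target field name is a placeholder:

[source,ruby]
    input {
      elasticsearch {
        hosts          => "localhost"
        docinfo        => true
        # If omitted, this defaults to "@metadata" (ECS disabled) or
        # "[@metadata][input][elasticsearch]" (ECS v1 and later), per the constructor above.
        docinfo_target => "[@metadata][doc]"
      }
    }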
Instance Attribute Details
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
# File 'lib/logstash/inputs/elasticsearch.rb', line 264

def pipeline_id
  @pipeline_id
end
Instance Method Details
#event_from_hit(hit, root_field) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 352

def event_from_hit(hit, root_field)
  event = targeted_event_factory.new_event hit[root_field]
  set_docinfo_fields(hit, event) if @docinfo

  event
rescue => e
  serialized_hit = hit.to_json
  logger.warn("Event creation error, original data now in [event][original] field", message: e.message, exception: e.class, data: serialized_hit)

  return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end
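Hits that cannot be converted are re-emitted with the `_elasticsearch_input_failure` tag and the raw hit JSON under `[event][original]`, so a pipeline can route them separately. A minimal sketch, where the output file path is a placeholder:

[source,ruby]
    output {
      if "_elasticsearch_input_failure" in [tags] {
        # the raw hit JSON is preserved in [event][original]
        file { path => "/tmp/es-input-failures.log" }
      }
    }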
#push_hit(hit, output_queue, root_field = '_source') ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 346

def push_hit(hit, output_queue, root_field = '_source')
  event = event_from_hit(hit, root_field)
  decorate(event)
  output_queue << event
end
#register ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 278

def register
  require "rufus/scheduler"

  @pipeline_id = execution_context&.pipeline_id || 'main'

  fill_hosts_from_cloud_id
  setup_ssl_params!

  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

  validate_authentication
  fill_user_password_from_cloud_auth

  transport_options = {:headers => {}}
  transport_options[:headers].merge!(INTERNAL_ORIGIN_HEADER)
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({'user-agent' => prepare_user_agent()})
  transport_options[:headers].merge!(@custom_headers) unless @custom_headers.empty?
  transport_options[:request_timeout] = @request_timeout_seconds unless @request_timeout_seconds.nil?
  transport_options[:connect_timeout] = @connect_timeout_seconds unless @connect_timeout_seconds.nil?
  transport_options[:socket_timeout] = @socket_timeout_seconds unless @socket_timeout_seconds.nil?

  hosts = setup_hosts
  ssl_options = setup_client_ssl

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client_options = {
    :hosts => hosts,
    :transport_options => transport_options,
    :transport_class => get_transport_client_class,
    :ssl => ssl_options
  }

  @client = Elasticsearch::Client.new(@client_options)

  test_connection!

  setup_serverless

  setup_search_api

  setup_query_executor

  @client
end
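As the validations above show, `register` rejects a `slices` value below one, a `query` that already contains a `slice` clause, and a negative `retries`. A configuration that passes these checks might look like the following sketch (host and query are placeholders):

[source,ruby]
    input {
      elasticsearch {
        hosts   => "localhost"
        query   => '{ "query": { "match_all": {} }, "sort": [ "_doc" ] }'
        slices  => 4    # must be >= 1; the query itself must not set "slice"
        retries => 2    # must be >= 0
      }
    }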
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 334

def run(output_queue)
  if @schedule
    scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
    scheduler.join
  else
    @query_executor.do_run(output_queue)
  end
end
#set_docinfo_fields(hit, event) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 363

def set_docinfo_fields(hit, event)
  # do not assume event[@docinfo_target] to be in-place updatable. first get it,
  # update it, then at the end set it in the event.
  docinfo_target = event.get(@docinfo_target) || {}

  unless docinfo_target.is_a?(Hash)
    # expect the error to be handled by `#event_from_hit`
    fail RuntimeError, "Incompatible event; unable to merge docinfo fields into docinfo_target=`#{@docinfo_target}`"
  end

  @docinfo_fields.each do |field|
    docinfo_target[field] = hit[field]
  end

  event.set(@docinfo_target, docinfo_target)
end
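Because `set_docinfo_fields` copies each configured `docinfo_fields` entry from the hit into the docinfo target, those values can be referenced downstream, for example to reindex documents under matching index names and IDs. A sketch assuming ECS compatibility is disabled, so docinfo lands directly under `@metadata`:

[source,ruby]
    input {
      elasticsearch {
        hosts          => "localhost"
        docinfo        => true
        docinfo_fields => ["_index", "_id"]
      }
    }
    output {
      elasticsearch {
        index       => "copy-of-%{[@metadata][_index]}"
        document_id => "%{[@metadata][_id]}"
      }
    }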