Class: LogStash::Inputs::Elasticsearch

Inherits:
Base
  • Object
Defined in:
lib/logstash/inputs/elasticsearch.rb

Overview

Compatibility Note

NOTE: Starting with Elasticsearch 5.3, there is an HTTP setting (documented in refmodules-http.html) called `http.content_type.required`. If this option is set to `true` and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.

Reads from an Elasticsearch cluster based on the results of a search query. This is useful for replaying test logs, reindexing, and similar tasks. It also supports periodically scheduling lookup enrichments using cron syntax (see the `schedule` setting).

Example:

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}

This issues a search request equivalent to the following `curl` command:

curl -H 'Content-Type: application/json' 'localhost:9200/logstash-*/_search?scroll=1m&size=1000' -d '{
  "query": {
    "match": {
      "statuscode": 200
    }
  },
  "sort": [ "_doc" ]
}'
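To issue the same request from Ruby without Logstash, the sketch below builds it with the standard library's `Net::HTTP`. The helper name `build_initial_search`, the fixed port 9200, and the default values are assumptions taken from the `curl` example above, not part of the plugin's API; the request is only constructed here, not sent.

```ruby
require "json"
require "net/http"
require "uri"

# Hypothetical helper: builds the initial scroll search request shown above.
# Defaults (index "logstash-*", scroll "1m", size 1000) mirror the curl example.
def build_initial_search(host:, query:, index: "logstash-*", scroll: "1m", size: 1000)
  uri = URI("http://#{host}:9200/#{index}/_search")
  uri.query = URI.encode_www_form(scroll: scroll, size: size)

  request = Net::HTTP::Post.new(uri)
  # Elasticsearch 6.0+ rejects request bodies without an explicit content type.
  request["Content-Type"] = "application/json"
  # Round-trip through JSON to validate the query string and normalize spacing.
  request.body = JSON.generate(JSON.parse(query))
  request
end

req = build_initial_search(
  host: "localhost",
  query: '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
)
puts req.path
puts req.body
```

Sending it would be a matter of `Net::HTTP.start(req.uri.host, req.uri.port) { |http| http.request(req) }`; the plugin itself talks to the cluster through `Elasticsearch::Client` instead.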

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. The scheduling syntax is powered by rufus-scheduler (github.com/jmettraux/rufus-scheduler). It is cron-like, with some Rufus-specific extensions (e.g. time zone support).

Examples:

  • `* 5 * 1-3 *`: executes every minute of the 5am hour, every day from January through March.
  • `0 * * * *`: executes on the 0th minute of every hour, every day.
  • `0 6 * * * America/Chicago`: executes at 6:00am Chicago time (UTC-6, or UTC-5 during daylight saving time) every day.

Further documentation describing this syntax can be found at github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings.
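To make the field semantics of the examples concrete, here is a deliberately small cron matcher in plain Ruby. It is not how rufus-scheduler works internally and ignores its extensions; `field_matches?` and `cron_matches?` are hypothetical names used only for illustration.

```ruby
# Illustration only, NOT rufus-scheduler: a minimal matcher for the five
# standard cron fields (minute, hour, day of month, month, day of week).
# Supports "*", single numbers, ranges "a-b", and comma-separated lists.
# Steps ("*/5"), names ("MON"), and time zones are deliberately unsupported.
def field_matches?(field, value)
  field.split(",").any? do |part|
    case part
    when "*" then true
    when /\A(\d+)-(\d+)\z/ then ($1.to_i..$2.to_i).cover?(value)
    when /\A\d+\z/ then part.to_i == value
    else raise ArgumentError, "unsupported cron field: #{part}"
    end
  end
end

def cron_matches?(expression, time)
  fields = expression.split
  raise ArgumentError, "expected 5 fields, got #{fields.size}" unless fields.size == 5

  minute, hour, mday, month, wday = fields
  # Simplification: standard cron ORs day-of-month and day-of-week when both
  # are restricted; this sketch simply ANDs every field.
  field_matches?(minute, time.min) &&
    field_matches?(hour, time.hour) &&
    field_matches?(mday, time.day) &&
    field_matches?(month, time.month) &&
    field_matches?(wday, time.wday)
end

puts cron_matches?("* 5 * 1-3 *", Time.new(2024, 2, 10, 5, 42))  # => true
puts cron_matches?("* 5 * 1-3 *", Time.new(2024, 4, 10, 5, 42))  # => false (April)
puts cron_matches?("0 * * * *", Time.new(2024, 7, 1, 13, 0))     # => true
```

Because the time zone field is a Rufus extension, this sketch rejects a six-field line such as `0 6 * * * America/Chicago` instead of silently ignoring the zone.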


Class Method Details

.validate_value(value, validator) ⇒ Array(true,Object), Array(false,String)

Parameters:

  • value (Array<Object>)
  • validator (nil, Array, Symbol)

Returns:

  • (Array(true,Object))

    : if validation succeeds, a tuple containing `true` and the coerced value.

  • (Array(false,String))

    : if validation fails, a tuple containing `false` and the failure reason.



# File 'lib/logstash/inputs/elasticsearch.rb', line 218

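def self.validate_value(value, validator)
  # Only the custom :uri_or_empty validator is handled here; all others use the default
  return super unless validator == :uri_or_empty

  value = deep_replace(value)   # resolve ${VAR} substitution references
  value = hash_or_array(value)  # wrap a scalar in a one-element array

  # A single empty string is accepted as-is (i.e. "no URI configured")
  return true, value.first if value.size == 1 && value.first.empty?

  # Otherwise validate exactly as the built-in :uri validator would
  return super(value, :uri)
end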
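The same decision logic can be tried outside Logstash. In this sketch, `validate_uri_or_empty` is a hypothetical name, and `URI.parse` with a scheme check is an assumed stand-in for Logstash's real `:uri` validator; it returns the same `[true, value]` / `[false, reason]` tuple shape documented above.

```ruby
require "uri"

# Standalone approximation of the :uri_or_empty rule, outside Logstash.
# Assumption: URI.parse plus a scheme check stands in for LogStash's built-in
# :uri validator (which the real method reaches via `super(value, :uri)`).
def validate_uri_or_empty(value)
  values = Array(value)  # like hash_or_array: wrap a scalar in an array

  # A single empty string means "not configured" and is accepted unchanged.
  return true, values.first if values.size == 1 && values.first == ""

  return false, "expected one URI, got #{values.size}" unless values.size == 1

  uri = URI.parse(values.first.to_s)
  return false, "URI has no scheme: #{values.first}" if uri.scheme.nil?

  return true, uri
rescue URI::InvalidURIError => e
  return false, "invalid URI: #{e.message}"
end

p validate_uri_or_empty("")           # => [true, ""]
p validate_uri_or_empty("localhost")  # => [false, "URI has no scheme: localhost"]
ok, proxy = validate_uri_or_empty("http://proxy.example:3128")
puts "#{ok} #{proxy.host}:#{proxy.port}"
```

Accepting the empty string up front is what lets a setting such as `proxy => ''` pass validation; `#register` below then warns that it has no effect.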

Instance Method Details

#register ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 164

def register
  require "elasticsearch"
  require "rufus/scheduler"
  require "elasticsearch/transport/transport/http/manticore"

  # Index, scroll keep-alive and page size for the initial scroll search
  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  # Sliced scrolling: the plugin sets `slice` itself, so the query must not
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  transport_options = {}

  # Populate @user/@password from cloud_auth when it is configured
  fill_user_password_from_cloud_auth

  # HTTP Basic authentication header
  if @user && @password
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    transport_options[:headers] = { :Authorization => "Basic #{token}" }
  end

  # Populate @hosts from cloud_id when it is configured
  fill_hosts_from_cloud_id
  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

  # With SSL enabled, turn "host:port" strings into HTTPS host hashes
  hosts = if @ssl
    @hosts.map do |h|
      host, port = h.split(":")
      { :host => host, :scheme => 'https', :port => port }
    end
  else
    @hosts
  end
  # A custom CA file is passed only when both ssl and ca_file are set
  ssl_options = { :ssl  => true, :ca_file => @ca_file } if @ssl && @ca_file
  ssl_options ||= {}

  # An empty proxy string passes validation (:uri_or_empty) but is ignored
  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  # Build the client on the Manticore HTTP transport
  @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options,
                                      :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
                                      :ssl => ssl_options)
end
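Three of the steps in `#register` (the Basic auth header, the SSL host expansion, and the `slices` checks) can be reproduced in isolation, which is handy for checking what a given configuration turns into. The helper names are hypothetical, and `ArgumentError` stands in for `LogStash::ConfigurationError`.

```ruby
# Hypothetical helpers re-creating three steps of #register in plain Ruby
# (no Logstash, no Elasticsearch client).

# Basic auth header; pack("m0") is strict Base64, the same encoding as
# Base64.strict_encode64 used by the plugin.
def basic_auth_headers(user, password)
  return {} unless user && password
  token = ["#{user}:#{password}"].pack("m0")
  { :Authorization => "Basic #{token}" }
end

# With SSL on, "host" / "host:port" strings become HTTPS host hashes.
# Like the plugin code, this assumes entries carry no scheme prefix.
def expand_hosts(hosts, ssl:)
  hosts = Array(hosts).map(&:to_s)
  return hosts unless ssl

  hosts.map do |h|
    host, port = h.split(":")
    { :host => host, :scheme => "https", :port => port }
  end
end

# Sliced-scroll sanity checks on the parsed query Hash.
def check_slices!(base_query, slices)
  return if slices.nil?
  if base_query.include?("slice")
    raise ArgumentError, "`query` must not specify `slice` when `slices` is set"
  end
  raise ArgumentError, "`slices` must be greater than zero, got #{slices}" if slices < 1
end

p basic_auth_headers("elastic", "changeme")
p expand_hosts(["es1:9200", "es2"], ssl: true)
check_slices!({ "query" => { "match_all" => {} } }, 2)  # passes silently
```

Note that a host without a port (`"es2"`) ends up with `:port => nil`, leaving the port choice to the client.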

#run(output_queue) ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 229

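def run(output_queue)
  if @schedule
    # Scheduled mode: run the query on every cron tick; a single worker
    # thread means executions never overlap
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      do_run(output_queue)
    end

    # Block until the scheduler is shut down (see #stop)
    @scheduler.join
  else
    # One-shot mode: run the query once and return
    do_run(output_queue)
  end
end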
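`#run` delegates the actual reading to `do_run`, whose body is not shown on this page. Assuming it follows the usual Elasticsearch scroll pattern (an initial search, repeated scroll calls until a page comes back empty, then clearing the scroll context), one pass might look like the sketch below. `FakeClient` and `scroll_all` are hypothetical, their keyword arguments are not the elasticsearch-ruby signatures, and only each hit's `_source` is emitted.

```ruby
# In-memory stand-in for an Elasticsearch client: serves pre-baked pages of
# hits, one per search/scroll call, and records which scroll id was cleared.
class FakeClient
  attr_reader :cleared

  def initialize(pages)
    @pages = pages  # array of hit arrays, served in order
    @cursor = 0
  end

  def search(scroll:, size:, body:)
    page_response
  end

  def scroll(scroll_id:, scroll:)
    page_response
  end

  def clear_scroll(scroll_id:)
    @cleared = scroll_id
  end

  private

  def page_response
    hits = @pages[@cursor] || []
    @cursor += 1
    { "_scroll_id" => "scroll-#{@cursor}", "hits" => { "hits" => hits } }
  end
end

# One pass over all matching documents; output_queue is anything with #<<.
def scroll_all(client, query, output_queue, scroll: "1m", size: 1000)
  response = client.search(scroll: scroll, size: size, body: query)
  scroll_id = response["_scroll_id"]
  loop do
    hits = response["hits"]["hits"]
    break if hits.empty?  # an empty page means the scroll is exhausted
    hits.each { |hit| output_queue << hit["_source"] }
    response = client.scroll(scroll_id: scroll_id, scroll: scroll)
    scroll_id = response["_scroll_id"]
  end
ensure
  # Free the server-side scroll context once the pass is finished.
  client.clear_scroll(scroll_id: scroll_id) if scroll_id
end

client = FakeClient.new([[{ "_source" => { "a" => 1 } }], [{ "_source" => { "a" => 2 } }]])
events = []
scroll_all(client, { "query" => { "match_all" => {} } }, events)
p events
```

In scheduled mode, each cron tick would run one such pass from the top, re-reading every matching document.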

#stop ⇒ Object



# File 'lib/logstash/inputs/elasticsearch.rb', line 242

def stop
  @scheduler.stop if @scheduler
end
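`#run` and `#stop` cooperate: in scheduled mode `#run` blocks in `@scheduler.join` until `#stop` shuts the scheduler down from another thread. The sketch models that interplay with core threads only. `IntervalScheduler` is a hypothetical stand-in for `Rufus::Scheduler` that fires on a fixed interval instead of a cron expression, and `SketchInput` is a hypothetical class mirroring the two methods above.

```ruby
# Hypothetical Rufus stand-in: runs one job repeatedly on a single thread
# (so runs never overlap, like :max_work_threads => 1); #join blocks until #stop.
class IntervalScheduler
  def initialize(interval)
    @interval = interval
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @stopped = false
  end

  def every(&job)
    @thread = Thread.new do
      until stopped?
        job.call
        # Sleep until the next tick, waking early if #stop signals.
        @mutex.synchronize { @cond.wait(@mutex, @interval) unless @stopped }
      end
    end
  end

  def join
    @thread&.join
  end

  def stop
    @mutex.synchronize do
      @stopped = true
      @cond.signal
    end
  end

  private

  def stopped?
    @mutex.synchronize { @stopped }
  end
end

# Mirrors the shape of #run / #stop; do_run just records a fake event.
class SketchInput
  attr_reader :runs

  def initialize(schedule_interval = nil)
    @schedule_interval = schedule_interval
    @runs = 0
  end

  def run(output_queue)
    if @schedule_interval
      @scheduler = IntervalScheduler.new(@schedule_interval)
      @scheduler.every { do_run(output_queue) }
      @scheduler.join  # blocks until #stop is called from another thread
    else
      do_run(output_queue)  # one-shot: read once, then return
    end
  end

  def stop
    @scheduler.stop if @scheduler
  end

  private

  def do_run(output_queue)
    @runs += 1
    output_queue << { "run" => @runs }
  end
end
```

As in the original, `#stop` only has an effect once `#run` has created the scheduler; calling it earlier is a no-op.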