Class: LogStash::Inputs::Elasticsearch
- Inherits: LogStash::Inputs::Base
  (Object > LogStash::Inputs::Base > LogStash::Inputs::Elasticsearch)
- Defined in: lib/logstash/inputs/elasticsearch.rb
Overview
Compatibility Note

NOTE: Starting with Elasticsearch 5.3, there is an HTTP setting called `http.content_type.required`. If this option is set to `true` and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:

    input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        hosts => "localhost"
        query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
      }
    }
This would create an Elasticsearch query with the following format:

    curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
      "query": { "match": { "statuscode": 200 } },
      "sort": [ "_doc" ]
    }'
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by rufus-scheduler (github.com/jmettraux/rufus-scheduler). The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
    * 5 * 1-3 *                  will execute every minute of 5am every day of January through March.
    0 * * * *                    will execute on the 0th minute of every hour every day.
    0 6 * * * America/Chicago    will execute at 6:00am (UTC/GMT -5) every day.
Further documentation describing this syntax can be found at github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings.
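Putting the scheduling and query settings together, a pipeline that re-runs the query periodically might look like this (a hypothetical configuration; the host and cron line are placeholders):

    input {
      elasticsearch {
        hosts    => "localhost"
        query    => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
        # Run at the top of every hour (rufus-scheduler cron syntax)
        schedule => "0 * * * *"
      }
    }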
Instance Method Summary collapse
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/elasticsearch.rb', line 161

    def register
      require "elasticsearch"
      require "rufus/scheduler"
      require "elasticsearch/transport/transport/http/manticore"

      @options = {
        :index => @index,
        :scroll => @scroll,
        :size => @size
      }

      @base_query = LogStash::Json.load(@query)
      if @slices
        @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
        @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
      end

      transport_options = {}

      fill_user_password_from_cloud_auth

      if @user && @password
        token = Base64.strict_encode64("#{@user}:#{@password.value}")
        transport_options[:headers] = { :Authorization => "Basic #{token}" }
      end

      fill_hosts_from_cloud_id
      @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

      hosts = if @ssl
        @hosts.map do |h|
          host, port = h.split(":")
          { :host => host, :scheme => 'https', :port => port }
        end
      else
        @hosts
      end

      ssl_options = { :ssl => true, :ca_file => @ca_file } if @ssl && @ca_file
      ssl_options ||= {}

      @client = Elasticsearch::Client.new(
        :hosts => hosts,
        :transport_options => transport_options,
        :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
        :ssl => ssl_options
      )
    end
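In `register`, basic-auth credentials are folded into a transport header via `Base64.strict_encode64`. That step is reproducible with the standard library alone; a sketch with placeholder credentials:

```ruby
require "base64"

user     = "logstash"  # placeholder username
password = "secret"    # placeholder password

# strict_encode64 emits no trailing newline, unlike Base64.encode64,
# which matters when the token is embedded in an HTTP header
token  = Base64.strict_encode64("#{user}:#{password}")
header = "Basic #{token}"
puts header  # prints Basic bG9nc3Rhc2g6c2VjcmV0
```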
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/elasticsearch.rb', line 206

    def run(output_queue)
      if @schedule
        @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
        @scheduler.cron @schedule do
          do_run(output_queue)
        end
        @scheduler.join
      else
        do_run(output_queue)
      end
    end
#stop ⇒ Object
    # File 'lib/logstash/inputs/elasticsearch.rb', line 219

    def stop
      @scheduler.stop if @scheduler
    end