Class: LogStash::Inputs::Elasticsearch
- Inherits:
- Base (Object → Base → LogStash::Inputs::Elasticsearch)
- Defined in:
- lib/logstash/inputs/elasticsearch.rb
Overview
Compatibility Note
NOTE: Starting with Elasticsearch 5.3, there's an HTTP setting called `http.content_type.required` (documented on the `modules-http.html` page of the Elasticsearch reference). If this option is set to `true` and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input plugin to version 4.0.2 or higher.
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc. It also supports periodically scheduling lookup enrichments using a cron syntax (see the `schedule` setting).
Example:
- source,ruby
-
input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}
This would create an Elasticsearch query with the following format:
- source,json
-
curl 'localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": { "match": { "statuscode": 200 } },
  "sort": [ "_doc" ]
}'
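As a rough illustration of how the settings relate to that request, the sketch below assembles the same URL and body in plain Ruby. The helper `build_search_request` is hypothetical and not part of the plugin; the defaults for index, scroll and size, and port 9200, are simply the values visible in the curl command above.

```ruby
require "json"
require "uri"

# Hypothetical helper (not plugin code): builds the URL and JSON body
# of a search request equivalent to the curl command above.
def build_search_request(host:, query:, index: "logstash-*", scroll: "1m", size: 1000)
  params = URI.encode_www_form(scroll: scroll, size: size)
  url = "http://#{host}:9200/#{index}/_search?#{params}"
  body = JSON.parse(query) # raises JSON::ParserError on malformed JSON
  [url, JSON.generate(body)]
end

url, body = build_search_request(
  host: "localhost",
  query: '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
)
puts url
puts body
```

Parsing the query string before sending it catches malformed JSON early, which is similar in spirit to the `LogStash::Json.load(@query)` call in `#register`.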
Scheduling
Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
Examples:
|==========================================================
| `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
| `0 * * * *` | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day.
|==========================================================
Further documentation describing this syntax can be found github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
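To make the field layout of the examples above concrete, here is a minimal sketch that splits a cron line into named fields. `split_cronline` and `CRON_FIELDS` are hypothetical names and this is not how rufus-scheduler parses input; it assumes exactly five time fields followed by an optional timezone, and ignores other extensions Rufus may accept.

```ruby
# Hypothetical helper (not part of rufus-scheduler): names the fields of a
# cron line made of five time fields plus an optional trailing timezone.
CRON_FIELDS = %i[minute hour day_of_month month day_of_week].freeze

def split_cronline(line)
  parts = line.split
  unless [5, 6].include?(parts.size)
    raise ArgumentError, "expected 5 fields plus an optional timezone, got #{parts.size}"
  end

  fields = CRON_FIELDS.zip(parts.first(5)).to_h
  fields[:timezone] = parts[5] # nil when no timezone is given
  fields
end

p split_cronline("* 5 * 1-3 *")
p split_cronline("0 6 * * * America/Chicago")
```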
Class Method Details
.validate_value(value, validator) ⇒ Array(true,Object), Array(false,String)
# File 'lib/logstash/inputs/elasticsearch.rb', line 218
def self.validate_value(value, validator)
  return super unless validator == :uri_or_empty

  value = deep_replace(value)
  value = hash_or_array(value)

  return true, value.first if value.size == 1 && value.first.empty?

  return super(value, :uri)
end
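The method above accepts a single empty value as valid and otherwise defers to the base class's `:uri` validation. The standalone sketch below imitates that shape with Ruby's standard `URI` library. `validate_uri_or_empty` is a hypothetical stand-in, and using `URI.parse` in place of the base class's `:uri` validator is an assumption made only so the example runs on its own.

```ruby
require "uri"

# Hypothetical stand-in for the :uri_or_empty branch (not the plugin's code).
# Returns [true, value] on success and [false, message] on failure.
def validate_uri_or_empty(value)
  values = Array(value)
  # A single empty value is accepted as-is.
  return [true, values.first] if values.size == 1 && values.first.to_s.empty?

  # Assumption: URI.parse stands in for the base class's :uri validation.
  [true, values.map { |v| URI.parse(v.to_s) }]
rescue URI::InvalidURIError => e
  [false, e.message]
end

p validate_uri_or_empty("")
p validate_uri_or_empty("http://localhost:9200")
p validate_uri_or_empty("http://bad host")
```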
Instance Method Details
#register ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 164
def register
  require "elasticsearch"
  require "rufus/scheduler"
  require "elasticsearch/transport/transport/http/manticore"

  # The receiver of this assignment is missing from the extracted listing;
  # `@options` is assumed.
  @options = {
    :index => @index,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = LogStash::Json.load(@query)
  if @slices
    @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
    @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
  end

  # Name missing from the listing; `transport_options` is inferred from the
  # :transport_options key passed to the client below.
  transport_options = {}

  fill_user_password_from_cloud_auth

  if @user && @password
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    transport_options[:headers] = { :Authorization => "Basic #{token}" }
  end

  fill_hosts_from_cloud_id
  @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

  hosts = if @ssl
    @hosts.map do |h|
      host, port = h.split(":")
      { :host => host, :scheme => 'https', :port => port }
    end
  else
    @hosts
  end

  # Name missing from the listing; `ssl_options` is inferred from the :ssl key
  # passed to the client below.
  ssl_options = { :ssl => true, :ca_file => @ca_file } if @ssl && @ca_file
  ssl_options ||= {}

  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  transport_options[:proxy] = @proxy.to_s if @proxy && !@proxy.eql?('')

  @client = Elasticsearch::Client.new(:hosts => hosts,
                                      :transport_options => transport_options,
                                      :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore,
                                      :ssl => ssl_options)
end
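Two steps of `#register` are easy to try in isolation: building the Basic authorization header and rewriting hosts into HTTPS host hashes. The sketch below reproduces both with the standard library only. The helper names `basic_auth_header` and `https_hosts` and the sample credentials are hypothetical; the bodies mirror the logic in the listing above.

```ruby
require "base64"

# Hypothetical helper mirroring the header construction in #register.
def basic_auth_header(user, password)
  token = Base64.strict_encode64("#{user}:#{password}")
  { :Authorization => "Basic #{token}" }
end

# Hypothetical helper mirroring the SSL branch of the hosts mapping.
def https_hosts(hosts)
  Array(hosts).map do |h|
    host, port = h.to_s.split(":")
    { :host => host, :scheme => "https", :port => port }
  end
end

p basic_auth_header("elastic", "changeme")
p https_hosts(["es1:9200", "es2"])
```

Because the host string is split on `":"`, this mapping only behaves as intended for bare `host` or `host:port` values. A host without a port yields `:port => nil`, and a value that already carries a scheme such as `https://es1:9200` would be split in the wrong place.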
#run(output_queue) ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 229
def run(output_queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      do_run(output_queue)
    end

    @scheduler.join
  else
    do_run(output_queue)
  end
end
#stop ⇒ Object
# File 'lib/logstash/inputs/elasticsearch.rb', line 242
def stop
  @scheduler.stop if @scheduler
end