Class: LogStash::Inputs::Elasticsearch
Inherits: Base
  - Object
  - Base
  - LogStash::Inputs::Elasticsearch
Defined in: lib/logstash/inputs/elasticsearch.rb
Overview
Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc.
Example:
    input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        host => "localhost"
        query => "ERROR"
      }
    }
This configuration produces an Elasticsearch search request of the following form:
http://localhost:9200/logstash-*/_search?q=ERROR&scroll=1m&size=1000
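As a rough illustration of how the settings map onto that URL, the sketch below rebuilds it in plain Ruby. The helper name `build_search_url` is hypothetical, `URI.encode_www_form` stands in for the plugin's own `encode` helper (whose implementation is not shown here), and the default values for port, index, scroll and size are assumptions read off the example URL rather than confirmed plugin defaults.

```ruby
require "uri"

# Hypothetical helper, not part of the plugin: builds the initial search URL.
# Default values are assumptions taken from the example URL in this page.
def build_search_url(host:, query:, port: 9200, index: "logstash-*",
                     scroll: "1m", size: 1000, scan: false)
  params = {
    "q"      => query,
    "scroll" => scroll,
    "size"   => size.to_s,
  }
  # search_type=scan is only added when scanning is requested
  params["search_type"] = "scan" if scan
  # URI.encode_www_form is a stand-in for the plugin's encode(params)
  "http://#{host}:#{port}/#{index}/_search?#{URI.encode_www_form(params)}"
end

puts build_search_url(host: "localhost", query: "ERROR")
```

Passing `scan: true` appends `search_type=scan` to the query string, mirroring the conditional in `#register`.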
TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?
Instance Method Summary
- #register ⇒ Object
- #run(output_queue) ⇒ Object
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/elasticsearch.rb', line 56

    def register
      require "ftw"
      @agent = FTW::Agent.new
      params = {
        "q" => @query,
        "scroll" => @scroll,
        "size" => "#{@size}",
      }
      params['search_type'] = "scan" if @scan
      @search_url = "http://#{@host}:#{@port}/#{@index}/_search?#{encode(params)}"
      @scroll_url = "http://#{@host}:#{@port}/_search/scroll?#{encode({"scroll" => @scroll})}"
    end
#run(output_queue) ⇒ Object
    # File 'lib/logstash/inputs/elasticsearch.rb', line 95

    def run(output_queue)
      result = LogStash::Json.load(execute_search_request)
      scroll_id = result["_scroll_id"]

      # When using the search_type=scan we don't get an initial result set.
      # So we do it here.
      if @scan
        result = LogStash::Json.load(execute_scroll_request(scroll_id))
      end

      loop do
        break if result.nil?
        hits = result["hits"]["hits"]
        break if hits.empty?

        hits.each do |hit|
          # Hack to make codecs work
          @codec.decode(LogStash::Json.dump(hit["_source"])) do |event|
            decorate(event)
            output_queue << event
          end
        end

        # Get the scroll id from the previous result set and use it for getting the next data set
        scroll_id = result["_scroll_id"]

        # Fetch the next result set
        result = LogStash::Json.load(execute_scroll_request(scroll_id))

        if result["error"]
          @logger.warn(result["error"], :request => @scroll_url)
          # TODO(sissel): raise an error instead of breaking
          break
        end
      end
    rescue LogStash::ShutdownSignal
      # Do nothing, let us quit.
    end
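The scroll loop in `#run` can be tried without a cluster. The following is a minimal sketch, assuming canned JSON pages in place of real HTTP responses: `drain_scroll` and the `fetch_next` lambda are hypothetical names, the standard `json` library replaces `LogStash::Json`, and events are collected in an array instead of being decoded by a codec and pushed onto the output queue.

```ruby
require "json"

# Hypothetical stand-in for the plugin's scroll loop. fetch_next is any callable
# that takes a scroll id and returns the next page as a JSON string.
def drain_scroll(first_page, fetch_next)
  events = []
  result = JSON.parse(first_page)
  loop do
    # Stop on an error response or when a page comes back with no hits
    break if result["error"]
    hits = result["hits"]["hits"]
    break if hits.empty?

    hits.each { |hit| events << hit["_source"] }

    # Each response carries the scroll id to use for the next request
    result = JSON.parse(fetch_next.call(result["_scroll_id"]))
  end
  events
end

# Canned pages keyed by scroll id (illustrative data only)
pages = {
  "s1" => JSON.generate({ "_scroll_id" => "s2",
                          "hits" => { "hits" => [{ "_source" => { "message" => "b" } }] } }),
  "s2" => JSON.generate({ "_scroll_id" => "s3", "hits" => { "hits" => [] } }),
}
first = JSON.generate({ "_scroll_id" => "s1",
                        "hits" => { "hits" => [{ "_source" => { "message" => "a" } }] } })

events = drain_scroll(first, ->(scroll_id) { pages.fetch(scroll_id) })
puts events.map { |e| e["message"] }.join(",")
```

Unlike `#run`, this sketch checks for an error at the top of each iteration rather than immediately after the fetch; the loop still ends on the first error response or empty page.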