Class: LogStash::Inputs::Elasticsearch

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/elasticsearch.rb

Overview

Read from an Elasticsearch cluster, based on search query results. This is useful for replaying test logs, reindexing, etc.

Example:

source,ruby

input {

# Read all documents from Elasticsearch matching the given query
elasticsearch {
  host => "localhost"
  query => '{ "query": { "match": { "statuscode": 200 } } }'
}

}

This would create an Elasticsearch query with the following format:

source,json

localhost:9200/logstash-*/_search?q=‘{ “query”: { “match”: { “statuscode”: 200 } } }’&scroll=1m&size=1000

TODO(sissel): Option to keep the index, type, and doc id so we can do reindexing?

Instance Method Summary collapse

Instance Method Details

#registerObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/elasticsearch.rb', line 67

def register
  require "elasticsearch"

  @options = {
    index: @index,
    body: @query,
    scroll: @scroll,
    size: @size
  }

  @options[:search_type] = 'scan' if @scan

  transport_options = {}

  if @user && @password
    token = Base64.strict_encode64("#{@user}:#{@password.value}")
    transport_options[:headers] = { Authorization: "Basic #{token}" }
  end

  hosts = if @ssl then
    @hosts.map {|h| { host: h, scheme: 'https' } }
  else
    @hosts
  end

  if @ssl && @ca_file
    transport_options[:ssl] = { ca_file: @ca_file }
  end

  @client = Elasticsearch::Client.new hosts: hosts, transport_options: transport_options

end

#run(output_queue) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/logstash/inputs/elasticsearch.rb', line 101

def run(output_queue)

  # get first wave of data
  r = @client.search @options

  # since 'scan' doesn't return data on the search call, do an extra scroll
  if @scan
    r = scroll_request(r['_scroll_id'])
  end

  while r['hits']['hits'].any? do
    r['hits']['hits'].each do |event|
      decorate(event)
      output_queue << event
    end
    r = scroll_request(r['_scroll_id'])
  end
end