Class: LogStash::Filters::Elasticsearch

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::CATrustedFingerprintSupport, PluginMixins::NormalizeConfigSupport, MonitorMixin
Defined in:
lib/logstash/filters/elasticsearch.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#shared_clientObject (readonly)

Returns the value of attribute shared_client.



146
147
148
# File 'lib/logstash/filters/elasticsearch.rb', line 146

# Read-only accessor: returns whatever is currently stored in the
# @shared_client instance variable (nil if it has not been assigned).
def shared_client
  @shared_client
end

Class Method Details

.validate_value(value, validator) ⇒ Array(true,Object), Array(false,String)



154
155
156
157
158
159
160
161
162
163
# File 'lib/logstash/filters/elasticsearch.rb', line 154

# Adds a `:uri_or_empty` validator on top of the inherited validation.
#
# Any other validator is forwarded untouched to the parent implementation.
# For `:uri_or_empty` the value is normalised through `deep_replace` and
# `hash_or_array`; a single empty entry is accepted as-is, anything else is
# handed to the parent's `:uri` validation.
#
# @param value [Object] raw configuration value
# @param validator [Symbol, Object] validator identifier
# @return [Array(true, Object), Array(false, String)]
def self.validate_value(value, validator)
  if validator == :uri_or_empty
    candidates = hash_or_array(deep_replace(value))
    lone_empty = candidates.size == 1 && candidates.first.empty?

    # A lone empty entry is valid and returned unchanged; everything else
    # must pass the regular :uri check.
    lone_empty ? [true, candidates.first] : super(candidates, :uri)
  else
    # Bare `super` forwards the original (value, validator) pair.
    super
  end
end

Instance Method Details

#filter(event) ⇒ Object

def register



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/logstash/filters/elasticsearch.rb', line 190

# Looks up documents in Elasticsearch for the given event and copies the
# configured values onto it.
#
# The search request targets `event.sprintf(@index)`. When @query_dsl is set
# it is interpolated, parsed as JSON and sent as the request body; otherwise
# the interpolated @query is sent as `q` together with `result_size` and,
# when @enable_sort is set, @sort.
#
# On success the total hit count is stored under `[@metadata][total_hits]`,
# values from the hits are copied according to @fields (read from `_source`)
# and @docinfo_fields (read from the hit itself), and aggregations are copied
# according to @aggregation_fields. `filter_matched` is called only when hits
# or aggregations were present.
#
# Any StandardError raised during the lookup is logged as a warning (with
# more detail when trace/debug logging is enabled) and the event is tagged
# with every entry of @tag_on_failure.
#
# @param event [Object] the event being filtered
def filter(event)
  found = false
  request = { :index => event.sprintf(@index) }

  if @query_dsl
    query = LogStash::Json.load(event.sprintf(@query_dsl))
    request[:body] = query
  else
    query = event.sprintf(@query)
    request[:q] = query
    request[:size] = result_size
    request[:sort] = @sort if @enable_sort
  end

  @logger.debug("Querying elasticsearch for lookup", :params => request)

  response = get_client.search(request)
  if response["_shards"].include?("failures")
    raise "Elasticsearch query error: #{response["_shards"]["failures"]}"
  end

  event.set("[@metadata][total_hits]", extract_total_from_hits(response['hits']))

  hits = response["hits"]["hits"]
  unless hits.nil? || hits.empty?
    found = true

    # Values taken from each hit's _source document.
    @fields.each do |source_key, target_field|
      path = extract_path(source_key)
      values = hits.map { |doc| extract_value(doc["_source"], path) }
      # Several hits yield an array, a single hit yields the bare value.
      event.set(target_field, values.count > 1 ? values : values.first)
    end

    # Values taken from the hit itself rather than from its _source.
    @docinfo_fields.each do |source_key, target_field|
      path = extract_path(source_key)
      values = hits.map { |doc| extract_value(doc, path) }
      event.set(target_field, values.count > 1 ? values : values.first)
    end
  end

  aggregations = response["aggregations"]
  unless aggregations.nil? || aggregations.empty?
    found = true
    @aggregation_fields.each { |agg_name, ls_field| event.set(ls_field, aggregations[agg_name]) }
  end
rescue => e
  # The amount of context attached to the warning grows with the log level.
  details =
    if @logger.trace?
      { :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace }
    elsif @logger.debug?
      { :index => @index, :error => e.message, :backtrace => e.backtrace }
    else
      { :index => @index, :error => e.message }
    end
  @logger.warn("Failed to query elasticsearch for previous event", **details)
  @tag_on_failure.each { |tag| event.tag(tag) }
else
  # Runs only when the lookup raised nothing; errors from here are not rescued.
  filter_matched(event) if found
end

#prepare_user_agentObject

Public only so that it can be reused in tests.



254
255
256
257
258
259
260
261
262
263
264
# File 'lib/logstash/filters/elasticsearch.rb', line 254

# Builds the user-agent string from the Logstash version, JVM system
# properties (OS name/version/arch, JVM vendor/version), @plugin_type,
# config_name and the loaded `logstash-filter-elasticsearch` gem version.
#
# @return [String]
def prepare_user_agent
  sys_prop = ->(key) { java.lang.System.getProperty(key) }

  os_segment  = "OS=#{sys_prop.call('os.name')}-#{sys_prop.call('os.version')}-#{sys_prop.call('os.arch')}"
  jvm_segment = "JVM=#{sys_prop.call('java.vendor')}-#{sys_prop.call('java.version')}"
  gem_version = Gem.loaded_specs['logstash-filter-elasticsearch'].version

  # Shape of the result, e.g.:
  #   logstash/7.14.1 (OS=Linux-5.4.0-84-generic-amd64; JVM=AdoptOpenJDK-11.0.11) logstash-output-elasticsearch/11.0.1
  "logstash/#{LOGSTASH_VERSION} (#{os_segment}; #{jvm_segment}) logstash-#{@plugin_type}-#{config_name}/#{gem_version}"
end

#registerObject



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/logstash/filters/elasticsearch.rb', line 165

# Prepares the plugin for use.
#
# If @query_template is set, its file content is loaded into @query_dsl.
# Then the settings are validated and completed (query settings, cloud id,
# SSL, authentication, cloud auth), @hosts is normalised to an array of
# strings, the connection is tested and the serverless setup runs. Finally a
# transport patch is loaded when the client reports the
# "elasticsearch_transport" client type.
#
# @raise [RuntimeError] "template is empty" when the template file exists
#   but has zero length
def register
  # Load query if it exists
  if @query_template
    raise "template is empty" if File.zero?(@query_template)

    # File.read opens, reads and closes the file in a single call, so no
    # file handle is left open after the template is loaded.
    @query_dsl = File.read(@query_template)
  end

  validate_query_settings
  fill_hosts_from_cloud_id
  setup_ssl_params!
  validate_authentication
  fill_user_password_from_cloud_auth

  @hosts = Array(@hosts).map(&:to_s) # potential SafeURI#to_s

  test_connection!
  setup_serverless
  if get_client.es_transport_client_type == "elasticsearch_transport"
    require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
  end
end