Class: LogStash::Filters::ElasticsearchClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/filters/elasticsearch/client.rb

Constant Summary collapse

BUILD_FLAVOR_SERVERLESS =
'serverless'.freeze
DEFAULT_EAV_HEADER =
{ "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER =
{ 'x-elastic-product-origin' => 'logstash-filter-elasticsearch'}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, hosts, options = {}) ⇒ ElasticsearchClient

Returns a new instance of ElasticsearchClient.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/filters/elasticsearch/client.rb', line 17

def initialize(logger, hosts, options = {})
  user = options.fetch(:user, nil)
  password = options.fetch(:password, nil)
  api_key = options.fetch(:api_key, nil)
  proxy = options.fetch(:proxy, nil)
  user_agent = options[:user_agent]
  custom_headers = options[:custom_headers]


  transport_options = { }
  transport_options[:headers] = options.fetch(:serverless, false) ?  DEFAULT_EAV_HEADER.dup : {}
  transport_options[:headers].merge!(setup_basic_auth(user, password))
  transport_options[:headers].merge!(setup_api_key(api_key))
  transport_options[:headers].merge!({ 'user-agent' => "#{user_agent}" })
  transport_options[:headers].merge!(INTERNAL_ORIGIN_HEADER)
  transport_options[:headers].merge!(custom_headers) unless custom_headers.empty?

  transport_options[:pool_max] = 1000
  transport_options[:pool_max_per_route] = 100

  logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
  transport_options[:proxy] = proxy.to_s if proxy && !proxy.eql?('')

  ssl_options = options.fetch(:ssl, { :enabled => false })
  ssl_enabled = ssl_options.fetch(:enabled, false)

  hosts = setup_hosts(hosts, ssl_enabled)

  client_options = {
                 hosts: hosts,
       transport_class: get_transport_client_class,
     transport_options: transport_options,
                   ssl: ssl_options,
      retry_on_failure: options[:retry_on_failure],
       retry_on_status: options[:retry_on_status]
  }

  logger.info("New ElasticSearch filter client", :hosts => hosts)
  @client = ::Elasticsearch::Client.new(client_options)
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



10
11
12
# File 'lib/logstash/filters/elasticsearch/client.rb', line 10

def client
  @client
end

#es_transport_client_typeObject (readonly)

Returns the value of attribute es_transport_client_type.



11
12
13
# File 'lib/logstash/filters/elasticsearch/client.rb', line 11

def es_transport_client_type
  @es_transport_client_type
end

Instance Method Details

#build_flavorObject



66
67
68
# File 'lib/logstash/filters/elasticsearch/client.rb', line 66

def build_flavor
  @build_flavor ||= info&.dig('version', 'build_flavor')
end

#infoObject



62
63
64
# File 'lib/logstash/filters/elasticsearch/client.rb', line 62

def info
  @client.info
end

#search(params = {}) ⇒ Object



58
59
60
# File 'lib/logstash/filters/elasticsearch/client.rb', line 58

def search(params={})
  @client.search(params)
end

#serverless?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/logstash/filters/elasticsearch/client.rb', line 70

def serverless?
  @is_serverless ||= (build_flavor == BUILD_FLAVOR_SERVERLESS)
end