Module: OpenSearch::Transport::Transport::Base Abstract

Includes:
Loggable
Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/opensearch/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
SANITIZED_PASSWORD =
'*' * rand(1..14)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def connections
  @connections
end

#counterObject (readonly)

Returns the value of attribute counter.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def counter
  @counter
end

#hostsObject (readonly)

Returns the value of attribute hosts.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def hosts
  @hosts
end

#last_request_atObject (readonly)

Returns the value of attribute last_request_at.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def last_request_at
  @last_request_at
end

#loggerObject

Returns the value of attribute logger.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def logger
  @logger
end

#optionsObject (readonly)

Returns the value of attribute options.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def options
  @options
end

#protocolObject (readonly)

Returns the value of attribute protocol.



43
44
45
# File 'lib/opensearch/transport/transport/base.rb', line 43

def protocol
  @protocol
end

#reload_afterObject

Returns the value of attribute reload_after.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def reload_after
  @reload_after
end

#reload_connectionsObject

Returns the value of attribute reload_connections.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def reload_connections
  @reload_connections
end

#resurrect_afterObject

Returns the value of attribute resurrect_after.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def resurrect_after
  @resurrect_after
end

#serializerObject

Returns the value of attribute serializer.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def serializer
  @serializer
end

#snifferObject

Returns the value of attribute sniffer.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def sniffer
  @sniffer
end

#tracerObject

Returns the value of attribute tracer.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

def tracer
  @tracer
end

Instance Method Details

#__build_connection(_host, _options = {}, _block = nil) ⇒ Connections::Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.

Returns:

Raises:

  • (NoMethodError)


179
180
181
# File 'lib/opensearch/transport/transport/base.rb', line 179

def __build_connection(_host, _options = {}, _block = nil)
  raise NoMethodError, 'Implement this method in your class'
end

#__build_connectionsConnections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds and returns a collection of connections

The adapters have to implement the #__build_connection method.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/opensearch/transport/transport/base.rb', line 155

def __build_connections
  Connections::Collection.new \
    connections: hosts.map { |host|
                   host[:protocol] =
                     host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
                   host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
                   if (options[:user] || options[:http][:user]) && !host[:user]
                     host[:user] ||= options[:user] || options[:http][:user]
                     host[:password] ||= options[:password] || options[:http][:password]
                   end

                   __build_connection(host, (options[:transport_options] || {}), @block)
                 },
    selector_class: options[:selector_class],
    selector: options[:selector]
end

#__close_connectionsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the connections collection



187
188
189
# File 'lib/opensearch/transport/transport/base.rb', line 187

def __close_connections
  # A hook point for specific adapters when they need to close connections
end

#__convert_to_json(obj = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



234
235
236
# File 'lib/opensearch/transport/transport/base.rb', line 234

def __convert_to_json(obj = nil, options = {})
  obj.is_a?(String) ? obj : serializer.dump(obj, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



243
244
245
246
247
248
249
250
# File 'lib/opensearch/transport/transport/base.rb', line 243

def __full_url(host)
  url  = "#{host[:protocol]}://"
  url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user]
  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end

#__log_response(method, _path, _params, body, url, response, _json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information



195
196
197
198
199
200
201
202
# File 'lib/opensearch/transport/transport/base.rb', line 195

def __log_response(method, _path, _params, body, url, response, _json, took, duration)
  return unless logger
  sanitized_url = url.to_s.gsub(%r{//(.+):(.+)@}, "//\\1:#{SANITIZED_PASSWORD}@")
  log_info "#{method.to_s.upcase} #{sanitized_url} " \
           "[status:#{response.status}, request:#{format('%.3fs', duration)}, query:#{took}]"
  log_debug "> #{__convert_to_json(body)}" if body
  log_debug "< #{response.body}"
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error

Raises:

  • (error)


225
226
227
228
# File 'lib/opensearch/transport/transport/base.rb', line 225

def __raise_transport_error(response)
  error = ERRORS[response.status] || ServerError
  raise error, "[#{response.status}] #{response.body}"
end

#__rebuild_connections(arguments = {}) ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rebuilds the connections collection in the transport.

The methods adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/opensearch/transport/transport/base.rb', line 131

def __rebuild_connections(arguments = {})
  @state_mutex.synchronize do
    @hosts       = arguments[:hosts]    || []
    @options     = arguments[:options]  || {}

    __close_connections

    new_connections = __build_connections
    stale_connections = @connections.all.reject { |c| new_connections.include?(c) }
    new_connections = new_connections.reject { |c| @connections.all.include?(c) }

    @connections.remove(stale_connections)
    @connections.add(new_connections)
    @connections
  end
end

#__trace(method, path, params, headers, body, _url, response, json, _took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the ‘curl` format



208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/opensearch/transport/transport/base.rb', line 208

def __trace(method, path, params, headers, body, _url, response, json, _took, duration)
  trace_url  = "http://localhost:9200/#{path}?pretty" +
               (params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}")
  trace_body = body ? " -d '#{__convert_to_json(body, pretty: true)}'" : ''
  trace_command = "curl -X #{method.to_s.upcase}"
  trace_command += " -H '#{headers.collect { |k, v| "#{k}: #{v}" }.join(', ')}'" if headers && !headers.empty?
  trace_command += " '#{trace_url}'#{trace_body}\n"
  tracer.info trace_command
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  return "# #{response.body}\n" unless tracer.debug json
  "#{serializer.dump(json, pretty: true).gsub(/^/, '# ').sub(/\}$/, "\n# }")}\n"
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connection if the ‘resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



94
95
96
97
98
99
100
# File 'lib/opensearch/transport/transport/base.rb', line 94

def get_connection(options = {})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  @counter_mtx.synchronize { @counter += 1 }
  reload_connections! if reload_connections && (counter % reload_after).zero?
  connections.get_connection(options)
end

#host_unreachable_exceptionsArray

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


382
383
384
# File 'lib/opensearch/transport/transport/base.rb', line 382

def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Array)

    A Hash with options (usually passed by Client)

See Also:



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/opensearch/transport/transport/base.rb', line 58

def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new

  @hosts       = arguments[:hosts]   || []
  @options     = arguments[:options] || {}
  @options[:http] ||= {}
  @options[:retry_on_status] ||= []

  @block       = block
  @compression = !@options[:compression].nil?
  @connections = __build_connections

  @serializer  = options[:serializer] || (options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self))
  @protocol    = options[:protocol] || DEFAULT_PROTOCOL

  @logger      = options[:logger]
  @tracer      = options[:tracer]

  @sniffer     = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  @counter     = 0
  @counter_mtx = Mutex.new
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  @reload_after    = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end

#perform_request(method, path, params = {}, body = nil, _headers = nil, opts = {}, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • headers (Hash)

    Request headers (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/opensearch/transport/transport/base.rb', line 270

def perform_request(method, path, params = {}, body = nil, _headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?

  start = Time.now
  tries = 0
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])

  max_retries = if opts.key?(:retry_on_failure)
                  opts[:retry_on_failure] == true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
                elsif options.key?(:retry_on_failure)
                  options[:retry_on_failure] == true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
                end

  params = params.clone

  ignore = Array(params.delete(:ignore)).compact.map(&:to_i)

  begin
    tries     += 1
    connection = get_connection or raise(Error, 'Cannot get new connection from pool.')

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url      = connection.full_url(path, params)

    response = block.call(connection, url)

    connection.healthy! if connection.failures.positive?

    # Raise an exception so we can catch it for `retry_on_status`
    if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)
      __raise_transport_error(response)
    end
  rescue OpenSearch::Transport::Transport::ServerError => e
    raise e unless response && @retry_on_status.include?(response.status)
    log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
    if tries <= (max_retries || DEFAULT_MAX_RETRIES)
      retry
    else
      log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
      raise e
    end
  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

    connection.dead!

    if reload_on_failure && (tries < connections.all.size)
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end

    raise e unless max_retries
    log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
    if tries <= max_retries
      retry
    else
      log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
      raise e
    end
  rescue StandardError => e
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e
  end

  duration = Time.now - start

  if response.status.to_i >= 300
    __log_response method, path, params, body, url, response, nil, 'N/A', duration
    if tracer
      __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A',
              duration
    end

    # Log the failure only when `ignore` doesn't match the response status
    log_fatal "[#{response.status}] #{response.body}" unless ignore.include?(response.status.to_i)

    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  if response.body && !response.body.empty? && response.headers && response.headers['content-type'] =~ /json/
    json = serializer.load(response.body)
  end
  took = begin
    (json['took'] ? format('%.3fs', json['took'] / 1000.0) : 'n/a')
  rescue StandardError
    'n/a'
  end

  unless ignore.include?(response.status.to_i)
    __log_response method, path, params, body, url, response, json, took, duration
  end

  if tracer
    __trace method, path, params, connection.connection.headers, body, url, response, nil, 'N/A',
            duration
  end

  warnings(response.headers['warning']) if response.headers&.[]('warning')

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end

#reload_connections!Object

Reloads and replaces the connection collection based on cluster information

See Also:



106
107
108
109
110
111
112
113
# File 'lib/opensearch/transport/transport/base.rb', line 106

def reload_connections!
  hosts = sniffer.hosts
  __rebuild_connections hosts: hosts, options: options
  self
rescue SnifferTimeoutError
  log_error '[SnifferTimeoutError] Timeout when reloading connections.'
  self
end

#resurrect_dead_connections!Object

Tries to “resurrect” all eligible dead connections



119
120
121
# File 'lib/opensearch/transport/transport/base.rb', line 119

def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end