Module: Elasticsearch::Transport::Transport::Base Abstract

Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/elasticsearch/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
SANITIZED_PASSWORD =
'*' * (rand(14)+1)

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +connections+ attribute.
#
# @return [Object] the value currently stored in +@connections+
def connections; @connections; end

#counter ⇒ Object (readonly)

Returns the value of attribute counter.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +counter+ attribute.
#
# @return [Object] the value currently stored in +@counter+
def counter; @counter; end

#hosts ⇒ Object (readonly)

Returns the value of attribute hosts.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +hosts+ attribute.
#
# @return [Object] the value currently stored in +@hosts+
def hosts; @hosts; end

#last_request_at ⇒ Object (readonly)

Returns the value of attribute last_request_at.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +last_request_at+ attribute.
#
# @return [Object] the value currently stored in +@last_request_at+
def last_request_at; @last_request_at; end

#logger ⇒ Object

Returns the value of attribute logger.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +logger+ attribute.
#
# @return [Object] the value currently stored in +@logger+
def logger; @logger; end

#max_retries ⇒ Object

Returns the value of attribute max_retries.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +max_retries+ attribute.
#
# @return [Object] the value currently stored in +@max_retries+
def max_retries; @max_retries; end

#options ⇒ Object (readonly)

Returns the value of attribute options.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +options+ attribute.
#
# @return [Object] the value currently stored in +@options+
def options; @options; end

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

# Reader for the +protocol+ attribute.
#
# @return [Object] the value currently stored in +@protocol+
def protocol; @protocol; end

#reload_after ⇒ Object

Returns the value of attribute reload_after.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +reload_after+ attribute.
#
# @return [Object] the value currently stored in +@reload_after+
def reload_after; @reload_after; end

#reload_connections ⇒ Object

Returns the value of attribute reload_connections.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +reload_connections+ attribute.
#
# @return [Object] the value currently stored in +@reload_connections+
def reload_connections; @reload_connections; end

#resurrect_after ⇒ Object

Returns the value of attribute resurrect_after.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +resurrect_after+ attribute.
#
# @return [Object] the value currently stored in +@resurrect_after+
def resurrect_after; @resurrect_after; end

#serializer ⇒ Object

Returns the value of attribute serializer.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +serializer+ attribute.
#
# @return [Object] the value currently stored in +@serializer+
def serializer; @serializer; end

#sniffer ⇒ Object

Returns the value of attribute sniffer.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +sniffer+ attribute.
#
# @return [Object] the value currently stored in +@sniffer+
def sniffer; @sniffer; end

#tracer ⇒ Object

Returns the value of attribute tracer.



17
18
19
# File 'lib/elasticsearch/transport/transport/base.rb', line 17

# Reader for the +tracer+ attribute.
#
# @return [Object] the value currently stored in +@tracer+
def tracer; @tracer; end

Instance Method Details

#__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.

Returns:

  • (Connections::Connection)

Raises:

  • (NoMethodError)


151
152
153
# File 'lib/elasticsearch/transport/transport/base.rb', line 151

# Abstract hook for building a single connection.
#
# This base version never builds anything: it always raises, so a concrete
# transport has to supply its own implementation. None of the arguments are
# inspected here.
#
# @param host    [Object] host information (unused in this base version)
# @param options [Hash]   extra options (unused in this base version)
# @param block   [Proc, nil] optional block (unused in this base version)
# @raise [NoMethodError] always
def __build_connection(host, options={}, block=nil)
  message = "Implement this method in your class"
  raise NoMethodError, message
end

#__build_connections ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds and returns a collection of connections

The adapters have to implement the #__build_connection method.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/elasticsearch/transport/transport/base.rb', line 128

# Builds a connection collection from the configured hosts.
#
# Every entry of +hosts+ is completed in place before it is handed to
# +__build_connection+:
#
# * +:protocol+ is always overwritten, taken from (in order) the host's own
#   +:scheme+, +options[:scheme]+, +options[:http][:scheme]+, or
#   +DEFAULT_PROTOCOL+.
# * +:port+ is only filled in when the host has none, taken from
#   +options[:port]+, +options[:http][:port]+, or +DEFAULT_PORT+.
# * +:user+ / +:password+ are copied from the options only when the host
#   has no +:user+ of its own.
#
# @return [Connections::Collection] collection wrapping the built connections,
#   created with the +:selector_class+ and +:selector+ options
def __build_connections
  built = hosts.map do |host|
    host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
    # Fall back to the HTTP-level *port* (the previous code read the scheme here,
    # which ignored options[:http][:port] and could store e.g. "https" as the port).
    host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
    if (options[:user] || options[:http][:user]) && !host[:user]
      host[:user] ||= options[:user] || options[:http][:user]
      host[:password] ||= options[:password] || options[:http][:password]
    end

    __build_connection(host, (options[:transport_options] || {}), @block)
  end

  Connections::Collection.new \
    :connections => built,
    :selector_class => options[:selector_class],
    :selector => options[:selector]
end

#__close_connections ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the connections collection



159
160
161
# File 'lib/elasticsearch/transport/transport/base.rb', line 159

# Extension point for adapters that need to close their connections.
#
# Intentionally does nothing in this base version.
#
# @return [nil]
def __close_connections
  nil
end

#__convert_to_json(o = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



209
210
211
# File 'lib/elasticsearch/transport/transport/base.rb', line 209

# Serializes an object unless it is already a String.
#
# @param o       [Object] value to convert; Strings are passed through untouched
# @param options [Hash]   forwarded to +serializer.dump+ for non-String values
# @return [Object] +o+ itself when it is a String, otherwise the result of
#   +serializer.dump(o, options)+
def __convert_to_json(o=nil, options={})
  return o if o.is_a?(String)

  serializer.dump(o, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



218
219
220
221
222
223
224
# File 'lib/elasticsearch/transport/transport/base.rb', line 218

# Assembles a URL string from a host Hash.
#
# The result has the shape +protocol://[user[:password]@]host:port[path]+.
# User and password are escaped with +CGI.escape+. When a user is present
# but no password is set, only the user is emitted (previously this case
# raised because +nil+ was passed to +CGI.escape+).
#
# @param host [Hash] host information; the keys +:protocol+, +:host+ and
#   +:port+ are always read, +:user+, +:password+ and +:path+ are optional
# @return [String] the assembled URL
def __full_url(host)
  url  = "#{host[:protocol]}://"
  if host[:user]
    credentials  = CGI.escape(host[:user])
    # A user without a password is legal; skip the ":password" part then.
    credentials += ":#{CGI.escape(host[:password])}" if host[:password]
    url += "#{credentials}@"
  end
  url += "#{host[:host]}:#{host[:port]}"
  url += "#{host[:path]}" if host[:path]
  url
end

#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information



167
168
169
170
171
172
173
# File 'lib/elasticsearch/transport/transport/base.rb', line 167

# Logs a request/response pair.
#
# Writes one +info+ line (HTTP verb, URL with the password masked, status,
# request duration, query time) and +debug+ lines with the request body
# (when given) and the response body.
#
# The password is masked only inside the +//user:password@+ part of the URL.
# The pattern is deliberately non-greedy about "/" and "@": the former greedy
# +(.+):(.+)@+ would, for a URL such as +http://u:secret@host:9200/idx/a@b+,
# swallow the credentials into the first group and leave the password visible.
#
# @param method   [#to_s]  HTTP verb
# @param path     [Object] unused here
# @param params   [Object] unused here
# @param body     [Object, nil] request body; logged via +__convert_to_json+ when truthy
# @param url      [#to_s]  full request URL
# @param response [#status, #body] response object
# @param json     [Object] unused here
# @param took     [Object] value printed as +query:+
# @param duration [Numeric] request duration in seconds, printed with three decimals
def __log(method, path, params, body, url, response, json, took, duration)
  sanitized_url = url.to_s.gsub(%r{//([^:/@]+):([^/@]+)@}) do
    "//#{Regexp.last_match(1)}:#{SANITIZED_PASSWORD}@"
  end
  logger.info  "#{method.to_s.upcase} #{sanitized_url} " +
               "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
  logger.debug "> #{__convert_to_json(body)}" if body
  logger.debug "< #{response.body}"
end

#__log_failed(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log failed request



179
180
181
# File 'lib/elasticsearch/transport/transport/base.rb', line 179

# Reports a failed request on the logger at +fatal+ level.
#
# The logged line has the form +[status] body+.
#
# @param response [#status, #body] the failed response
# @return [Object] whatever +logger.fatal+ returns
def __log_failed(response)
  logger.fatal(format('[%s] %s', response.status, response.body))
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error



200
201
202
203
# File 'lib/elasticsearch/transport/transport/base.rb', line 200

# Raises the error class registered for the response status.
#
# The class is looked up in +ERRORS+ by +response.status+; when nothing is
# registered for that status, +ServerError+ is used. The message has the
# form +[status] body+.
#
# @param response [#status, #body] the response to turn into an error
# @raise [Exception] always; an instance of the looked-up class
def __raise_transport_error(response)
  error_class = ERRORS[response.status] || ServerError
  raise error_class, "[#{response.status}] #{response.body}"
end

#__rebuild_connections(arguments = {}) ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rebuilds the connections collection in the transport.

The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/elasticsearch/transport/transport/base.rb', line 104

# Replaces hosts/options and brings the existing collection in line with them.
#
# Runs entirely inside +@state_mutex+. After storing the new hosts and
# options it calls +__close_connections+, builds a fresh collection with
# +__build_connections+, removes from +@connections+ every connection the
# fresh collection does not include, and adds every fresh connection that
# +@connections+ does not include yet.
#
# @param arguments [Hash] +:hosts+ (defaults to +[]+) and +:options+ (defaults to +{}+)
# @return [Object] the updated +@connections+ collection
def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts   = arguments[:hosts]   || []
    @options = arguments[:options] || {}

    __close_connections

    rebuilt = __build_connections
    # Compute both differences before mutating @connections.
    stale   = @connections.all.reject { |existing| rebuilt.include?(existing) }
    added   = rebuilt.reject { |candidate| @connections.include?(candidate) }

    @connections.remove(stale)
    @connections.add(added)
    @connections
  end
end

#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the `curl` format



187
188
189
190
191
192
193
194
# File 'lib/elasticsearch/transport/transport/base.rb', line 187

# Writes a `curl`-style trace of the request to the tracer.
#
# Emits one +info+ line with a runnable +curl+ command (always against
# +http://localhost:9200+ with +?pretty+, plus the encoded params when there
# are any and a +-d+ payload when a body is given), followed by two +debug+
# entries: a commented header with timestamp, status and duration, and the
# commented response (the pretty-printed +json+ when given, otherwise the raw
# response body).
#
# @param method   [#to_s]  HTTP verb
# @param path     [Object] request path interpolated into the traced URL
# @param params   [#empty?] request parameters
# @param body     [Object, nil] request body
# @param url      [Object] unused here
# @param response [#status, #body] response object
# @param json     [Object, nil] parsed response, if any
# @param took     [Object] unused here
# @param duration [Numeric] request duration in seconds
def __trace(method, path, params, body, url, response, json, took, duration)
  trace_url = "http://localhost:9200/#{path}?pretty"
  trace_url += "&#{::Faraday::Utils::ParamsHash[params].to_query}" unless params.empty?

  trace_body = ''
  trace_body = " -d '#{__convert_to_json(body, :pretty => true)}'" if body

  tracer.info  "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug(
    if json
      # Prefix every line with "# " and push the final brace onto its own commented line.
      serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }") + "\n"
    else
      "# #{response.body}\n"
    end
  )
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



67
68
69
70
71
72
73
# File 'lib/elasticsearch/transport/transport/base.rb', line 67

# Returns a connection from the pool, performing periodic housekeeping first.
#
# * Calls +resurrect_dead_connections!+ when more than +@resurrect_after+
#   seconds have passed since +@last_request_at+.
# * Increments the request counter under +@counter_mtx+ and, when
#   +reload_connections+ is truthy and the new count is a multiple of
#   +reload_after+, calls +reload_connections!+.
#
# @param options [Hash] forwarded unchanged to +connections.get_connection+
# @return [Object] whatever +connections.get_connection+ returns
def get_connection(options={})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  # Use the value produced inside the lock: re-reading the counter afterwards
  # could observe another thread's increment and miss (or repeat) the threshold.
  current_count = @counter_mtx.synchronize { @counter += 1 }
  reload_connections!         if reload_connections && current_count % reload_after == 0
  connections.get_connection(options)
end

#host_unreachable_exceptions ⇒ Array

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


338
339
340
# File 'lib/elasticsearch/transport/transport/base.rb', line 338

# Lists the exception classes treated as "host unreachable".
#
# This base version only knows about +Errno::ECONNREFUSED+.
#
# @return [Array<Class>] a new single-element Array on every call
def host_unreachable_exceptions
  Array[Errno::ECONNREFUSED]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Hash)

    A Hash with options (usually passed by Client)

See Also:



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/elasticsearch/transport/transport/base.rb', line 31

# Sets up the transport state from the given arguments.
#
# Stores hosts and options (ensuring +options[:http]+ and
# +options[:retry_on_status]+ exist), remembers the block, builds the
# connection collection via +__build_connections+, and derives the
# serializer, protocol, logger, tracer, sniffer and the reload / resurrect /
# retry settings from the options, falling back to the +DEFAULT_*+ constants.
#
# @param arguments [Hash] +:hosts+ (defaults to +[]+) and +:options+ (defaults to +{}+);
#   note that the passed options Hash itself is modified
# @param block [Proc] stored in +@block+ and handed to +__build_connection+ by
#   +__build_connections+
def initialize(arguments={}, &block)
  @state_mutex = Mutex.new

  @hosts   = arguments[:hosts]   || []
  @options = arguments[:options] || {}
  @options[:http]            ||= {}
  @options[:retry_on_status] ||= []

  # The connections are built from @hosts/@options/@block, so those come first.
  @block       = block
  @connections = __build_connections

  @serializer = options[:serializer] || (options[:serializer_class] || DEFAULT_SERIALIZER_CLASS).new(self)
  @protocol   = options[:protocol] || DEFAULT_PROTOCOL

  @logger = options[:logger]
  @tracer = options[:tracer]

  @sniffer         = (options[:sniffer_class] || Sniffer).new(self)
  @counter         = 0
  @counter_mtx     = Mutex.new
  @last_request_at = Time.now

  # An Integer :reload_connections doubles as the reload interval.
  reload_setting      = options[:reload_connections]
  @reload_connections = reload_setting
  @reload_after       = reload_setting.is_a?(Integer) ? reload_setting : DEFAULT_RELOAD_AFTER
  @resurrect_after    = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER

  # Likewise, an Integer :retry_on_failure is the maximum number of retries.
  retry_setting    = options[:retry_on_failure]
  @max_retries     = retry_setting.is_a?(Integer) ? retry_setting : DEFAULT_MAX_RETRIES
  @retry_on_status = Array(options[:retry_on_status]).map(&:to_i)
end

#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

  • (Response)

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/elasticsearch/transport/transport/base.rb', line 243

# Performs a request through the given block, with retries, dead-connection
# handling, logging and tracing.
#
# The block receives the chosen connection and the full URL and must return
# a response object responding to +status+, +body+ and +headers+.
#
# Flow:
# 1. A connection is obtained via +get_connection+; its default params (if
#    the underlying client exposes any) are merged under the request params.
# 2. The block is called. If the response status is one of
#    +@retry_on_status+, a transport error is raised so that it can be retried.
# 3. +ServerError+s for retryable statuses and host-unreachable errors are
#    retried while +tries <= max_retries+; unreachable hosts are marked dead
#    and, with +:reload_on_failure+, the connections are reloaded first.
# 4. Afterwards, responses with status >= 300 are logged/traced and raised,
#    unless the status is listed in the +:ignore+ param.
# 5. A JSON body (by +content-type+) is deserialized before building the result.
#
# +@last_request_at+ is updated on every exit path.
#
# @param method [String] request method
# @param path   [String] the API endpoint
# @param params [Hash]   request parameters; +:ignore+ (status or list of
#   statuses) is consumed here and not sent
# @param body   [Object, nil] request body (used here only for logging/tracing)
# @param block  [Proc]   performs the actual request
# @return [Response] built from status, parsed JSON (or raw body) and headers
# @raise [NoMethodError] if no block is passed
# @raise [ServerError]   if the request failed on the server
# @raise [Error]         if no connection is available
def perform_request(method, path, params={}, body=nil, &block)
  raise NoMethodError, "Implement this method in your transport class" unless block_given?
  start = Time.now if logger || tracer
  tries = 0

  # Work on a copy so that deleting :ignore does not modify the caller's Hash.
  params = params.clone

  ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

  begin
    tries     += 1
    connection = get_connection or raise Error.new("Cannot get new connection from pool.")

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url        = connection.full_url(path, params)

    response   = block.call(connection, url)

    connection.healthy! if connection.failures > 0

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

  rescue Elasticsearch::Transport::Transport::ServerError => e
    # `response` is nil when the block itself raised the ServerError; in that
    # case there is no status to inspect and the error is passed on as is.
    # The status is normalized with `to_i`, matching the check that raised it.
    if response && @retry_on_status.include?(response.status.to_i)
      logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue *host_unreachable_exceptions => e
    logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger

    connection.dead!

    if @options[:reload_on_failure] and tries < connections.all.size
      logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
      reload_connections! and retry
    end

    if @options[:retry_on_failure]
      logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    # Catch-all used for logging only: the exception is always re-raised.
    logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
    raise e

  end #/begin

  duration = Time.now-start if logger || tracer

  if response.status.to_i >= 300
    __log    method, path, params, body, url, response, nil, 'N/A', duration if logger
    __trace  method, path, params, body, url, response, nil, 'N/A', duration if tracer

    # Log the failure only when `ignore` doesn't match the response status
    __log_failed response if logger && !ignore.include?(response.status.to_i)

    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  json     = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

  __log   method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i)
  __trace method, path, params, body, url, response, json, took, duration if tracer

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end

#reload_connections! ⇒ Object

Reloads and replaces the connection collection based on cluster information

See Also:



79
80
81
82
83
84
85
86
# File 'lib/elasticsearch/transport/transport/base.rb', line 79

# Rebuilds the connections from the hosts reported by the sniffer.
#
# Asks +sniffer.hosts+ for the host list and passes it, together with the
# current options, to +__rebuild_connections+. A +SnifferTimeoutError+ is
# not propagated: it is logged at +error+ level (when a logger is set) and
# the connections are left as they were.
#
# @return [self]
def reload_connections!
  begin
    sniffed_hosts = sniffer.hosts
    __rebuild_connections(:hosts => sniffed_hosts, :options => options)
  rescue SnifferTimeoutError
    logger.error("[SnifferTimeoutError] Timeout when reloading connections.") if logger
  end

  self
end

#resurrect_dead_connections! ⇒ Object

Tries to “resurrect” all eligible dead connections



92
93
94
# File 'lib/elasticsearch/transport/transport/base.rb', line 92

# Calls +resurrect!+ on every connection listed by +connections.dead+.
#
# @return [Object] the result of +each+ on +connections.dead+
def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end