Module: Elasticsearch::Transport::Transport::Base Abstract

Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/elasticsearch/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def connections
  @connections
end

#counterObject (readonly)

Returns the value of attribute counter.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def counter
  @counter
end

#hostsObject (readonly)

Returns the value of attribute hosts.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def hosts
  @hosts
end

#last_request_atObject (readonly)

Returns the value of attribute last_request_at.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def last_request_at
  @last_request_at
end

#loggerObject

Returns the value of attribute logger.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def logger
  @logger
end

#max_retriesObject

Returns the value of attribute max_retries.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def max_retries
  @max_retries
end

#optionsObject (readonly)

Returns the value of attribute options.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def options
  @options
end

#protocolObject (readonly)

Returns the value of attribute protocol.



15
16
17
# File 'lib/elasticsearch/transport/transport/base.rb', line 15

def protocol
  @protocol
end

#reload_afterObject

Returns the value of attribute reload_after.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def reload_after
  @reload_after
end

#reload_connectionsObject

Returns the value of attribute reload_connections.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def reload_connections
  @reload_connections
end

#resurrect_afterObject

Returns the value of attribute resurrect_after.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def resurrect_after
  @resurrect_after
end

#serializerObject

Returns the value of attribute serializer.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def serializer
  @serializer
end

#snifferObject

Returns the value of attribute sniffer.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def sniffer
  @sniffer
end

#tracerObject

Returns the value of attribute tracer.



16
17
18
# File 'lib/elasticsearch/transport/transport/base.rb', line 16

def tracer
  @tracer
end

Instance Method Details

#__build_connectionsConnections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

A transport implementation must implement this method. See HTTP::Faraday#__build_connections for an example.

Returns:

Raises:

  • (NoMethodError)


255
256
257
# File 'lib/elasticsearch/transport/transport/base.rb', line 255

def __build_connections
  raise NoMethodError, "Implement this method in your class"
end

#__convert_to_json(o = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



141
142
143
# File 'lib/elasticsearch/transport/transport/base.rb', line 141

def __convert_to_json(o=nil, options={})
  o = o.is_a?(String) ? o : serializer.dump(o, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



150
151
152
153
154
155
156
# File 'lib/elasticsearch/transport/transport/base.rb', line 150

def __full_url(host)
  url  = "#{host[:protocol]}://"
  url += "#{host[:user]}:#{host[:password]}@" if host[:user]
  url += "#{host[:host]}:#{host[:port]}"
  url += "#{host[:path]}" if host[:path]
  url
end

#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information.



104
105
106
107
108
109
# File 'lib/elasticsearch/transport/transport/base.rb', line 104

def __log(method, path, params, body, url, response, json, took, duration)
  logger.info  "#{method.to_s.upcase} #{url} " +
               "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
  logger.debug "> #{__convert_to_json(body)}" if body
  logger.debug "< #{response.body}"
end

#__log_failed(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log failed request.



114
115
116
# File 'lib/elasticsearch/transport/transport/base.rb', line 114

def __log_failed(response)
  logger.fatal "[#{response.status}] #{response.body}"
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error



133
134
135
136
# File 'lib/elasticsearch/transport/transport/base.rb', line 133

def __raise_transport_error(response)
  error = ERRORS[response.status] || ServerError
  raise error.new "[#{response.status}] #{response.body}"
end

#__rebuild_connections(arguments = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Replaces the connections collection.



94
95
96
97
98
# File 'lib/elasticsearch/transport/transport/base.rb', line 94

def __rebuild_connections(arguments={})
  @hosts       = arguments[:hosts]    || []
  @options     = arguments[:options]  || {}
  @connections = __build_connections
end

#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the ‘curl` format.



121
122
123
124
125
126
127
128
# File 'lib/elasticsearch/transport/transport/base.rb', line 121

def __trace(method, path, params, body, url, response, json, took, duration)
  trace_url  = "http://localhost:9200/#{path}?pretty" +
               ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  tracer.info  "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connection if the ‘resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



59
60
61
62
63
64
65
66
67
# File 'lib/elasticsearch/transport/transport/base.rb', line 59

def get_connection(options={})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  connection = connections.get_connection(options)
  @counter  += 1

  reload_connections!         if reload_connections && counter % reload_after == 0
  connection
end

#host_unreachable_exceptionsArray

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


246
247
248
# File 'lib/elasticsearch/transport/transport/base.rb', line 246

def host_unreachable_exceptions
  [Errno::ECONNREFUSED, Elasticsearch::Transport::Transport::Errors::BadGateway]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object.

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Array)

    A Hash with options (usually passed by Client)

See Also:



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/elasticsearch/transport/transport/base.rb', line 30

def initialize(arguments={}, &block)
  @hosts       = arguments[:hosts]   || []
  @options     = arguments[:options] || {}
  @block       = block
  @connections = __build_connections

  @serializer  = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
  @protocol    = options[:protocol] || DEFAULT_PROTOCOL

  @logger      = options[:logger]
  @tracer      = options[:tracer]

  @sniffer     = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  @counter     = 0
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  @reload_after    = options[:reload_connections].is_a?(Fixnum) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @max_retries     = options[:retry_on_failure].is_a?(Fixnum)   ? options[:retry_on_failure]   : DEFAULT_MAX_RETRIES
end

#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/elasticsearch/transport/transport/base.rb', line 175

def perform_request(method, path, params={}, body=nil, &block)
  raise NoMethodError, "Implement this method in your transport class" unless block_given?
  start = Time.now if logger || tracer
  tries = 0

  begin
    tries     += 1
    connection = get_connection or raise Error.new("Cannot get new connection from pool.")

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url        = connection.full_url(path, params)

    response   = block.call(connection, url)

    connection.healthy! if connection.failures > 0

  rescue *host_unreachable_exceptions => e
    logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger

    connection.dead!

    if @options[:reload_on_failure] and tries < connections.all.size
      logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
      reload_connections! and retry
    end

    if @options[:retry_on_failure]
      logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
      if tries <= max_retries
        retry
      else
        logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
    raise e
  end

  duration = Time.now-start if logger || tracer

  if response.status.to_i >= 300
    __log    method, path, params, body, url, response, nil, 'N/A', duration if logger
    __trace  method, path, params, body, url, response, nil, 'N/A', duration if tracer
    __log_failed response if logger
    __raise_transport_error response
  end

  json     = serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer

  __log   method, path, params, body, url, response, json, took, duration if logger
  __trace method, path, params, body, url, response, json, took, duration if tracer

  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end

#reload_connections!Object

Reloads and replaces the connection collection based on cluster information.

See Also:



73
74
75
76
77
78
79
80
# File 'lib/elasticsearch/transport/transport/base.rb', line 73

def reload_connections!
  hosts = sniffer.hosts
  __rebuild_connections :hosts => hosts, :options => options
  self
rescue SnifferTimeoutError
  logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
  self
end

#resurrect_dead_connections!Object

Tries to “resurrect” all eligible dead connections.



86
87
88
# File 'lib/elasticsearch/transport/transport/base.rb', line 86

def resurrect_dead_connections!
  connections.dead.each { |c| c.resurrect! }
end