Module: Elasticsearch::Transport::Transport::Base Abstract
- Included in:
- HTTP::Curb, HTTP::Faraday, HTTP::Manticore
- Defined in:
- lib/elasticsearch/transport/transport/base.rb
Overview
Module with common functionality for transport implementations.
Constant Summary collapse
- DEFAULT_PORT =
9200
- DEFAULT_PROTOCOL =
'http'
- DEFAULT_RELOAD_AFTER =
Requests
10_000
- DEFAULT_RESURRECT_AFTER =
Seconds
60
- DEFAULT_MAX_RETRIES =
Requests
3
- DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#counter ⇒ Object
readonly
Returns the value of attribute counter.
-
#hosts ⇒ Object
readonly
Returns the value of attribute hosts.
-
#last_request_at ⇒ Object
readonly
Returns the value of attribute last_request_at.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#max_retries ⇒ Object
Returns the value of attribute max_retries.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#protocol ⇒ Object
readonly
Returns the value of attribute protocol.
-
#reload_after ⇒ Object
Returns the value of attribute reload_after.
-
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
-
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
-
#serializer ⇒ Object
Returns the value of attribute serializer.
-
#sniffer ⇒ Object
Returns the value of attribute sniffer.
-
#tracer ⇒ Object
Returns the value of attribute tracer.
Instance Method Summary collapse
- #__build_connections ⇒ Connections::Collection abstract private
-
#__close_connections ⇒ Object
private
Closes the connections collection.
-
#__convert_to_json(o = nil, options = {}) ⇒ Object
private
Converts any non-String object to JSON.
-
#__full_url(host) ⇒ Object
private
Returns a full URL based on information from host.
-
#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object
private
Log request and response information.
-
#__log_failed(response) ⇒ Object
private
Log failed request.
-
#__raise_transport_error(response) ⇒ Object
private
Raise error specific for the HTTP response status or a generic server error.
-
#__rebuild_connections(arguments = {}) ⇒ Object
private
Replaces the connections collection.
-
#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object
private
Trace the request in the `curl` format.
-
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
- #host_unreachable_exceptions ⇒ Array abstract
-
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object.
-
#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response
abstract
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
-
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information.
-
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections.
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def connections @connections end |
#counter ⇒ Object (readonly)
Returns the value of attribute counter.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def counter @counter end |
#hosts ⇒ Object (readonly)
Returns the value of attribute hosts.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def hosts @hosts end |
#last_request_at ⇒ Object (readonly)
Returns the value of attribute last_request_at.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def last_request_at @last_request_at end |
#logger ⇒ Object
Returns the value of attribute logger.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def logger @logger end |
#max_retries ⇒ Object
Returns the value of attribute max_retries.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def max_retries @max_retries end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def options @options end |
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol.
15 16 17 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 15 def protocol @protocol end |
#reload_after ⇒ Object
Returns the value of attribute reload_after.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def reload_after @reload_after end |
#reload_connections ⇒ Object
Returns the value of attribute reload_connections.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def reload_connections @reload_connections end |
#resurrect_after ⇒ Object
Returns the value of attribute resurrect_after.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def resurrect_after @resurrect_after end |
#serializer ⇒ Object
Returns the value of attribute serializer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def serializer @serializer end |
#sniffer ⇒ Object
Returns the value of attribute sniffer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def sniffer @sniffer end |
#tracer ⇒ Object
Returns the value of attribute tracer.
16 17 18 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 16 def tracer @tracer end |
Instance Method Details
#__build_connections ⇒ Connections::Collection
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A transport implementation must implement this method. See HTTP::Faraday#__build_connections for an example.
284 285 286 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 284 def __build_connections raise NoMethodError, "Implement this method in your class" end |
#__close_connections ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Closes the connections collection.
107 108 109 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 107 def __close_connections # to be implemented by specific transports end |
#__convert_to_json(o = nil, options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Converts any non-String object to JSON
152 153 154 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 152 def __convert_to_json(o=nil, options={}) o = o.is_a?(String) ? o : serializer.dump(o, options) end |
#__full_url(host) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a full URL based on information from host
161 162 163 164 165 166 167 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 161 def __full_url(host) url = "#{host[:protocol]}://" url += "#{host[:user]}:#{host[:password]}@" if host[:user] url += "#{host[:host]}:#{host[:port]}" url += "#{host[:path]}" if host[:path] url end |
#__log(method, path, params, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log request and response information.
115 116 117 118 119 120 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 115 def __log(method, path, params, body, url, response, json, took, duration) logger.info "#{method.to_s.upcase} #{url} " + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" logger.debug "> #{__convert_to_json(body)}" if body logger.debug "< #{response.body}" end |
#__log_failed(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Log failed request.
125 126 127 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 125 def __log_failed(response) logger.fatal "[#{response.status}] #{response.body}" end |
#__raise_transport_error(response) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Raise error specific for the HTTP response status or a generic server error
144 145 146 147 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 144 def __raise_transport_error(response) error = ERRORS[response.status] || ServerError raise error.new "[#{response.status}] #{response.body}" end |
#__rebuild_connections(arguments = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Replaces the connections collection.
96 97 98 99 100 101 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 96 def __rebuild_connections(arguments={}) @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} __close_connections @connections = __build_connections end |
#__trace(method, path, params, body, url, response, json, took, duration) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Trace the request in the `curl` format.
132 133 134 135 136 137 138 139 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 132 def __trace(method, path, params, body, url, response, json, took, duration) trace_url = "http://localhost:9200/#{path}?pretty" + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' tracer.info "curl -X #{method.to_s.upcase} '#{trace_url}'#{trace_body}\n" tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" end |
#get_connection(options = {}) ⇒ Connections::Connection
Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.
Resurrects dead connection if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.
61 62 63 64 65 66 67 68 69 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 61 def get_connection(options={}) resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after connection = connections.get_connection(options) @counter_mtx.synchronize { @counter += 1 } reload_connections! if reload_connections && counter % reload_after == 0 connection end |
#host_unreachable_exceptions ⇒ Array
Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.
275 276 277 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 275 def host_unreachable_exceptions [Errno::ECONNREFUSED] end |
#initialize(arguments = {}, &block) ⇒ Object
Creates a new transport object.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 30 def initialize(arguments={}, &block) @hosts = arguments[:hosts] || [] @options = arguments[:options] || {} @block = block @connections = __build_connections @serializer = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) ) @protocol = options[:protocol] || DEFAULT_PROTOCOL @logger = options[:logger] @tracer = options[:tracer] @sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self) @counter = 0 @counter_mtx = Mutex.new @last_request_at = Time.now @reload_connections = options[:reload_connections] @reload_after = options[:reload_connections].is_a?(Fixnum) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER @max_retries = options[:retry_on_failure].is_a?(Fixnum) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i } end |
#perform_request(method, path, params = {}, body = nil, &block) ⇒ Response
The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.
Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 186 def perform_request(method, path, params={}, body=nil, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? start = Time.now if logger || tracer tries = 0 begin tries += 1 connection = get_connection or raise Error.new("Cannot get new connection from pool.") if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) params = connection.connection.params.merge(params.to_hash) end url = connection.full_url(path, params) response = block.call(connection, url) connection.healthy! if connection.failures > 0 # Raise an exception so we can catch it for `retry_on_status` __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) rescue Elasticsearch::Transport::Transport::ServerError => e if @retry_on_status.include?(response.status) logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger logger.debug "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger if tries <= max_retries retry else logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger raise e end else raise e end rescue *host_unreachable_exceptions => e logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger connection.dead! if @options[:reload_on_failure] and tries < connections.all.size logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger reload_connections! 
and retry end if @options[:retry_on_failure] logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger if tries <= max_retries retry else logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger raise e end else raise e end rescue Exception => e logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger raise e end #/begin duration = Time.now-start if logger || tracer if response.status.to_i >= 300 __log method, path, params, body, url, response, nil, 'N/A', duration if logger __trace method, path, params, body, url, response, nil, 'N/A', duration if tracer __log_failed response if logger __raise_transport_error response end json = serializer.load(response.body) if response.headers && response.headers["content-type"] =~ /json/ took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer __log method, path, params, body, url, response, json, took, duration if logger __trace method, path, params, body, url, response, json, took, duration if tracer Response.new response.status, json || response.body, response.headers ensure @last_request_at = Time.now end |
#reload_connections! ⇒ Object
Reloads and replaces the connection collection based on cluster information.
75 76 77 78 79 80 81 82 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 75 def reload_connections! hosts = sniffer.hosts __rebuild_connections :hosts => hosts, :options => options self rescue SnifferTimeoutError logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger self end |
#resurrect_dead_connections! ⇒ Object
Tries to “resurrect” all eligible dead connections.
88 89 90 |
# File 'lib/elasticsearch/transport/transport/base.rb', line 88 def resurrect_dead_connections! connections.dead.each { |c| c.resurrect! } end |