Class: Elasticsearch::Transport::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticsearch/transport/client.rb

Overview

Handles communication with an Elasticsearch cluster.

See README for usage and code examples.

Constant Summary collapse

DEFAULT_TRANSPORT_CLASS =
Transport::HTTP::Faraday
DEFAULT_LOGGER =
lambda do
  require 'logger'
  logger = Logger.new(STDERR)
  logger.progname = 'elasticsearch'
  logger.formatter = proc { |severity, datetime, progname, msg| "#{datetime}: #{msg}\n" }
  logger
end
DEFAULT_TRACER =
lambda do
  require 'logger'
  logger = Logger.new(STDERR)
  logger.progname = 'elasticsearch.tracer'
  logger.formatter = proc { |severity, datetime, progname, msg| "#{msg}\n" }
  logger
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(arguments = {}) {|faraday| ... } ⇒ Client

Create a client connected to an Elasticsearch cluster.

Specify the URL via arguments or set the ‘ELASTICSEARCH_URL` environment variable.

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :hosts (String, Array)

    Single host passed as a String or Hash, or multiple hosts passed as an Array; ‘host` or `url` keys are also valid

  • :log (Boolean)

    Use the default logger (disabled by default)

  • :trace (Boolean)

    Use the default tracer (disabled by default)

  • :logger (Object)

    An instance of a Logger-compatible object

  • :tracer (Object)

    An instance of a Logger-compatible object

  • :resurrect_after (Number)

    After how many seconds a dead connection should be tried again

  • :reload_connections (Boolean, Number)

    Reload connections after X requests (false by default)

  • :randomize_hosts (Boolean)

    Shuffle connections on initialization and reload (false by default)

  • :sniffer_timeout (Integer)

    Timeout for reloading connections in seconds (1 by default)

  • :retry_on_failure (Boolean, Number)

    Retry X times when request fails before raising and exception (false by default)

  • Array (Number)

    :retry_on_status Retry when specific status codes are returned

  • :reload_on_failure (Boolean)

    Reload connections after failure (false by default)

  • :request_timeout (Integer)

    The request timeout to be passed to transport in options

  • :adapter (Symbol)

    A specific adapter for Faraday (e.g. ‘:patron`)

  • :transport_options (Hash)

    Options to be passed to the ‘Faraday::Connection` constructor

  • :transport_class (Constant)

    A specific transport class to use, will be initialized by the client and passed hosts and all arguments

  • :transport (Object)

    A specific transport instance

  • :serializer_class (Constant)

    A specific serializer class to use, will be initialized by the transport and passed the transport instance

  • :selector (Constant)

    An instance of selector strategy implemented with Transport::Connections::Selector::Base.

  • :send_get_body_as (String)

    Specify the HTTP method to use for GET requests with a body. (Default: GET)

Yields:

  • (faraday)

    Access and configure the ‘Faraday::Connection` instance directly with a block



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/elasticsearch/transport/client.rb', line 85

def initialize(arguments={}, &block)
  @arguments = arguments

  hosts = @arguments[:hosts] || \
          @arguments[:host]  || \
          @arguments[:url]   || \
          @arguments[:urls]  || \
          ENV.fetch('ELASTICSEARCH_URL', 'localhost:9200')

  @arguments[:logger] ||= @arguments[:log]   ? DEFAULT_LOGGER.call() : nil
  @arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil
  @arguments[:reload_connections] ||= false
  @arguments[:retry_on_failure]   ||= false
  @arguments[:reload_on_failure]  ||= false
  @arguments[:randomize_hosts]    ||= false
  @arguments[:transport_options]  ||= {}
  @arguments[:http]               ||= {}

  @arguments[:transport_options].update(:request => { :timeout => @arguments[:request_timeout] } ) if @arguments[:request_timeout]

  @arguments[:transport_options][:headers] ||= {}
  @arguments[:transport_options][:headers].update 'Content-Type' => 'application/json' unless @arguments[:transport_options][:headers].keys.any? {|k| k.to_s.downcase =~ /content\-?\_?type/}

  @send_get_body_as = @arguments[:send_get_body_as] || 'GET'

  transport_class  = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS

  @transport       = @arguments[:transport] || begin
    if transport_class == Transport::HTTP::Faraday
      transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments) do |faraday|
        block.call faraday if block
        unless (h = faraday.builder.handlers.last) && h.name.start_with?("Faraday::Adapter")
          faraday.adapter(@arguments[:adapter] || __auto_detect_adapter)
        end
      end
    else
      transport_class.new(:hosts => __extract_hosts(hosts, @arguments), :options => @arguments)
    end
  end
end

Instance Attribute Details

#transportObject

Returns the transport object.



32
33
34
# File 'lib/elasticsearch/transport/client.rb', line 32

def transport
  @transport
end

Instance Method Details

#__auto_detect_adapterSymbol

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Auto-detect the best adapter (HTTP “driver”) available, based on libraries loaded by the user, preferring those with persistent connections (“keep-alive”) by default

Returns:

  • (Symbol)


199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/elasticsearch/transport/client.rb', line 199

def __auto_detect_adapter
  case
  when defined?(::Patron)
    :patron
  when defined?(::Typhoeus)
    :typhoeus
  when defined?(::HTTPClient)
    :httpclient
  when defined?(::Net::HTTP::Persistent)
    :net_http_persistent
  else
    ::Faraday.default_adapter
  end
end

#__extract_hosts(hosts_config, options = {}) ⇒ Array<Hash>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Normalizes and returns hosts configuration.

Arrayifies the ‘hosts_config` argument and extracts `host` and `port` info from strings. Performs shuffling when the `randomize_hosts` option is set.

TODO: Refactor, so it’s available in Elasticsearch::Transport::Base as well

Returns:

  • (Array<Hash>)

Raises:

  • (ArgumentError)


146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/elasticsearch/transport/client.rb', line 146

def __extract_hosts(hosts_config, options={})
  if hosts_config.is_a?(Hash)
    hosts = [ hosts_config ]
  else
    if hosts_config.is_a?(String) && hosts_config.include?(',')
      hosts = hosts_config.split(/\s*,\s*/)
    else
      hosts = Array(hosts_config)
    end
  end

  result = hosts.map do |host|
    host_parts = case host
      when String
        if host =~ /^[a-z]+\:\/\//
          uri = URI.parse(host)
          { :scheme => uri.scheme, :user => uri.user, :password => uri.password, :host => uri.host, :path => uri.path, :port => uri.port }
        else
          host, port = host.split(':')
          { :host => host, :port => port }
        end
      when URI
        { :scheme => host.scheme, :user => host.user, :password => host.password, :host => host.host, :path => host.path, :port => host.port }
      when Hash
        host
      else
        raise ArgumentError, "Please pass host as a String, URI or Hash -- #{host.class} given."
      end

    host_parts[:port] = host_parts[:port].to_i unless host_parts[:port].nil?

    # Transfer the selected host parts such as authentication credentials to `options`,
    # so we can re-use them when reloading connections
    #
    host_parts.select { |k,v| [:scheme, :port, :user, :password].include?(k) }.each do |k,v|
      @arguments[:http][k] ||= v
    end

    host_parts
  end

  result.shuffle! if options[:randomize_hosts]
  result
end

#perform_request(method, path, params = {}, body = nil) ⇒ Object

Performs a request through delegation to #transport.



128
129
130
131
132
# File 'lib/elasticsearch/transport/client.rb', line 128

def perform_request(method, path, params={}, body=nil)
  method = @send_get_body_as if 'GET' == method && body

  transport.perform_request method, path, params, body
end