Class: Stretcher::Server

Inherits:
EsComponent show all
Defined in:
lib/stretcher/server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from EsComponent

#do_alias, #do_delete_query, #do_refresh, #do_search

Constructor Details

#initialize(uri = 'http://localhost:9200', options = {}) ⇒ Server

Represents a Server context in elastic search. Use with_server when you want to use the block syntax. The options hash takes an optional instance of Logger under :logger.

server = Stretcher::Server.new('http://localhost:9200')

The default implementation here uses the net_http_persistent adapter for faraday. If you would like to use a different HTTP library, or alter other faraday config settings you may specify an optional :faraday_configurator argument, with a Proc as a value. This will be called once with the faraday builder.

For instance: configurator = proc {|builder| builder.adapter :typhoeus Stretcher::Server.new('localhost:9200', :faraday_configurator => configurator)

You may want to set one or both of :read_timeout and :open_timeout, for Faraday's HTTP settings. The default is read_timeout: 30, open_timeout: 2, which can be quite long in many situations

When running inside a multi-threaded server such as EventMachine, and using a threadsafe HTTP library such as faraday, the mutex synchronization around API calls to the server can be avoided. In this case, set the option http_threadsafe: true


83
84
85
86
87
88
89
90
# File 'lib/stretcher/server.rb', line 83

def initialize(uri='http://localhost:9200', options={})      
  @http_threadsafe = !!options[:http_threadsafe]
  @request_mtx = Mutex.new unless @http_threadsafe
  @uri = uri.to_s
  @uri_components = URI.parse(@uri)
  @http = self.class.build_client(@uri_components, options)
  @logger = self.class.build_logger(options)
end

Instance Attribute Details

#httpObject (readonly)

Returns the value of attribute http


3
4
5
# File 'lib/stretcher/server.rb', line 3

def http
  @http
end

#loggerObject (readonly)

Returns the value of attribute logger


3
4
5
# File 'lib/stretcher/server.rb', line 3

def logger
  @logger
end

#uriObject (readonly)

Returns the value of attribute uri


3
4
5
# File 'lib/stretcher/server.rb', line 3

def uri
  @uri
end

Class Method Details

.build_client(uri_components, options = {}) ⇒ Object

Internal use only. Returns a properly configured HTTP client when initializing an instance


7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/stretcher/server.rb', line 7

def self.build_client(uri_components, options={})
  http = Faraday.new(:url => uri_components) do |builder|
    builder.response :json, :content_type => /\bjson$/

    builder.request :json

    builder.options[:timeout] = options[:read_timeout] || 30
    builder.options[:open_timeout] = options[:open_timeout] || 2

    if faraday_configurator = options[:faraday_configurator]
      faraday_configurator.call(builder)
    else
      builder.adapter :excon
    end
  end
  http.headers = {
    :accept =>  'application/json',
    :user_agent => "Stretcher Ruby Gem #{Stretcher::VERSION}",
    "Content-Type" => "application/json"
  }

  if uri_components.user || uri_components.password
    http.basic_auth(uri_components.user, uri_components.password)
  end

  http
end

.build_logger(options) ⇒ Object

Internal use only. Builds a logger when initializing an instance


37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/stretcher/server.rb', line 37

def self.build_logger(options)
  logger = options[:logger] || Logger.new(STDOUT)

  # We don't want to override the formatter if an external logger is used
  if !options[:logger]
    log_level = options[:log_level] || :warn
    logger.level = Logger.const_get(log_level.to_s.upcase)
    logger.formatter = proc do |severity, datetime, progname, msg|
      "[Stretcher][#{severity}]: #{msg}\n"
    end
  end
  
  logger
end

.with_server(*args) {|s| ... } ⇒ Object

Instantiate a new instance in a manner convenient for using the block syntax. Can be used interchangably with Stretcher::Server.new but will return the value of the block if present. See the regular constructor for full options.

Yields:

  • (s)

55
56
57
58
# File 'lib/stretcher/server.rb', line 55

def self.with_server(*args)
  s = self.new(*args)
  yield s
end

Instance Method Details

#aliases(body = nil) ⇒ Object

Implements the Aliases API Ex: server.aliases({add: {index: :my_index, alias: :my_alias}}) as per: www.elasticsearch.org/guide/reference/api/admin-indices-aliases.html


196
197
198
199
200
201
202
# File 'lib/stretcher/server.rb', line 196

def aliases(body=nil)
  if body
    request(:post, path_uri("/_aliases"), {}, body)
  else
    request(:get, path_uri("/_aliases"))
  end
end

#analyze(text, analysis_params) ⇒ Object

Implements the Analyze API Ex:

server.analyze("Candles", analyzer: :snowball)
# => #<Hashie::Mash tokens=[#<Hashie::Mash end_offset=7 position=1 start_offset=0 token="candl" type="<ALPHANUM>">]>

as per: www.elasticsearch.org/guide/reference/api/admin-indices-analyze.html


186
187
188
189
190
# File 'lib/stretcher/server.rb', line 186

def analyze(text, analysis_params)
  request(:get, path_uri("/_analyze"), analysis_params) do |req|
    req.body = text
  end
end

#bulk(data, options = {}) ⇒ Object

Perform a raw bulk operation. You probably want to use Stretcher::Index#bulk_index which properly formats a bulk index request.


113
114
115
# File 'lib/stretcher/server.rb', line 113

def bulk(data, options={})
  request(:post, path_uri("/_bulk"), options, data)
end

#cluster(&block) ⇒ Object

Returns the Stretcher::Cluster for this server Optionally takes a block, which will be passed a single arg with the Cluster object. The block returns the evaluated value of the block.


106
107
108
109
# File 'lib/stretcher/server.rb', line 106

def cluster(&block)
  cluster = Cluster.new(self, :logger => logger)
  block ? block.call(cluster) : cluster
end

#get_alias(alias_name_or_prefix) ⇒ Object


204
205
206
# File 'lib/stretcher/server.rb', line 204

def get_alias(alias_name_or_prefix)
  do_alias(alias_name_or_prefix)
end

#index(name, &block) ⇒ Object

Returns a Stretcher::Index object for the index name. Optionally takes a block, which will be passed a single arg with the Index obj The block syntax returns the evaluated value of the block

my_server.index(:foo) # => #<Stretcher::Index ...>
my_server.index(:foo) {|idx| 1+1} # => 2

98
99
100
101
# File 'lib/stretcher/server.rb', line 98

def index(name, &block)
  idx = Index.new(self, name, :logger => logger)
  block ? block.call(idx) : idx
end

#legacy_mget(body) ⇒ Object

legacy implementation of mget for backwards compat


177
178
179
# File 'lib/stretcher/server.rb', line 177

def legacy_mget(body)
  request(:get, path_uri("/_mget"), {}, body)
end

#mget(docs = [], arg_opts = {}) ⇒ Object

Retrieves multiple documents, possibly from multiple indexes Takes an array of docs, returns an array of docs as per: www.elasticsearch.org/guide/reference/api/multi-get.html If you pass :found => true as the second argument it will not return stubs for missing documents. :exists => false works in reverse


162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/stretcher/server.rb', line 162

def mget(docs=[], arg_opts={})
  #Legacy API
  return legacy_mget(docs) if docs.is_a?(Hash)
  
  opts = {:exists => true}.merge(arg_opts)
  
  res = request(:get, path_uri("/_mget"), {}, {:docs => docs})[:docs]
  if opts.has_key?(:exists) || opts.has_key?(:found)
    res.select {|d| d[:found] == true}
  else
    res
  end
end

#msearch(body = []) ⇒ Object

Takes an array of msearch data as per www.elasticsearch.org/guide/reference/api/multi-search.html Should look something like:

data = [
  {"index" : "test"}
  {"query" : {"match_all" : {}}, "from" : 0, "size" : 10}
  {"index" : "test", "search_type" : "count"}
  {"query" : {"match_all" : {}}}
]
server.msearch(data)

Raises:

  • (ArgumentError)

143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/stretcher/server.rb', line 143

def msearch(body=[])
  raise ArgumentError, "msearch takes an array!" unless body.is_a?(Array)
  fmt_body = body.map {|l| MultiJson.dump(l) }.join("\n") << "\n"

  res = request(:get, path_uri("/_msearch"), {}, fmt_body, {}, :mashify => false)

  errors = res['responses'].map { |response| response['error'] }.compact
  if !errors.empty?
    raise RequestError.new(res), "Could not msearch #{errors.inspect}"
  end

  res['responses'].map {|r| SearchResults.new(r)}
end

#path_uri(path = nil) ⇒ Object

Full path to the server root dir


214
215
216
# File 'lib/stretcher/server.rb', line 214

def path_uri(path=nil)
  URI.join(@uri.to_s, "#{@uri_components.path}/#{path.to_s}").to_s
end

#refreshObject

Perform a refresh, making all indexed documents available


209
210
211
# File 'lib/stretcher/server.rb', line 209

def refresh
  do_refresh
end

#request(method, path, params = {}, body = nil, headers = {}, options = {}, &block) ⇒ Object

Handy way to query the server, returning only the body Will raise an exception when the status is not in the 2xx range


220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/stretcher/server.rb', line 220

def request(method, path, params={}, body=nil, headers={}, options={}, &block)
  options = { :mashify => true }.merge(options)
  req = http.build_request(method)
  req.path = path
  req.params.update(Util.clean_params(params)) if params
  req.body = body
  req.headers.update(headers) if headers
  block.call(req) if block
  logger.debug { Util.curl_format(req) }

  if @http_threadsafe
    env = req.to_env(http)
    check_response(http.app.call(env), options)
  else
    @request_mtx.synchronize {
      env = req.to_env(http)
      check_response(http.app.call(env), options)
    }
  end
end

#statsObject

Retrieves stats for this server


118
119
120
# File 'lib/stretcher/server.rb', line 118

def stats
  request :get, path_uri("/_stats")
end

#statusObject

Retrieves status for this server


123
124
125
# File 'lib/stretcher/server.rb', line 123

def status
  request :get, path_uri("/_status")
end

#up?Boolean

Returns true if the server is currently reachable, raises an error otherwise

Returns:

  • (Boolean)

128
129
130
131
# File 'lib/stretcher/server.rb', line 128

def up?
  request(:get, path_uri)
  true
end