Class: Request::Elastic

Inherits:
Object
  • Object
show all
Includes:
Logging, Utils
Defined in:
lib/elastic_manager/request.rb

Overview

Elasticsearch requests wrapper

Constant Summary collapse

# Exception classes that #with_retry rescues (and therefore retries).
# NOTE(review): RuntimeError is a subclass of StandardError, so listing it next
# to StandardError looks redundant; presumably Throttling is a StandardError
# descendant as well - confirm before narrowing this list.
RETRY_ERRORS =
[StandardError, RuntimeError, Throttling].freeze

Constants included from Logging

Logging::SEVERITY_COLORS

Instance Method Summary collapse

Methods included from Utils

#already?, #elastic_action_with_log, #fail_and_exit, #index_exist?, #json_parse, #make_index_name, #prechecks, #prepare_vars, #skip_index?, #true?

Methods included from Logging

configure_logger_for, #log, log_level, logger_for

Constructor Details

#initialize(config) ⇒ Elastic

Returns a new instance of Elastic.



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/elastic_manager/request.rb', line 22

# Builds the HTTP client and remembers the connection and retry settings.
#
# @param config [Hash] reads config['timeout'] ('write', 'connect', 'read'),
#   config['es']['url'], config['retry'] and config['sleep']; numeric settings
#   are coerced with #to_i
def initialize(config)
  timeouts = config['timeout']

  client = HTTP.timeout(
    write:   timeouts['write'].to_i,
    connect: timeouts['connect'].to_i,
    read:    timeouts['read'].to_i
  )

  # Every request both sends and accepts JSON.
  @client = client.headers('Accept': 'application/json', 'Content-type': 'application/json')
  @url    = config['es']['url']
  @retry  = config['retry'].to_i
  @sleep  = config['sleep'].to_i
end

Instance Method Details

#all_indices(from = nil, to = nil, daysago = nil, state = nil, type = nil, config) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/elastic_manager/request.rb', line 122

# Lists the CGI-escaped names of indices selected either by age or by date range.
#
# The date is taken from the index name via Date.parse after removing dashes;
# names Date.parse rejects are logged and skipped.
#
# @param from [Date, nil] start of the date range; when nil, indices are
#   selected by age (older than the effective daysago) instead
# @param to [Date, nil] end of the date range, used only when +from+ is given
# @param daysago [#to_i, nil] default age threshold in days, passed to #override_daysago
# @param state [String, nil] when given, keep only indices whose 'state' equals it
# @param type [String, nil] when given, keep only indices whose required box_type equals it
# @param config [Hash] passed to #override_daysago
# @return [Array<String>] CGI-escaped index names
def all_indices(from=nil, to=nil, daysago=nil, state=nil, type=nil, config)
  indices = get_all_indices

  # TODO: (anton.ryabov) next line just for debug purpose, need better handling
  indices.each { |k, v| log.debug "#{k} - #{v.to_json}" unless v['settings'] }

  indices.select! { |_, v| v['state'] == state } if state
  if type
    # dig instead of chained []: entries may lack 'settings' (see the debug
    # line above) and must simply not match rather than raise NoMethodError.
    indices.select! do |_, v|
      v.dig('settings', 'index', 'routing', 'allocation', 'require', 'box_type') == type
    end
  end

  result = []
  indices.each_key do |index|
    begin
      index_date = Date.parse(index.delete('-'))
    rescue ArgumentError => e
      log.error "#{e.message} for #{index}"
      next
    end

    daysago_local = override_daysago(make_index_name(index), config, daysago)

    # Keep the two selection modes strictly separate. Falling through to
    # (from..to).cover? with from == nil builds (nil..nil), which matches every
    # date on Ruby >= 2.7 and raises on older versions.
    selected =
      if from.nil?
        index_date < (Date.today - daysago_local)
      else
        (from..to).cover?(index_date)
      end

    result << CGI.escape(index) if selected
  end

  result
end

#all_indices_in_snapshots(from = nil, to = nil, daysago = nil, config) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/elastic_manager/request.rb', line 84

# Lists the CGI-escaped index names of successful snapshots selected either by
# age or by date range.
#
# Only snapshots whose 'status' is 'SUCCESS' are considered. The date is taken
# from the snapshot id via Date.parse after removing dashes; ids Date.parse
# rejects are logged and skipped. The index name is the snapshot id with
# 'snapshot_' removed.
#
# @param from [Date, nil] start of the date range; when nil, snapshots are
#   selected by age (older than the effective daysago) instead
# @param to [Date, nil] end of the date range, used only when +from+ is given
# @param daysago [#to_i, nil] default age threshold in days, passed to #override_daysago
# @param config [Hash] passed to #override_daysago
# @return [Array<String>] CGI-escaped index names
def all_indices_in_snapshots(from=nil, to=nil, daysago=nil, config)
  all_snapshots = get_all_snapshots
  all_snapshots.select! { |snap| snap['status'] == 'SUCCESS' }

  result = []
  all_snapshots.each do |snap|
    snap_id = snap['id']

    begin
      snap_date = Date.parse(snap_id.delete('-'))
    rescue ArgumentError => e
      # Report the snapshot id: the index name is not derived yet at this
      # point, so referring to it here raised NameError instead of logging.
      log.error "#{e.message} for #{snap_id}"
      next
    end

    index = snap_id.gsub('snapshot_', '')
    daysago_local = override_daysago(make_index_name(index), config, daysago)

    # Keep the two selection modes strictly separate. Falling through to
    # (from..to).cover? with from == nil builds (nil..nil), which matches every
    # date on Ruby >= 2.7 and raises on older versions.
    selected =
      if from.nil?
        snap_date < (Date.today - daysago_local)
      else
        (from..to).cover?(snap_date)
      end

    result << CGI.escape(index) if selected
  end

  result
end

#chill_index(index, box_type) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/elastic_manager/request.rb', line 300

# Sets the index's required allocation box_type via PUT /<index>/_settings.
#
# @param index [String] index name used in the request path
# @param box_type [String] value for 'index.routing.allocation.require.box_type'
# @return [Boolean] true only when the response code is 200 and its
#   'acknowledged' field is true; false otherwise
def chill_index(index, box_type)
  settings = { 'index.routing.allocation.require.box_type' => box_type }
  reply    = request(:put, "/#{index}/_settings?master_timeout=1m", settings)

  unless reply.code == 200
    log.fatal "can't chill #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end

#close_index(index, tag) ⇒ Object



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/elastic_manager/request.rb', line 264

# Closes an index unless its current box_type equals +tag+.
#
# @param index [String] index name used in the request path
# @param tag [String] box_type whose indices must not be closed
# @return [Boolean] false when the box_type cannot be determined, when it
#   equals +tag+, or when the close request does not return 200; otherwise
#   whether the response's 'acknowledged' field is true
def close_index(index, tag)
  current_box = index_box_type(index)
  return false if current_box.nil?

  if current_box == tag
    log.fatal "i will not close index #{index} in box_type #{tag}"
    return false
  end

  reply = request(:post, "/#{index}/_close?master_timeout=1m")

  unless reply.code == 200
    log.fatal "wrong response code for #{index} close"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end

#delete_index(index) ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/elastic_manager/request.rb', line 316

# Deletes an index, but only after #find_snapshot returns a truthy value for
# its 'snapshot_<index>' snapshot in the repository from #find_snapshot_repo.
#
# @param index [String] index name used in the request path
# @return [Boolean] false when no snapshot is found or the delete request does
#   not return 200; otherwise whether the response's 'acknowledged' field is true
def delete_index(index)
  repo = find_snapshot_repo
  return false unless find_snapshot(repo, "snapshot_#{index}")

  reply = request(:delete, "/#{index}")

  unless reply.code == 200
    log.fatal "can't delete index #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end

#delete_snapshot(snapshot, repo) ⇒ Object



385
386
387
388
389
390
391
392
393
394
395
396
# File 'lib/elastic_manager/request.rb', line 385

# Deletes a snapshot via DELETE /_snapshot/<repo>/<snapshot>.
#
# @param snapshot [String] snapshot name used in the request path
# @param repo [String] repository name used in the request path
# @return [Boolean] true only when the response code is 200 and its
#   'acknowledged' field is true; false otherwise
def delete_snapshot(snapshot, repo)
  reply = request(:delete, "/_snapshot/#{repo}/#{snapshot}")

  unless reply.code == 200
    log.fatal "can't delete snapshot #{snapshot}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end

#find_snapshot(repo, snapshot_name) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/elastic_manager/request.rb', line 192

# Looks up a snapshot and returns its name when it is in state 'SUCCESS'.
#
# Logs a fatal message and exits the process with status 1 when the request
# does not return 200 or the first returned snapshot is in any other state.
#
# @param repo [String] repository name used in the request path
# @param snapshot_name [String] snapshot name used in the request path
# @return [Object] the 'snapshot' field of the first entry in 'snapshots'
def find_snapshot(repo, snapshot_name)
  reply = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")

  unless reply.code == 200
    log.fatal "can't find snapshot #{snapshot_name} in #{repo} response was: #{reply.code} - #{reply}"
    exit 1
  end

  found = json_parse(reply)['snapshots'][0]
  return found['snapshot'] if found['state'] == 'SUCCESS'

  log.fatal 'wrong snapshot state'
  exit 1
end

#find_snapshot_repo ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
# File 'lib/elastic_manager/request.rb', line 180

# Returns the first key of the GET /_snapshot/ response.
#
# Logs a fatal message and exits the process with status 1 when the request
# does not return 200.
#
# @return [Object] first key of the parsed response
def find_snapshot_repo
  # TODO: we need improve this if several snapshot repos used in elastic
  reply = request(:get, '/_snapshot/')
  return json_parse(reply).keys.first if reply.code == 200

  log.fatal "dunno what to do with: #{reply.code} - #{reply}"
  exit 1
end

#get_all_indices ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/elastic_manager/request.rb', line 152

# Fetches per-index metadata from the cluster state, filtered down to each
# index's state and required allocation box_type.
#
# Logs a fatal message and exits the process with status 1 when the request
# does not return 200.
#
# @return [Object] the ['metadata']['indices'] part of the parsed response
def get_all_indices
  wanted = %w[
    metadata.indices.*.state
    metadata.indices.*.settings.index.routing.allocation.require.box_type
  ]
  reply = request(:get, "/_cluster/state/metadata/?filter_path=#{wanted.join(',')}")

  return json_parse(reply)['metadata']['indices'] if reply.code == 200

  log.fatal "can't work with all_indices response was: #{reply.code} - #{reply}"
  exit 1
end

#get_all_snapshots ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/elastic_manager/request.rb', line 110

# Fetches the snapshot listing of the repository returned by
# #find_snapshot_repo via GET /_cat/snapshots/<repo>.
#
# Logs a fatal message and exits the process with status 1 when the request
# does not return 200.
#
# @return [Object] the parsed response
def get_all_snapshots
  reply = request(:get, "/_cat/snapshots/#{find_snapshot_repo}")
  return json_parse(reply) if reply.code == 200

  log.fatal "can't work with all_snapshots response was: #{reply.code} - #{reply}"
  exit 1
end

#green? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
69
70
71
# File 'lib/elastic_manager/request.rb', line 66

# Tells whether GET /_cluster/health answers 200 with status 'green'.
#
# @return [Boolean] false for any other response code or status
def green?
  health = request(:get, '/_cluster/health')
  health.code == 200 && json_parse(health)['status'] == 'green'
end

#index_box_type(index) ⇒ Object



286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/elastic_manager/request.rb', line 286

# Reads the index's 'index.routing.allocation.require.box_type' setting.
#
# @param index [String] index name; used in the request path and as the key
#   into the parsed response
# @return [Object, nil] the box_type value, or nil (after a fatal log line)
#   when the request does not return 200
def index_box_type(index)
  reply = request(:get, "/#{index}/_settings/index.routing.allocation.require.box_type")

  unless reply.code == 200
    log.fatal "can't check box_type for #{index}, response was: #{reply.code} - #{reply}"
    return nil
  end

  allocation = json_parse(reply)[index]['settings']['index']['routing']['allocation']
  allocation['require']['box_type'].tap do |box_type|
    log.debug "for #{index} box_type is #{box_type}"
  end
end

#open_index(index) ⇒ Object



251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/elastic_manager/request.rb', line 251

# Opens an index via POST /<index>/_open.
#
# @param index [String] index name used in the request path
# @return [Boolean] true only when the response code is 200 and its
#   'acknowledged' field is true; false otherwise
def open_index(index)
  reply = request(:post, "/#{index}/_open?master_timeout=1m")

  unless reply.code == 200
    log.fatal "wrong response code for #{index} open"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end

#override_daysago(index_name, config, daysago) ⇒ Object



73
74
75
76
77
78
79
80
81
82
# File 'lib/elastic_manager/request.rb', line 73

# Resolves the effective "days ago" threshold for an index.
#
# A per-index value at config[index_name]['daysago'][config['task']] wins when
# it is present and its string form is non-empty; otherwise +daysago+ is used.
#
# @param index_name [String] key into +config+
# @param config [Hash] configuration holding 'task' and optional per-index sections
# @param daysago [#to_i] fallback value
# @return [Integer] the chosen value converted with #to_i
def override_daysago(index_name, config, daysago)
  per_index = config[index_name]
  per_task  = per_index && per_index['daysago']
  override  = per_task && per_task[config['task']]

  return daysago.to_i if !override || override.to_s.empty?

  override.to_i
end

#request(method, url, body = {}) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/elastic_manager/request.rb', line 49

# Sends an HTTP request to Elasticsearch inside #with_retry.
#
# A 503 response raises Request::Throttling and any other response whose
# status reports server_error? raises Request::ServerError, both from inside
# the #with_retry block.
#
# @param method [Symbol] HTTP verb handed to the client
# @param url [String] path appended to the configured base URL
# @param body [Hash] payload passed to the client as its json: option
# @return [Object] the client's response
def request(method, url, body={})
  target = @url + url
  log.debug "uri: #{target}"

  with_retry do
    @client.request(method, target, json: body).tap do |reply|
      raise Request::Throttling.new(reply) if reply.code == 503
      raise Request::ServerError.new(reply) if reply.status.server_error?
    end
  end
end

#restore_snapshot(index, box_type) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/elastic_manager/request.rb', line 210

# Restores the 'snapshot_<index>' snapshot and waits for recovery to finish.
#
# The restore overrides index settings: zero replicas, refresh disabled (-1)
# and the given box_type as allocation requirement. Logs a fatal message and
# exits the process with status 1 when the restore request does not return 200.
#
# @param index [String] index name
# @param box_type [String] value for 'index.routing.allocation.require.box_type'
# @return [Object] the result of #wait_snapshot_restore
def restore_snapshot(index, box_type)
  snapshot_name = "snapshot_#{index}"
  repo          = find_snapshot_repo
  snapshot      = find_snapshot(repo, snapshot_name)

  overrides = {
    'index.number_of_replicas'                  => 0,
    'index.refresh_interval'                    => -1,
    'index.routing.allocation.require.box_type' => box_type
  }
  reply = request(:post, "/_snapshot/#{repo}/#{snapshot}/_restore", { index_settings: overrides })

  unless reply.code == 200
    log.fatal "can't restore snapshot #{snapshot_name} response was: #{reply.code} - #{reply}"
    exit 1
  end

  sleep 5
  wait_snapshot_restore(index)
end

#snapshot_exist?(snapshot_name, repo) ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/elastic_manager/request.rb', line 167

# Checks whether a snapshot exists in a repository.
#
# Any response code other than 200 or 404 is logged as fatal and exits the
# process with status 1.
#
# @param snapshot_name [String] snapshot name used in the request path
# @param repo [String] repository name used in the request path
# @return [Boolean] true on 200, false on 404
def snapshot_exist?(snapshot_name, repo)
  reply = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")

  case reply.code
  when 200 then true
  when 404 then false
  else
    log.fatal "can't check snapshot existing, response was: #{reply.code} - #{reply}"
    exit 1
  end
end

#snapshot_index(index) ⇒ Object



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/elastic_manager/request.rb', line 363

# Starts a snapshot named 'snapshot_<index>' of a single index and waits for it.
#
# The snapshot is requested with ignore_unavailable, include_global_state and
# partial all set to false.
#
# @param index [String] index to snapshot
# @return [Object, false] the result of #wait_snapshot, or false (after an
#   error log line) when the request does not return 200
def snapshot_index(index)
  name = "snapshot_#{index}"
  repo = find_snapshot_repo

  options = {
    'indices'              => index,
    'ignore_unavailable'   => false,
    'include_global_state' => false,
    'partial'              => false
  }
  reply = request(:put, "/_snapshot/#{repo}/#{name}/", options)

  unless reply.code == 200
    log.error "can't snapshot #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  sleep 5
  wait_snapshot(name, repo)
end

#wait_snapshot(snapshot, repo) ⇒ Object



334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
# File 'lib/elastic_manager/request.rb', line 334

# Polls the snapshot status until the snapshot reaches state 'SUCCESS'.
#
# Sleeps @sleep seconds before every poll. A state of FAILED, PARTIAL or
# INCOMPATIBLE is logged as fatal and exits the process with status 1. A
# non-200 poll is logged as an error and polling continues.
#
# @param snapshot [String] snapshot name used in the request path
# @param repo [String] repository name used in the request path
# @return [true] once the snapshot state is 'SUCCESS'
def wait_snapshot(snapshot, repo)
  loop do
    sleep @sleep
    reply = request(:get, "/_snapshot/#{repo}/#{snapshot}/_status")

    unless reply.code == 200
      log.error "can't check snapshot: #{reply.code} - #{reply}"
      next
    end

    # TODO: (anton.ryabov) add logging of percent and time ?
    state = json_parse(reply)['snapshots'][0]['state']
    log.debug "snapshot check response: #{reply.code} - #{reply}"

    break if state == 'SUCCESS'
    # Any state outside the failure list means "still running": poll again.
    next unless %w[FAILED PARTIAL INCOMPATIBLE].include?(state)

    log.fatal "can't snapshot #{snapshot} in #{repo}: #{reply.code} - #{reply}"
    exit 1
  end

  true
end

#wait_snapshot_restore(index) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/elastic_manager/request.rb', line 233

# Polls GET /<index>/_recovery until every shard reports stage 'DONE'.
#
# Sleeps @sleep / 2 seconds before every poll. A non-200 poll is logged as an
# error and polling continues.
#
# @param index [String] index name; used in the request path and as the key
#   into the parsed response
# @return [true] once all shards are in stage 'DONE'
def wait_snapshot_restore(index)
  loop do
    sleep @sleep / 2
    reply = request(:get, "/#{index}/_recovery")

    if reply.code == 200
      # TODO: (anton.ryabov) add logging of percent and time ?
      stages = json_parse(reply)[index]['shards'].map { |shard| shard['stage'] }
      break if stages.all? { |stage| stage == 'DONE' }
    else
      log.error "can't check recovery: #{reply.code} - #{reply}"
    end
  end

  true
end

#with_retry ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/elastic_manager/request.rb', line 36

# Runs the block, retrying it when it raises one of RETRY_ERRORS.
#
# The block is attempted at most @retry times, and always at least once. Every
# failure is logged as a warning and followed by a pause of @sleep seconds.
# When no attempts remain, the backtrace is logged as fatal and the process
# exits with status 1.
#
# @yield the operation to attempt
# @return [Object] the value of the block
def with_retry
  # `retry` re-runs the method body; ||= keeps the countdown across re-runs.
  tries ||= @retry

  yield
rescue *RETRY_ERRORS => e
  tries -= 1
  # Log the attempts that really remain (never a negative number).
  log.warn "tries left #{[tries, 0].max} '''#{e.message}''' sleeping #{@sleep} sec..."
  sleep @sleep

  # Compare with > 0 instead of testing for exactly zero: a countdown started
  # from @retry <= 0 never hits zero and would otherwise retry forever.
  retry if tries > 0
  log.fatal "backtrace:\n\t#{e.backtrace.join("\n\t")}"
  exit 1
end