Class: Request::Elastic
Overview
Elasticsearch requests wrapper
Constant Summary
collapse
- RETRY_ERRORS =
[StandardError, RuntimeError, Throttling].freeze
Constants included
from Logging
Logging::SEVERITY_COLORS
Instance Method Summary
collapse
-
#all_indices(from = nil, to = nil, daysago = nil, state = nil, type = nil, config) ⇒ Object
-
#all_indices_in_snapshots(from = nil, to = nil, daysago = nil, config) ⇒ Object
-
#chill_index(index, box_type) ⇒ Object
-
#close_index(index, tag) ⇒ Object
-
#delete_index(index) ⇒ Object
-
#delete_snapshot(snapshot, repo) ⇒ Object
-
#find_snapshot(repo, snapshot_name) ⇒ Object
-
#find_snapshot_repo ⇒ Object
-
#get_all_indices ⇒ Object
-
#get_all_snapshots ⇒ Object
-
#green? ⇒ Boolean
-
#index_box_type(index) ⇒ Object
-
#initialize(config) ⇒ Elastic
constructor
A new instance of Elastic.
-
#open_index(index) ⇒ Object
-
#override_daysago(index_name, config, daysago) ⇒ Object
-
#request(method, url, body = {}) ⇒ Object
-
#restore_snapshot(index, box_type) ⇒ Object
-
#snapshot_exist?(snapshot_name, repo) ⇒ Boolean
-
#snapshot_index(index) ⇒ Object
-
#wait_snapshot(snapshot, repo) ⇒ Object
-
#wait_snapshot_restore(index) ⇒ Object
-
#with_retry ⇒ Object
Methods included from Utils
#already?, #elastic_action_with_log, #fail_and_exit, #index_exist?, #json_parse, #make_index_name, #prechecks, #prepare_vars, #skip_index?, #true?
Methods included from Logging
configure_logger_for, #log, log_level, logger_for
Constructor Details
#initialize(config) ⇒ Elastic
Returns a new instance of Elastic.
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/elastic_manager/request.rb', line 22
# Builds the HTTP client used for every Elasticsearch call and stores the
# connection and retry settings.
#
# @param config [Hash] settings; the keys read here are
#   config['timeout']['write'|'connect'|'read'], config['es']['url'],
#   config['retry'] and config['sleep']
def initialize(config)
  timeouts = config['timeout']
  # NOTE(review): the rendered source showed `).(` here, which would invoke
  # #call on the client returned by HTTP.timeout; the chainable `headers`
  # call is assumed to be what was intended - confirm against the repository.
  @client = HTTP.timeout(
    write: timeouts['write'].to_i,
    connect: timeouts['connect'].to_i,
    read: timeouts['read'].to_i
  ).headers(
    'Accept': 'application/json',
    'Content-type': 'application/json'
  )
  @url = config['es']['url']
  # Both values are coerced with to_i, so a missing setting becomes 0.
  @retry = config['retry'].to_i
  @sleep = config['sleep'].to_i
end
|
Instance Method Details
#all_indices(from = nil, to = nil, daysago = nil, state = nil, type = nil, config) ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/elastic_manager/request.rb', line 122
# Lists index names selected by state, box type and date.
#
# Date selection has two modes, chosen by +from+:
# * +from+ is nil - an index is selected when its date is strictly older
#   than today minus the effective "days ago" value (see #override_daysago);
# * +from+ is set - an index is selected when +from..to+ covers its date.
#
# Indices whose name cannot be parsed as a date are logged and skipped.
#
# @param from [Date, nil] start of the date range, or nil for "days ago" mode
# @param to [Date, nil] end of the date range
# @param daysago [#to_i, nil] default age threshold in days
# @param state [String, nil] keep only indices with this state when given
# @param type [String, nil] keep only indices with this box_type when given
# @param config [Hash] configuration passed through to #override_daysago
# @return [Array<String>] CGI-escaped names of the selected indices
def all_indices(from=nil, to=nil, daysago=nil, state=nil, type=nil, config)
  indices = get_all_indices
  indices.each { |name, meta| log.debug "#{name} - #{meta.to_json}" unless meta['settings'] }
  indices.select! { |_, meta| meta['state'] == state } if state
  if type
    # dig: an index without allocation settings (logged above) simply does
    # not match instead of raising NoMethodError on nil.
    indices.select! do |_, meta|
      meta.dig('settings', 'index', 'routing', 'allocation', 'require', 'box_type') == type
    end
  end

  indices.keys.each_with_object([]) do |index, result|
    begin
      index_date = Date.parse(index.delete('-'))
    rescue ArgumentError => e
      log.error "#{e.message} for #{index}"
      next
    end
    daysago_local = override_daysago(make_index_name(index), config, daysago)
    # The two modes are mutually exclusive. Falling through to
    # (nil..nil).cover? would match every date on Rubies that support
    # beginless ranges.
    selected =
      if from.nil?
        index_date < (Date.today - daysago_local)
      else
        (from..to).cover?(index_date)
      end
    result << CGI.escape(index) if selected
  end
end
|
#all_indices_in_snapshots(from = nil, to = nil, daysago = nil, config) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/elastic_manager/request.rb', line 84
# Lists index names that have a successful snapshot, selected by date.
#
# Only snapshots whose 'status' is 'SUCCESS' are considered. The index name
# is the snapshot id with 'snapshot_' removed. Date selection works in two
# modes, chosen by +from+:
# * +from+ is nil - selected when the snapshot date is strictly older than
#   today minus the effective "days ago" value (see #override_daysago);
# * +from+ is set - selected when +from..to+ covers the snapshot date.
#
# Snapshots whose id cannot be parsed as a date are logged and skipped.
#
# @param from [Date, nil] start of the date range, or nil for "days ago" mode
# @param to [Date, nil] end of the date range
# @param daysago [#to_i, nil] default age threshold in days
# @param config [Hash] configuration passed through to #override_daysago
# @return [Array<String>] CGI-escaped index names
def all_indices_in_snapshots(from=nil, to=nil, daysago=nil, config)
  snapshots = get_all_snapshots
  snapshots.select! { |snap| snap['status'] == 'SUCCESS' }

  snapshots.each_with_object([]) do |snap, result|
    snapshot_id = snap['id']
    begin
      snap_date = Date.parse(snapshot_id.delete('-'))
    rescue ArgumentError => e
      # Report the snapshot id: the index name is not derived yet at this
      # point, and referencing it here raised NameError instead of logging.
      log.error "#{e.message} for #{snapshot_id}"
      next
    end
    index = snapshot_id.gsub('snapshot_', '')
    daysago_local = override_daysago(make_index_name(index), config, daysago)
    # The two modes are mutually exclusive. Falling through to
    # (nil..nil).cover? would match every date on Rubies that support
    # beginless ranges.
    selected =
      if from.nil?
        snap_date < (Date.today - daysago_local)
      else
        (from..to).cover?(snap_date)
      end
    result << CGI.escape(index) if selected
  end
end
|
#chill_index(index, box_type) ⇒ Object
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
|
# File 'lib/elastic_manager/request.rb', line 300
# Re-tags an index by setting its required allocation box_type.
#
# @param index [String] index name, interpolated into the request path
# @param box_type [String] value for index.routing.allocation.require.box_type
# @return [Boolean] true only when the reply is 200 and 'acknowledged' is
#   literally true; false otherwise (a non-200 reply is logged as fatal)
def chill_index(index, box_type)
  settings = { 'index.routing.allocation.require.box_type' => box_type }
  reply = request(:put, "/#{index}/_settings?master_timeout=1m", settings)
  unless reply.code == 200
    log.fatal "can't chill #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end
|
#close_index(index, tag) ⇒ Object
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
|
# File 'lib/elastic_manager/request.rb', line 264
# Closes an index unless it currently lives on the protected box type.
#
# @param index [String] index name, interpolated into the request path
# @param tag [String] box_type whose indices must not be closed
# @return [Boolean] true only when the close request returns 200 with
#   'acknowledged' literally true; false when the box type is unknown,
#   equals +tag+, or the request is not acknowledged
def close_index(index, tag)
  current_box = index_box_type(index)
  return false if current_box.nil?

  if current_box == tag
    log.fatal "i will not close index #{index} in box_type #{tag}"
    return false
  end

  reply = request(:post, "/#{index}/_close?master_timeout=1m")
  unless reply.code == 200
    log.fatal "wrong response code for #{index} close"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end
|
#delete_index(index) ⇒ Object
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
|
# File 'lib/elastic_manager/request.rb', line 316
# Deletes an index, but only after its snapshot has been looked up.
#
# The snapshot is expected under the name "snapshot_<index>" in the
# repository returned by #find_snapshot_repo.
#
# @param index [String] index name, interpolated into the request path
# @return [Boolean] true only when the delete request returns 200 with
#   'acknowledged' literally true; false otherwise
def delete_index(index)
  repo = find_snapshot_repo
  # Safety net: never drop an index that has no snapshot to restore from.
  return false unless find_snapshot(repo, "snapshot_#{index}")

  reply = request(:delete, "/#{index}")
  unless reply.code == 200
    log.fatal "can't delete index #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end
|
#delete_snapshot(snapshot, repo) ⇒ Object
385
386
387
388
389
390
391
392
393
394
395
396
|
# File 'lib/elastic_manager/request.rb', line 385
# Deletes a snapshot from a repository.
#
# @param snapshot [String] snapshot name
# @param repo [String] snapshot repository name
# @return [Boolean] true only when the reply is 200 and 'acknowledged' is
#   literally true; false otherwise (a non-200 reply is logged as fatal)
def delete_snapshot(snapshot, repo)
  reply = request(:delete, "/_snapshot/#{repo}/#{snapshot}")
  unless reply.code == 200
    log.fatal "can't delete snapshot #{snapshot}, response was: #{reply.code} - #{reply}"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end
|
#find_snapshot(repo, snapshot_name) ⇒ Object
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
|
# File 'lib/elastic_manager/request.rb', line 192
# Looks up a snapshot and returns its name when it completed successfully.
#
# Terminates the process with status 1 when the lookup does not return 200
# or when the first returned snapshot is not in state 'SUCCESS'.
#
# @param repo [String] snapshot repository name
# @param snapshot_name [String] snapshot to look up
# @return [String] the 'snapshot' field of the first returned snapshot
def find_snapshot(repo, snapshot_name)
  reply = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")
  unless reply.code == 200
    log.fatal "can't find snapshot #{snapshot_name} in #{repo} response was: #{reply.code} - #{reply}"
    exit 1
  end

  found = json_parse(reply)['snapshots'][0]
  return found['snapshot'] if found['state'] == 'SUCCESS'

  log.fatal 'wrong snapshot state'
  exit 1
end
|
#find_snapshot_repo ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
|
# File 'lib/elastic_manager/request.rb', line 180
# Returns the name of the first snapshot repository listed by the cluster.
#
# Terminates the process with status 1 when the listing does not return 200.
#
# @return [Object] first key of the parsed '/_snapshot/' reply
def find_snapshot_repo
  reply = request(:get, '/_snapshot/')
  return json_parse(reply).keys.first if reply.code == 200

  log.fatal "dunno what to do with: #{reply.code} - #{reply}"
  exit 1
end
|
#get_all_indices ⇒ Object
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
# File 'lib/elastic_manager/request.rb', line 152
# Fetches the state and required box_type of every index from the cluster
# state metadata.
#
# Terminates the process with status 1 when the request does not return 200.
#
# @return [Object] the ['metadata']['indices'] part of the parsed reply
def get_all_indices
  # filter_path limits the reply to the two fields listed here.
  wanted_fields = %w[
    metadata.indices.*.state
    metadata.indices.*.settings.index.routing.allocation.require.box_type
  ]
  reply = request(:get, "/_cluster/state/metadata/?filter_path=#{wanted_fields.join(',')}")
  return json_parse(reply)['metadata']['indices'] if reply.code == 200

  log.fatal "can't work with all_indices response was: #{reply.code} - #{reply}"
  exit 1
end
|
#get_all_snapshots ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/elastic_manager/request.rb', line 110
# Lists the snapshots of the repository returned by #find_snapshot_repo.
#
# Terminates the process with status 1 when the request does not return 200.
#
# @return [Object] the parsed reply of '/_cat/snapshots/<repo>'
def get_all_snapshots
  reply = request(:get, "/_cat/snapshots/#{find_snapshot_repo}")
  return json_parse(reply) if reply.code == 200

  log.fatal "can't work with all_snapshots response was: #{reply.code} - #{reply}"
  exit 1
end
|
#green? ⇒ Boolean
66
67
68
69
70
71
|
# File 'lib/elastic_manager/request.rb', line 66
# Tells whether the cluster health status is 'green'.
#
# @return [Boolean] true when '/_cluster/health' returns 200 and the parsed
#   'status' equals 'green'; false otherwise
def green?
  health = request(:get, '/_cluster/health')
  health.code == 200 && json_parse(health)['status'] == 'green'
end
|
#index_box_type(index) ⇒ Object
286
287
288
289
290
291
292
293
294
295
296
297
298
|
# File 'lib/elastic_manager/request.rb', line 286
# Reads the required allocation box_type of an index from its settings.
#
# @param index [String] index name; also used as the key into the parsed reply
# @return [Object, nil] the box_type value, or nil when the request does not
#   return 200 (logged as fatal)
def index_box_type(index)
  reply = request(:get, "/#{index}/_settings/index.routing.allocation.require.box_type")
  unless reply.code == 200
    log.fatal "can't check box_type for #{index}, response was: #{reply.code} - #{reply}"
    return nil
  end

  settings = json_parse(reply)[index]['settings']
  current_box = settings['index']['routing']['allocation']['require']['box_type']
  log.debug "for #{index} box_type is #{current_box}"
  current_box
end
|
#open_index(index) ⇒ Object
251
252
253
254
255
256
257
258
259
260
261
262
|
# File 'lib/elastic_manager/request.rb', line 251
# Opens an index.
#
# @param index [String] index name, interpolated into the request path
# @return [Boolean] true only when the reply is 200 and 'acknowledged' is
#   literally true; false otherwise (a non-200 reply is logged as fatal)
def open_index(index)
  reply = request(:post, "/#{index}/_open?master_timeout=1m")
  unless reply.code == 200
    log.fatal "wrong response code for #{index} open"
    return false
  end

  json_parse(reply)['acknowledged'].is_a?(TrueClass)
end
|
#override_daysago(index_name, config, daysago) ⇒ Object
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/elastic_manager/request.rb', line 73
# Resolves the "days ago" threshold for one index.
#
# A per-index, per-task value found at
# config[index_name]['daysago'][config['task']] wins when it is present and
# its string form is not empty; otherwise the supplied default is used.
#
# @param index_name [String] key of the per-index section in +config+
# @param config [Hash] configuration; config['task'] names the current task
# @param daysago [#to_i] fallback value
# @return [Integer] the effective threshold
def override_daysago(index_name, config, daysago)
  index_section = config[index_name]
  per_task = index_section && index_section['daysago']
  override = per_task && per_task[config['task']]
  return daysago.to_i if !override || override.to_s.empty?

  override.to_i
end
|
#request(method, url, body = {}) ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/elastic_manager/request.rb', line 49
# Sends one HTTP request to Elasticsearch through #with_retry.
#
# A 503 reply raises Request::Throttling and any other server-error status
# raises Request::ServerError; both are raised inside the #with_retry block.
#
# @param method [Symbol] HTTP verb passed to the client
# @param url [String] path appended to the configured base URL
# @param body [Hash] payload sent as JSON
# @return [Object] the client response when no error was raised
def request(method, url, body={})
  target = @url + url
  log.debug "uri: #{target}"
  with_retry do
    @client.request(method, target, json: body).tap do |reply|
      raise Request::Throttling.new(reply) if reply.code == 503
      raise Request::ServerError.new(reply) if reply.status.server_error?
    end
  end
end
|
#restore_snapshot(index, box_type) ⇒ Object
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
|
# File 'lib/elastic_manager/request.rb', line 210
# Restores the snapshot of an index and waits for recovery to finish.
#
# The snapshot named "snapshot_<index>" is looked up in the repository
# returned by #find_snapshot_repo. The restore sets zero replicas, a
# refresh interval of -1 and the given box_type on the index.
# Terminates the process with status 1 when the restore request does not
# return 200.
#
# @param index [String] index to restore
# @param box_type [String] value for index.routing.allocation.require.box_type
# @return [Object] the result of #wait_snapshot_restore
def restore_snapshot(index, box_type)
  snapshot_name = "snapshot_#{index}"
  repo = find_snapshot_repo
  found = find_snapshot(repo, snapshot_name)
  restore_options = {
    index_settings: {
      'index.number_of_replicas' => 0,
      'index.refresh_interval' => -1,
      'index.routing.allocation.require.box_type' => box_type
    }
  }

  reply = request(:post, "/_snapshot/#{repo}/#{found}/_restore", restore_options)
  unless reply.code == 200
    log.fatal "can't restore snapshot #{snapshot_name} response was: #{reply.code} - #{reply}"
    exit 1
  end

  # Fixed pause before the first recovery poll.
  sleep 5
  wait_snapshot_restore(index)
end
|
#snapshot_exist?(snapshot_name, repo) ⇒ Boolean
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/elastic_manager/request.rb', line 167
# Tells whether a snapshot exists in a repository.
#
# Terminates the process with status 1 on any reply other than 200 or 404.
#
# @param snapshot_name [String] snapshot to look up
# @param repo [String] snapshot repository name
# @return [Boolean] true on a 200 reply, false on a 404 reply
def snapshot_exist?(snapshot_name, repo)
  reply = request(:get, "/_snapshot/#{repo}/#{snapshot_name}/")
  return true if reply.code == 200
  return false if reply.code == 404

  log.fatal "can't check snapshot existing, response was: #{reply.code} - #{reply}"
  exit 1
end
|
#snapshot_index(index) ⇒ Object
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
|
# File 'lib/elastic_manager/request.rb', line 363
# Starts a snapshot of one index and waits until it completes.
#
# The snapshot is named "snapshot_<index>" and is created in the repository
# returned by #find_snapshot_repo, without global state and without
# tolerating unavailable or partial indices.
#
# @param index [String] index to snapshot
# @return [Object] the result of #wait_snapshot on a 200 reply; false
#   otherwise (logged as error)
def snapshot_index(index)
  snapshot_name = "snapshot_#{index}"
  repo = find_snapshot_repo
  snapshot_options = {
    'indices' => index,
    'ignore_unavailable' => false,
    'include_global_state' => false,
    'partial' => false
  }

  reply = request(:put, "/_snapshot/#{repo}/#{snapshot_name}/", snapshot_options)
  unless reply.code == 200
    log.error "can't snapshot #{index}, response was: #{reply.code} - #{reply}"
    return false
  end

  # Fixed pause before the first status poll.
  sleep 5
  wait_snapshot(snapshot_name, repo)
end
|
#wait_snapshot(snapshot, repo) ⇒ Object
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
|
# File 'lib/elastic_manager/request.rb', line 334
# Polls a snapshot's status until it reaches state 'SUCCESS'.
#
# Sleeps @sleep seconds before every poll. A non-200 status reply is logged
# and polled again; there is no upper bound on the number of polls.
# Terminates the process with status 1 when the state is FAILED, PARTIAL or
# INCOMPATIBLE.
#
# @param snapshot [String] snapshot name
# @param repo [String] snapshot repository name
# @return [true] once the snapshot state is 'SUCCESS'
def wait_snapshot(snapshot, repo)
  loop do
    sleep @sleep
    reply = request(:get, "/_snapshot/#{repo}/#{snapshot}/_status")
    if reply.code != 200
      log.error "can't check snapshot: #{reply.code} - #{reply}"
      next
    end

    state = json_parse(reply)['snapshots'][0]['state']
    log.debug "snapshot check response: #{reply.code} - #{reply}"
    break if state == 'SUCCESS'
    # Any state other than the terminal failures means "still running".
    next unless %w[FAILED PARTIAL INCOMPATIBLE].include?(state)

    log.fatal "can't snapshot #{snapshot} in #{repo}: #{reply.code} - #{reply}"
    exit 1
  end
  true
end
|
#wait_snapshot_restore(index) ⇒ Object
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
|
# File 'lib/elastic_manager/request.rb', line 233
# Polls the recovery of an index until every shard reports stage 'DONE'.
#
# Sleeps @sleep / 2 seconds before every poll. A non-200 reply is logged
# and polled again; there is no upper bound on the number of polls.
#
# @param index [String] index name; also used as the key into the parsed reply
# @return [true] once all shards are in stage 'DONE'
def wait_snapshot_restore(index)
  loop do
    sleep @sleep / 2
    reply = request(:get, "/#{index}/_recovery")
    if reply.code == 200
      shards = json_parse(reply)[index]['shards']
      break if shards.all? { |shard| shard['stage'] == 'DONE' }
    else
      log.error "can't check recovery: #{reply.code} - #{reply}"
    end
  end
  true
end
|
#with_retry ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/elastic_manager/request.rb', line 36
# Runs the given block, retrying it when it raises one of RETRY_ERRORS.
#
# The block is attempted at most @retry times, and always at least once.
# After every failure the method logs a warning and sleeps @sleep seconds.
# When no attempts are left it logs the backtrace and terminates the
# process with status 1.
#
# @yield the operation to attempt
# @return [Object] the block's value from the first successful attempt
def with_retry
  # `retry` re-runs the method body; ||= keeps the counter across re-runs.
  tries ||= @retry
  yield
rescue *RETRY_ERRORS => e
  tries -= 1
  # Clamp so a retry setting of 0 (or less) never reports a negative count.
  log.warn "tries left #{[tries, 0].max} '''#{e.message}''' sleeping #{@sleep} sec..."
  sleep @sleep
  # positive? rather than "unless zero?": with a retry setting of 0 the
  # counter goes straight to -1 and a zero check would retry forever.
  retry if tries.positive?
  log.fatal "backtrace:\n\t#{e.backtrace.join("\n\t")}"
  exit 1
end
|