7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/caoutsearch/search/batch/scroll.rb', line 7
# Iterates over every hit of the current search using the Elasticsearch
# scroll API, yielding the raw hits one page at a time.
#
# The initial request is a regular search with a `scroll` keep-alive; the
# following pages are fetched with `client.scroll`, always using the most
# recently returned scroll id. Whatever happens (normal end, early exit or
# exception), the scroll context is released through `clear_scroll`.
#
# @param scroll [String] keep-alive duration sent with every request (e.g. "1m")
# @param batch_size [Integer] number of hits requested per page
# @yieldparam hits [Array<Hash>] raw hits of one page
# @return [Integer] the total hit count reported by the initial search
# @raise whatever `raise_enhance_message_when_scroll_failed` raises when a
#   scroll request answers "not found" (e.g. an expired scroll context)
def scroll(scroll: "1m", batch_size: 1000, &block)
  search = per(batch_size).track_total_hits
  request_payload = {
    index: index_name,
    scroll: scroll,
    body: search.build.to_h
  }

  total = 0
  progress = 0
  requested_at = nil
  last_response_time = nil

  results = instrument(:scroll_search) do |event_payload|
    response = client.search(request_payload)
    last_response_time = Time.current
    total = response["hits"]["total"]["value"]
    progress += response["hits"]["hits"].size

    event_payload[:request] = request_payload
    event_payload[:response] = response
    event_payload[:total] = total
    event_payload[:progress] = progress
    response
  end

  # The initial search already opened a scroll context: capture its id before
  # any early return so the `ensure` clause always releases it.
  scroll_id = results["_scroll_id"]

  hits = results["hits"]["hits"]
  return total if hits.empty?

  yield hits
  return total if progress >= total

  loop do
    # Rebuilt on every iteration so each request carries the latest scroll id.
    request_payload = {
      scroll_id: scroll_id,
      scroll: scroll
    }
    requested_at = Time.current

    results = instrument(:scroll, scroll: scroll_id) do |event_payload|
      response = client.scroll(request_payload)
      last_response_time = Time.current
      progress += response["hits"]["hits"].size

      event_payload[:request] = request_payload
      event_payload[:response] = response
      event_payload[:total] = total
      event_payload[:progress] = progress
      response
    rescue Elastic::Transport::Transport::Errors::NotFound => e
      raise_enhance_message_when_scroll_failed(e, scroll, requested_at, last_response_time)
    end

    # Elasticsearch may return a different scroll id on any response; only the
    # most recently received one is valid for the next request and for clearing.
    scroll_id = results["_scroll_id"] || scroll_id

    hits = results["hits"]["hits"]
    break if hits.empty?

    yield hits
    break if progress >= total
  end

  total
ensure
  clear_scroll(scroll_id) if scroll_id
end
|