7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/caoutsearch/search/batch/search_after.rb', line 7
# Walks through the hits of the current search page by page, using
# `search_after` pagination bound to a point in time (PIT).
#
# When no `pit` is given, one is opened with `open_point_in_time` and closed
# again once iteration ends (normally or through an exception). A PIT supplied
# by the caller is never closed here.
#
# @param pit [String, nil] id of an already opened PIT, or nil to open one
# @param keep_alive [String, nil] keep-alive sent along with the PIT on each
#   request; defaults to "1m". A warning is emitted when `pit` is given
#   without it.
# @param batch_size [Integer] page size handed to `per`
# @yield [hits] the `response["hits"]["hits"]` array of each non-empty page
# @raise whatever `raise_enhance_message_on_missing_pit` or
#   `raise_enhance_message_on_pit_failure` raise when the client reports
#   `Elastic::Transport::Transport::Errors::NotFound`
def search_after(pit: nil, keep_alive: nil, batch_size: 1000, &block)
  if pit
    external_pit = true

    warn(<<~MESSAGE) if keep_alive.nil?
      A `pit` was passed to batch records without a `keep_alive` argument.
      You may need it to extend the PIT on each request.
    MESSAGE
  end

  keep_alive ||= "1m"
  pit ||= open_point_in_time(keep_alive: keep_alive)
  search = per(batch_size).track_total_hits

  request_payload = {
    body: search.build.to_h.merge(
      pit: {
        id: pit,
        keep_alive: keep_alive
      }
    )
  }
  # Fall back to the shard-doc sort when the search defines no sort of its own.
  request_payload[:body][:sort] ||= [:_shard_doc]

  total = nil
  progress = 0
  requested_at = nil
  last_response_time = Time.current

  loop do
    requested_at = Time.current

    results = instrument(:search_after, pit: pit) do |event_payload|
      response = client.search(request_payload)
      last_response_time = Time.current

      # The total is only read from the first response; later requests drop
      # :track_total_hits (see the end of the loop).
      total ||= response["hits"]["total"]["value"]
      progress += response["hits"]["hits"].size

      event_payload[:request] = request_payload
      event_payload[:response] = response
      event_payload[:total] = total
      event_payload[:progress] = progress

      response
    rescue Elastic::Transport::Transport::Errors::NotFound => e
      # A caller-supplied PIT failing before any hit was read is reported as
      # a missing PIT; every other case is reported as a PIT failure.
      if external_pit && progress.zero?
        raise_enhance_message_on_missing_pit(e)
      else
        raise_enhance_message_on_pit_failure(e, keep_alive, requested_at, last_response_time)
      end
    end

    hits = results["hits"]["hits"]
    # Keep the PIT id returned by the response for the next request and for cleanup.
    pit = results["pit_id"]
    break if hits.empty?

    yield hits
    break if progress >= total

    # Prepare the next page: resume after the last hit's sort values.
    request_payload[:body].tap do |body|
      body[:pit][:id] = pit
      body[:search_after] = hits.last["sort"]
      body.delete(:track_total_hits)
    end
  end
ensure
  # Only close a PIT that this method opened itself.
  close_point_in_time(pit) if pit && !external_pit
end
|