Module: Caoutsearch::Search::Batch::SearchAfter

Included in:
Caoutsearch::Search::Base
Defined in:
lib/caoutsearch/search/batch/search_after.rb

Instance Method Summary

Instance Method Details

#search_after(pit: nil, keep_alive: nil, batch_size: 1000, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/caoutsearch/search/batch/search_after.rb', line 7

# Iterates over every hit of the current search in batches, paginating with a
# point in time (PIT) and the `search_after` request parameter.
#
# When no `pit` is given, one is opened with `open_point_in_time` and closed
# again in the `ensure` clause. A `pit` supplied by the caller is never closed
# here.
#
# @param pit [Object, nil] identifier of an existing PIT to reuse; when nil a
#   new one is opened for the duration of the call
# @param keep_alive [Object, nil] sent as `pit.keep_alive` with every request;
#   defaults to "1m" when nil
# @param batch_size [Integer] passed to `per` to size each request
# @yield [hits] the `response["hits"]["hits"]` collection of each non-empty page
# @note A `NotFound` transport error is handed to
#   `raise_enhance_message_on_missing_pit` or
#   `raise_enhance_message_on_pit_failure`; presumably both re-raise with a
#   more helpful message (they are defined outside this listing).
def search_after(pit: nil, keep_alive: nil, batch_size: 1000, &block)
  if pit
    # Remember that the PIT belongs to the caller so it is not closed below.
    external_pit = true

    warn(<<~MESSAGE) if keep_alive.nil?
      A `pit` was passed to batch records without a `keep_alive` argument.
      You may need it to extend the PIT on each request.
    MESSAGE
  end

  keep_alive ||= "1m"
  pit ||= open_point_in_time(keep_alive: keep_alive)
  search = per(batch_size).track_total_hits

  request_payload = {
    body: search.build.to_h.merge(
      pit: {
        id: pit,
        keep_alive: keep_alive
      }
    )
  }
  # Only apply the default sort when the search did not define its own.
  request_payload[:body][:sort] ||= [:_shard_doc]

  total = nil
  progress = 0
  # Declared outside the loop so the rescue clause in the block can read them.
  requested_at = nil
  last_response_time = Time.current

  loop do
    requested_at = Time.current

    results = instrument(:search_after, pit: pit) do |event_payload|
      response = client.search(request_payload)
      last_response_time = Time.current

      # The total is taken from the first response only; later requests no
      # longer ask for it (see the `delete(:track_total_hits)` below).
      total ||= response["hits"]["total"]["value"]
      progress += response["hits"]["hits"].size

      event_payload[:request] = request_payload
      event_payload[:response] = response
      event_payload[:total] = total
      event_payload[:progress] = progress

      response
    rescue Elastic::Transport::Transport::Errors::NotFound => e
      # A failure before any hit was received on a caller-supplied PIT is
      # reported as a missing PIT; anything else as a PIT failure.
      if external_pit && progress.zero?
        raise_enhance_message_on_missing_pit(e)
      else
        raise_enhance_message_on_pit_failure(e, keep_alive, requested_at, last_response_time)
      end
    end

    hits = results["hits"]["hits"]
    # Each response carries the PIT id to use for the next request (and for
    # the final close).
    pit = results["pit_id"]
    break if hits.empty?

    yield hits
    break if progress >= total

    request_payload[:body].tap do |body|
      body[:pit][:id] = pit
      body[:search_after] = hits.last["sort"]
      body.delete(:track_total_hits)
    end
  end
ensure
  close_point_in_time(pit) if pit && !external_pit
end