Module: Caoutsearch::Search::Batch::Scroll

Included in:
Caoutsearch::Search::Base
Defined in:
lib/caoutsearch/search/batch/scroll.rb

Instance Method Summary

Instance Method Details

#clear_scroll(scroll_id) ⇒ Object



79
80
81
82
83
# File 'lib/caoutsearch/search/batch/scroll.rb', line 79

# Asks Elasticsearch to release the scroll context identified by +scroll_id+.
#
# A NotFound error is deliberately ignored: the scroll ID has already
# expired (or was already cleared), so there is nothing left to release.
# Any other error propagates to the caller.
#
# @param scroll_id [String] scroll identifier returned by a search/scroll response
# @return [Object, nil] whatever +client.clear_scroll+ returns, or nil when
#   the scroll ID was not found
def clear_scroll(scroll_id)
  begin
    client.clear_scroll(scroll_id: scroll_id)
  rescue ::Elastic::Transport::Transport::Errors::NotFound
    # Already expired or cleared: not an error for a cleanup call.
    nil
  end
end

#raise_enhance_message_when_scroll_failed(error, scroll, requested_at, last_response_time) ⇒ Object



85
86
87
88
89
# File 'lib/caoutsearch/search/batch/scroll.rb', line 85

# Re-raises +error+ with a message reporting how much time passed between the
# previous scroll response and the failing request. This helps diagnose
# scroll contexts that expired because a batch took longer than +scroll+.
#
# @param error [Exception] the error raised by the scroll request
# @param scroll [String] keep-alive duration that was requested for the scroll
# @param requested_at [Time] when the failing request was sent
# @param last_response_time [Time] when the previous response was received
# @raise [Exception] always; a copy of +error+ (same class, via
#   Exception#exception) carrying the enriched message
def raise_enhance_message_when_scroll_failed(error, scroll, requested_at, last_response_time)
  gap = requested_at - last_response_time
  duration = gap.round(1).seconds
  message = "Scroll registered for #{scroll}, #{duration.inspect} elapsed between. #{error.message}"

  raise error.exception(message)
end

#scroll(scroll: "1m", batch_size: 1000, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/caoutsearch/search/batch/scroll.rb', line 7

# Iterates over every document matching the current search through the
# Elasticsearch scroll API and yields the raw hits batch by batch.
#
# The scroll context opened by the initial search is always released in the
# +ensure+ clause, including when everything fits in the first batch or the
# search matches nothing.
#
# @param scroll [String] keep-alive duration sent with every request (e.g. "1m")
# @param batch_size [Integer] number of hits requested per batch
# @yieldparam hits [Array<Hash>] the hits of the current batch
# @return [Integer] the total hit count reported by the initial search
# @raise [Elastic::Transport::Transport::Errors::NotFound] when a follow-up
#   scroll request fails with NotFound; the message is enriched with the
#   time elapsed since the previous response
def scroll(scroll: "1m", batch_size: 1000, &block)
  search = per(batch_size).track_total_hits

  request_payload = {
    index: index_name,
    scroll: scroll,
    body: search.build.to_h
  }

  total = 0
  progress = 0
  requested_at = nil
  last_response_time = nil
  scroll_id = nil

  results = instrument(:scroll_search) do |event_payload|
    response = client.search(request_payload)
    last_response_time = Time.current

    total = response["hits"]["total"]["value"]
    progress += response["hits"]["hits"].size

    event_payload[:request] = request_payload
    event_payload[:response] = response
    event_payload[:total] = total
    event_payload[:progress] = progress

    response
  end

  # The initial search already opened a scroll context: capture its ID before
  # any early return so the `ensure` clause can release it.
  scroll_id = results["_scroll_id"]

  hits = results["hits"]["hits"]
  return total if hits.empty?

  yield hits
  return total if progress >= total

  loop do
    # Elasticsearch may hand back a new _scroll_id with any response and only
    # the most recent one is valid, so the payload is rebuilt every iteration.
    request_payload = {
      scroll_id: scroll_id,
      scroll: scroll
    }
    requested_at = Time.current

    results = instrument(:scroll, scroll: scroll_id) do |event_payload|
      response = client.scroll(request_payload)
      last_response_time = Time.current

      progress += response["hits"]["hits"].size

      event_payload[:request] = request_payload
      event_payload[:response] = response
      event_payload[:total] = total
      event_payload[:progress] = progress

      response
    rescue Elastic::Transport::Transport::Errors::NotFound => e
      raise_enhance_message_when_scroll_failed(e, scroll, requested_at, last_response_time)
    end

    # Keep the previous ID if a response ever omits it.
    scroll_id = results["_scroll_id"] || scroll_id

    hits = results["hits"]["hits"]
    break if hits.empty?

    yield hits
    break if progress >= total
  end

  total
ensure
  clear_scroll(scroll_id) if scroll_id
end