Module: Chewy::Search::Scrolling

Included in:
Request
Defined in:
lib/chewy/search/scrolling.rb

Overview

This module provides a DSL for batch requests via the Elasticsearch scroll API. All methods are optimized for memory consumption and cache nothing, so use them for single-run processing of a huge number of documents. Remember to tune the scroll parameter for long-running actions. All scroll methods respect the limit value if one is provided.

Instance Method Details

#scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object
#scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. The size query parameter is overridden by batch_size, while a limit set on the scope caps the total number of hits yielded. There are two possible use cases: with a block or without.

Overloads:

  • #scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_batches { |batch| batch.each { |hit| p hit['_id'] } }

    Yield Parameters:

    • batch (Array<Hash>)

      block is executed for each batch of hits

  • #scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_batches.flat_map { |batch| batch.map { |hit| hit['_id'] } }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: Request::DEFAULT_BATCH_SIZE)

    number of hits per batch; replaces the size query parameter

  • scroll (String) (defaults to: Request::DEFAULT_SCROLL)

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 27

def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL)
  return enum_for(:scroll_batches, batch_size: batch_size, scroll: scroll) unless block_given?

  result = perform(size: batch_size, scroll: scroll)
  total = [raw_limit_value, result.fetch('hits', {}).fetch('total', {}).fetch('value', 0)].compact.min
  last_batch_size = total % batch_size
  fetched = 0
  scroll_id = nil

  loop do
    hits = result.fetch('hits', {}).fetch('hits', [])
    fetched += hits.size
    hits = hits.first(last_batch_size) if last_batch_size != 0 && fetched >= total
    yield(hits) if hits.present?
    scroll_id = result['_scroll_id']
    break if fetched >= total

    result = perform_scroll(scroll: scroll, scroll_id: scroll_id)
  end
ensure
  Chewy.client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
end
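
The limit handling in the listing above can be hard to follow at a glance. Below is a minimal sketch of the same arithmetic, run against in-memory data instead of Elasticsearch. The `ScrollSketch` module and its `batches` method are hypothetical names made up for this illustration; they are not part of Chewy.

```ruby
# Hypothetical stand-in for the scroll loop: slices of `docs` play the role
# of successive scroll responses, each holding up to batch_size hits.
module ScrollSketch
  def self.batches(docs, batch_size:, limit: nil)
    return enum_for(:batches, docs, batch_size: batch_size, limit: limit) unless block_given?

    total = [limit, docs.size].compact.min # the limit caps the total, if given
    last_batch_size = total % batch_size   # size of a trailing partial batch
    fetched = 0

    docs.each_slice(batch_size) do |hits|
      fetched += hits.size
      # Trim the final page so that no more than `total` hits are yielded.
      hits = hits.first(last_batch_size) if last_batch_size != 0 && fetched >= total
      yield hits unless hits.empty?
      break if fetched >= total
    end
  end
end

sizes = ScrollSketch.batches((1..25).to_a, batch_size: 10, limit: 23).map(&:size)
p sizes # => [10, 10, 3]
```

With 25 documents, a batch size of 10 and a limit of 23, the third page is trimmed from 5 hits to 3, and the loop stops without requesting another page.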

#scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object
#scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit separately.

Overloads:

  • #scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_hits { |hit| p hit['_id'] }

    Yield Parameters:

    • hit (Hash)

      block is executed for each hit

  • #scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_hits.map { |hit| hit['_id'] }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits per batch; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 65

def scroll_hits(**options, &block)
  return enum_for(:scroll_hits, **options) unless block_given?

  scroll_batches(**options).each do |batch|
    batch.each(&block)
  end
end
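
The block-or-Enumerator pattern used by scroll_hits can be sketched without a cluster. The `FakeScope` class below is a hypothetical stand-in whose `BATCHES` constant replaces real scroll responses; only the shape of the two methods mirrors the listing above. The counter is an addition for illustration, to show that batches are consumed on demand.

```ruby
# Hypothetical stand-in for a scope; BATCHES replaces real scroll responses.
class FakeScope
  BATCHES = [[{ '_id' => '1' }, { '_id' => '2' }], [{ '_id' => '3' }]].freeze

  attr_reader :batches_served

  def initialize
    @batches_served = 0
  end

  def scroll_batches
    return enum_for(:scroll_batches) unless block_given?

    BATCHES.each do |batch|
      @batches_served += 1 # count how many batches were actually requested
      yield batch
    end
  end

  # Flattens batches into single hits; returns an Enumerator without a block.
  def scroll_hits(&block)
    return enum_for(:scroll_hits) unless block_given?

    scroll_batches.each { |batch| batch.each(&block) }
  end
end

ids = FakeScope.new.scroll_hits.map { |hit| hit['_id'] }

lazy_scope = FakeScope.new
first_hit = lazy_scope.scroll_hits.first(1)
```

Because nothing is cached, taking only the first hit from the Enumerator stops the iteration after the first batch in this sketch, so the second batch is never requested.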

#scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object
#scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Also known as: scroll_records, scroll_documents

Note:

If the record is not found it yields nil instead.

Iterates through the documents of the scope in batches. Performs a load operation for each batch and then yields each loaded ORM/ODM object. Uses the options passed to Request#load for loading.

Overloads:

  • #scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_objects { |record| p record.id }

    Yield Parameters:

    • record (Object)

      block is executed for each record loaded

  • #scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_objects.map { |record| record.id }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits per batch; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 117

def scroll_objects(**options, &block)
  return enum_for(:scroll_objects, **options) unless block_given?

  except(:source, :stored_fields, :script_fields, :docvalue_fields)
    .source(false).scroll_batches(**options).each do |batch|
      loader.load(batch).each(&block)
    end
end
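
Since a missing record is yielded as nil, callers usually need a guard. The sketch below imitates the per-batch load step with an in-memory hash. `SketchRecord`, `SKETCH_DATABASE` and `SketchLoader` are hypothetical names invented for this example; the real loader works against the ORM/ODM configured for the index.

```ruby
SketchRecord = Struct.new(:id, :name)

# Hypothetical data store; the record with id 2 is deliberately missing.
SKETCH_DATABASE = {
  1 => SketchRecord.new(1, 'Rome'),
  3 => SketchRecord.new(3, 'Oslo')
}.freeze

# Hypothetical loader: one lookup per batch, preserving the order of hits.
module SketchLoader
  def self.load(hits)
    ids = hits.map { |hit| hit['_id'].to_i }
    SKETCH_DATABASE.values_at(*ids) # nil wherever no record exists
  end
end

batches = [[{ '_id' => '1' }, { '_id' => '2' }], [{ '_id' => '3' }]]
records = batches.flat_map { |batch| SketchLoader.load(batch) }

# Guard against nil before touching the record.
names = records.map { |record| record&.name }
present_names = records.compact.map(&:name)
```

In a block passed to scroll_objects the same guard would typically be written as `next unless record` on the first line of the block.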

#scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object
#scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit wrapped in an object built by the index class derived from the hit's _index value.

Overloads:

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_wrappers { |object| p object.id }

    Yield Parameters:

    • object (Chewy::Index)

      block is executed for each hit object

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard ruby Enumerator.

    Examples:

    PlaceIndex.scroll_wrappers.map { |object| object.id }

    Returns:

    • (Enumerator)

      a standard ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits per batch; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 89

def scroll_wrappers(**options)
  return enum_for(:scroll_wrappers, **options) unless block_given?

  scroll_hits(**options).each do |hit|
    yield loader.derive_index(hit['_index']).build(hit)
  end
end
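
The wrapping step dispatches on each hit's `_index` value. As a rough illustration of that idea only, the sketch below maps index names to plain Struct classes. `PlaceWrapper`, `CityWrapper`, `WRAPPER_BY_INDEX` and `WrapperSketch` are hypothetical; Chewy derives the index class itself and calls build on it, as the listing above shows.

```ruby
# Hypothetical wrapper classes, one per index name.
PlaceWrapper = Struct.new(:id, :attributes)
CityWrapper  = Struct.new(:id, :attributes)

WRAPPER_BY_INDEX = { 'places' => PlaceWrapper, 'cities' => CityWrapper }.freeze

module WrapperSketch
  # Picks the wrapper class from the hit's _index and builds an object from it.
  def self.wrap(hit)
    wrapper_class = WRAPPER_BY_INDEX.fetch(hit['_index'])
    wrapper_class.new(hit['_id'], hit.fetch('_source', {}))
  end
end

hits = [
  { '_index' => 'places', '_id' => '1', '_source' => { 'name' => 'Colosseum' } },
  { '_index' => 'cities', '_id' => '7', '_source' => { 'name' => 'Rome' } }
]
wrapped = hits.map { |hit| WrapperSketch.wrap(hit) }
```

Hits from different indexes in the same scope end up as objects of different classes, which is why the wrapper class is looked up per hit rather than once per request.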