Module: Chewy::Search::Scrolling

Included in:
Request
Defined in:
lib/chewy/search/scrolling.rb

Overview

This module contains the batch request DSL built on the ES scroll API. All the methods are optimized for memory consumption and cache nothing, so use them for single-run work over a huge number of documents. Remember to tune the scroll parameter for long-running actions. All the scroll methods respect the limit value if one is provided.

Instance Method Details

#scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object
#scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. The size of each request is overridden by batch_size; a limit value, if set on the scope, still caps the total number of hits yielded. There are 2 possible use cases: with a block or without.

Overloads:

  • #scroll_batches(batch_size: 1000, scroll: '1m') {|batch| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_batches { |batch| batch.each { |hit| p hit['_id'] } }

    Yield Parameters:

    • batch (Array<Hash>)

      block is executed for each batch of hits

  • #scroll_batches(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard Ruby Enumerator.

    Examples:

    PlaceIndex.scroll_batches.flat_map { |batch| batch.map { |hit| hit['_id'] } }

    Returns:

    • (Enumerator)

      a standard Ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: Request::DEFAULT_BATCH_SIZE)

    number of hits fetched per request; replaces the size query parameter

  • scroll (String) (defaults to: Request::DEFAULT_SCROLL)

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 27

def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL)
  return enum_for(:scroll_batches, batch_size: batch_size, scroll: scroll) unless block_given?

  result = perform(size: batch_size, scroll: scroll)
  total = [raw_limit_value, result.fetch('hits', {}).fetch('total', 0)].compact.min
  last_batch_size = total % batch_size
  fetched = 0

  loop do
    hits = result.fetch('hits', {}).fetch('hits', [])
    fetched += hits.size
    hits = hits.first(last_batch_size) if last_batch_size != 0 && fetched >= total
    yield(hits) if hits.present?
    break if fetched >= total
    scroll_id = result['_scroll_id']
    result = perform_scroll(scroll: scroll, scroll_id: scroll_id)
  end
end
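
To make the limit handling in the listing above concrete, here is a minimal, self-contained sketch that replays the same arithmetic against an in-memory counter instead of Elasticsearch. The helper name simulated_batch_sizes and its keyword arguments are hypothetical and not part of Chewy; the sketch only reports the size of each batch that would be yielded, assuming the server always returns full pages until the documents run out.

```ruby
# Hypothetical helper, not part of Chewy: returns the sizes of the batches
# a scroll over `total_hits` documents would yield for a given limit.
def simulated_batch_sizes(total_hits:, batch_size:, limit: nil)
  # The effective total is the smaller of the limit (if any) and the hit count.
  total = [limit, total_hits].compact.min
  last_batch_size = total % batch_size
  fetched = 0
  remaining = total_hits
  sizes = []

  loop do
    # Assumption: each page holds up to batch_size documents.
    page = [batch_size, remaining].min
    remaining -= page
    fetched += page
    # Trim the final page so the overall total is not exceeded.
    page = [page, last_batch_size].min if last_batch_size != 0 && fetched >= total
    sizes << page if page > 0
    break if fetched >= total
  end

  sizes
end

simulated_batch_sizes(total_hits: 2500, batch_size: 1000)              # => [1000, 1000, 500]
simulated_batch_sizes(total_hits: 2500, batch_size: 1000, limit: 1200) # => [1000, 200]
```

With a limit of 1200 the second page is still fetched in full, but only its first 200 hits are yielded, which is how the limit is respected without changing the per-request size.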

#scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object
#scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit separately.

Overloads:

  • #scroll_hits(batch_size: 1000, scroll: '1m') {|hit| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_hits { |hit| p hit['_id'] }

    Yield Parameters:

    • hit (Hash)

      block is executed for each hit

  • #scroll_hits(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard Ruby Enumerator.

    Examples:

    PlaceIndex.scroll_hits.map { |hit| hit['_id'] }

    Returns:

    • (Enumerator)

      a standard Ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits fetched per request; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 61

def scroll_hits(**options)
  return enum_for(:scroll_hits, **options) unless block_given?

  scroll_batches(**options).each do |batch|
    batch.each { |hit| yield hit }
  end
end
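
The block-or-Enumerator pattern used in the listing above can be tried in isolation. The sketch below is a minimal illustration with a hypothetical FakeScope class that holds pre-built batches in memory; it is not Chewy code and makes no network calls.

```ruby
# Hypothetical in-memory stand-in for a scope; not part of Chewy.
class FakeScope
  def initialize(batches)
    @batches = batches
  end

  # Yields each batch, or returns an Enumerator when no block is given.
  def each_batch
    return enum_for(:each_batch) unless block_given?

    @batches.each { |batch| yield batch }
  end

  # Flattens batches into individual hits, with the same dual behavior.
  def each_hit
    return enum_for(:each_hit) unless block_given?

    each_batch { |batch| batch.each { |hit| yield hit } }
  end
end

scope = FakeScope.new([[{ '_id' => '1' }, { '_id' => '2' }], [{ '_id' => '3' }]])

# Block form: the block runs once per hit.
ids = []
scope.each_hit { |hit| ids << hit['_id'] }

# Enumerator form: standard Enumerable methods become available.
first_two = scope.each_hit.first(2).map { |hit| hit['_id'] }
```

Because enum_for builds the Enumerator lazily, calling first(2) stops iterating once two hits have been produced instead of walking every batch.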

#scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object
#scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator
Also known as: scroll_records, scroll_documents

Note:

If a record is not found, nil is yielded in its place.

Iterates through the documents of the scope in batches. Performs a load operation for each batch and then yields each loaded ORM/ODM object. Uses the options passed to Request#load for loading.

Overloads:

  • #scroll_objects(batch_size: 1000, scroll: '1m') {|record| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_objects { |record| p record.id }

    Yield Parameters:

    • record (Object)

      block is executed for each record loaded

  • #scroll_objects(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard Ruby Enumerator.

    Examples:

    PlaceIndex.scroll_objects.map { |record| record.id }

    Returns:

    • (Enumerator)

      a standard Ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits fetched per request; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 113

def scroll_objects(**options)
  return enum_for(:scroll_objects, **options) unless block_given?

  except(:source, :stored_fields, :script_fields, :docvalue_fields)
    .source(false).scroll_batches(**options).each do |batch|
      loader.load(batch).each { |object| yield object }
    end
end
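
Since a missing record is yielded as nil, callers usually need to guard against it. The sketch below illustrates that behavior with a hypothetical load_batch helper and a plain Hash standing in for the database; it is an assumption-laden stand-in for the per-batch load step, not Chewy's actual loader.

```ruby
# Hypothetical record type and lookup table; not part of Chewy.
Record = Struct.new(:id, :name)

# Maps each hit to its record by '_id', keeping nil where no record exists
# so that the result has one entry per hit.
def load_batch(hits, records_by_id)
  hits.map { |hit| records_by_id[hit['_id']] }
end

records_by_id = {
  '1' => Record.new('1', 'Paris'),
  '3' => Record.new('3', 'Oslo')
}
batch = [{ '_id' => '1' }, { '_id' => '2' }, { '_id' => '3' }]

loaded = load_batch(batch, records_by_id)

# Drop the nil placeholders before calling methods on the records.
names = loaded.compact.map(&:name) # => ["Paris", "Oslo"]
```

In the same spirit, a block passed to scroll_objects can skip nil values (for example with `next if record.nil?`) when the index may contain documents whose source records have been deleted.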

#scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object
#scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

Iterates through the documents of the scope in batches. Yields each hit wrapped in a Chewy::Type object derived from the hit's index and type.

Overloads:

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') {|object| ... } ⇒ Object

    Examples:

    PlaceIndex.scroll_wrappers { |object| p object.id }

    Yield Parameters:

    • object (Chewy::Type)

      block is executed for each hit object

  • #scroll_wrappers(batch_size: 1000, scroll: '1m') ⇒ Enumerator

    Returns a standard Ruby Enumerator.

    Examples:

    PlaceIndex.scroll_wrappers.map { |object| object.id }

    Returns:

    • (Enumerator)

      a standard Ruby Enumerator

Parameters:

  • batch_size (Integer) (defaults to: 1000)

    number of hits fetched per request; replaces the size query parameter

  • scroll (String) (defaults to: '1m')

    cursor expiration time



# File 'lib/chewy/search/scrolling.rb', line 85

def scroll_wrappers(**options)
  return enum_for(:scroll_wrappers, **options) unless block_given?

  scroll_hits(**options).each do |hit|
    yield loader.derive_type(hit['_index'], hit['_type']).build(hit)
  end
end