Class: Metacrunch::Elasticsearch::Searcher

Inherits:
Processor
  • Object
show all
Includes:
Enumerable, ClientFactory, OptionsHelpers
Defined in:
lib/metacrunch/elasticsearch/searcher.rb

Constant Summary collapse

DEFAULT_BODY =
{ query: { match_all: {} } }
DEFAULT_SCAN_SIZE =

per shard

200
DEFAULT_SCROLL_EXPIRY_TIME =
10.minutes

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from OptionsHelpers

#extract_options!, #normalize_options!

Methods included from ClientFactory

#client_factory

Constructor Details

#initialize(options = {}) ⇒ Searcher

Returns a new instance of Searcher.



22
23
24
25
26
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 22

# Sets up the searcher from an options hash.
#
# The hash is symbolized in place (the caller's hash is mutated), the
# searcher-specific keys are handed to extract_options!, and whatever is
# left in +options+ afterwards is kept as the search body. A blank
# remainder falls back to DEFAULT_BODY.
#
# NOTE(review): extract_options! comes from OptionsHelpers; presumably it
# removes the listed keys from +options+ and stores them in the matching
# instance variables -- confirm against that module.
#
# @param options [Hash] searcher settings mixed with the search body
def initialize(options = {})
  options.deep_symbolize_keys!
  extract_options!(
    options,
    :_client_options_,
    :bulk_size,
    :index,
    :scan_size,
    :scroll_expiry_time,
    :type
  )

  remaining_body = options.presence
  @body = remaining_body || DEFAULT_BODY
end

Instance Attribute Details

#bulk_size ⇒ Object

Returns the value of attribute bulk_size.



16
17
18
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 16

# Reader for @bulk_size.
#
# #call uses this value as the slice size for each_slice; when it is nil,
# #call delivers every document in a single batch instead.
# Presumably populated from the :bulk_size option in #initialize -- confirm
# against OptionsHelpers#extract_options!.
#
# @return [Object] the current value of @bulk_size
def bulk_size
  @bulk_size
end

#index ⇒ Object

Returns the value of attribute index.



17
18
19
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 17

# Reader for @index.
#
# #each passes this value as the `index:` argument of the search request.
# Presumably populated from the :index option in #initialize -- confirm
# against OptionsHelpers#extract_options!.
#
# @return [Object] the current value of @index
def index
  @index
end

#scan_size ⇒ Object

Returns the value of attribute scan_size.



18
19
20
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 18

# Reader for @scan_size.
#
# #each sends this value as the `size:` of the search request and falls
# back to DEFAULT_SCAN_SIZE when it is nil.
# Presumably populated from the :scan_size option in #initialize -- confirm
# against OptionsHelpers#extract_options!.
#
# @return [Object] the current value of @scan_size
def scan_size
  @scan_size
end

#scroll_expiry_time ⇒ Object

Returns the value of attribute scroll_expiry_time.



19
20
21
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 19

# Reader for @scroll_expiry_time.
#
# #each interpolates this value, suffixed with "s", into the `scroll:`
# parameter of the search request, using DEFAULT_SCROLL_EXPIRY_TIME when it
# is nil.
# Presumably populated from the :scroll_expiry_time option in #initialize --
# confirm against OptionsHelpers#extract_options!.
#
# @return [Object] the current value of @scroll_expiry_time
def scroll_expiry_time
  @scroll_expiry_time
end

#type ⇒ Object

Returns the value of attribute type.



20
21
22
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 20

# Reader for @type.
#
# NOTE(review): none of the methods visible in this section read @type
# (#each does not send it with the search request) -- confirm whether that
# is intentional.
# Presumably populated from the :type option in #initialize -- confirm
# against OptionsHelpers#extract_options!.
#
# @return [Object] the current value of @type
def type
  @type
end

Instance Method Details

#call(items = [], pipeline = nil) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 28

# Appends the next batch of documents to +items+.
#
# On first use an enumerator over the search results is memoized in
# @docs_enumerator: slices of @bulk_size documents when a bulk size is set,
# otherwise one single batch holding every document. Each invocation pulls
# the next batch from it. Once the enumerator is exhausted, the pipeline
# (when one was given) is told to terminate.
#
# @param items [Array] collection the batch is appended to (mutated in place)
# @param pipeline [#terminate!, nil] notified when no batches remain
# @return [Object] +items+ after appending; when exhausted, the result of
#   pipeline.terminate!, or nil if no pipeline was given
def call(items = [], pipeline = nil)
  @docs_enumerator ||= @bulk_size ? each_slice(@bulk_size) : [each.to_a].to_enum

  begin
    items.concat(@docs_enumerator.next)
  rescue StopIteration
    # pipeline defaults to nil, so only signal termination when one exists;
    # calling terminate! on nil used to raise NoMethodError here.
    pipeline.terminate! if pipeline
  end
end

#each ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/metacrunch/elasticsearch/searcher.rb', line 38

# Yields every hit of the configured search, one at a time.
#
# Opens a scrolling search (search_type "scan") on the client returned by
# client_factory, then repeatedly calls client.scroll with the most recent
# "_scroll_id" and yields each entry of result["hits"]["hits"]. The response
# of the initial search call is only used for its scroll id. Iteration ends
# when a scroll call returns nothing or a page without hits.
#
# The same scroll keep-alive (@scroll_expiry_time, falling back to
# DEFAULT_SCROLL_EXPIRY_TIME) is sent with the initial search and with every
# follow-up scroll request, so a configured expiry is honoured throughout.
#
# NOTE(review): @type is not sent with the search request -- confirm whether
# that is intentional.
#
# @yield [hit] each element of the "hits" array of every scroll response
# @return [Enumerator] when called without a block
# @return [nil] after all hits have been yielded
def each
  return enum_for(__method__) unless block_given?

  client = client_factory
  scroll_keep_alive = "#{@scroll_expiry_time || DEFAULT_SCROLL_EXPIRY_TIME}s"

  search_result = client.search({
    body: @body,
    index: @index,
    scroll: scroll_keep_alive,
    search_type: "scan",
    size: @scan_size || DEFAULT_SCAN_SIZE
  })

  # Every scroll response carries the id to use for the next request.
  while (search_result = client.scroll(scroll: scroll_keep_alive, scroll_id: search_result["_scroll_id"]))
    hits = search_result["hits"]["hits"]
    break unless hits.present?

    hits.each { |hit| yield hit }
  end
end