Class: Es2Bulk

Inherits:
Object
  • Object
show all
Defined in:
lib/es2bulk/es2bulk.rb,
lib/es2bulk/version.rb

Defined Under Namespace

Classes: Error, EsConnectionError, EsResponseError

Constant Summary collapse

RETRIEVE_SIZE =
3000
VERSION =
'0.1.0'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(index_pattern, host = 'localhost', port = 9200, without_id) ⇒ Es2Bulk

Returns a new instance of Es2Bulk.



12
13
14
15
16
17
# File 'lib/es2bulk/es2bulk.rb', line 12

# Builds an exporter; this only records its arguments, no connection is made here.
#
# @param index_pattern [String] index name or pattern interpolated into the
#   `/<index_pattern>/_search` path by #generate
# @param host [String] stored as @host (defaults to 'localhost')
# @param port [Integer] stored as @port (defaults to 9200)
# @param without_id [Boolean] when truthy, #generate leaves `_id` out of the
#   metadata it yields
def initialize(index_pattern, host = 'localhost', port = 9200, without_id)
  # The two required arguments are the first and last positions.
  @index_pattern, @without_id = index_pattern, without_id
  # The optional pair in the middle.
  @host, @port = host, port
end

Instance Method Details

#generate ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/es2bulk/es2bulk.rb', line 19

# Streams every hit of a match_all search as bulk-style (metadata, source) pairs.
#
# Posts to `/<index_pattern>/_search` with hits sorted by `_uid`, asking for
# RETRIEVE_SIZE hits per request, and passes the last hit's `sort` value as
# `search_after` to fetch the following page.
#
# @yieldparam meta [Hash] `{ index: { _index:, _type: } }`, plus the routing
#   key when the hit carries one, plus `_id` unless @without_id is truthy
# @yieldparam body [Hash] the hit's `_source` with its keys sorted
# @return [Enumerator] when called without a block
# @return [nil] after the last page has been yielded
# @raise [EsResponseError] when a parsed response contains an `error` key
def generate
  return enum_for(:generate) unless block_given?

  path = "/#{@index_pattern}/_search"
  search_after = nil
  total = 0
  connect
  # Compare as versions, not as strings: '10.0.0' >= '6.0.0' is false lexically.
  modern = Gem::Version.new(get_version) >= Gem::Version.new('6.0.0')
  routing_field = modern ? :routing : :_routing
  loop do
    query = { query: { match_all: {} }, sort: [:_uid], size: RETRIEVE_SIZE }
    query[:search_after] = search_after if search_after
    req = Net::HTTP::Post.new(path, 'Content-Type' => 'application/json')
    req.body = query.to_json
    response = request(req)
    result = JSON.parse(response.body, symbolize_names: true)
    raise EsResponseError, result[:error] if result[:error]

    page = result[:hits][:hits]
    # An empty page has no last hit to resume from; stop even if the reported
    # total has not been reached (e.g. the count went stale mid-scan).
    break if page.empty?

    page.each do |hit|
      meta = { index: { _index: hit[:_index], _type: hit[:_type] } }
      meta[:index][routing_field] = hit[routing_field] if hit[routing_field]
      meta[:index][:_id] = hit[:_id] unless @without_id
      yield meta, hit[:_source].sort.to_h
    end
    total += page.size
    # NOTE(review): this comparison assumes hits.total is a plain number;
    # newer Elasticsearch releases may report it differently - confirm the
    # supported server versions.
    break if result[:hits][:total] <= total

    search_after = page.last[:sort]
  end
end