Module: Elasticsearch::Model::Importing::ClassMethods

Defined in:
lib/elasticsearch/model/importing.rb

Instance Method Details

#__batch_to_bulk(batch, transform) ⇒ Object



# File 'lib/elasticsearch/model/importing.rb', line 111

def __batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end
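
To make the shape of the result concrete, here is a minimal sketch that runs the same `map` over a batch of stand-in records. `Record`, `batch_to_bulk`, and `sample_transform` are hypothetical names used only for this illustration; the real default transform comes from the adapter's `__transform`.

```ruby
# Hypothetical stand-in for a model record; not part of elasticsearch-model.
Record = Struct.new(:id, :title)

# Same logic as __batch_to_bulk: apply the transform to every record.
def batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end

# Assumed transform that emits one bulk "index" action per record.
sample_transform = lambda do |record|
  { index: { _id: record.id, data: { title: record.title } } }
end

batch     = [Record.new(1, 'One'), Record.new(2, 'Two')]
bulk_body = batch_to_bulk(batch, sample_transform)
```

Each element of `bulk_body` is one action Hash, which is the form passed as `body:` to the bulk call in `import`.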

#import(options = {}, &block) {|Hash| ... } ⇒ Fixnum

Import all model records into the index

The method picks the correct strategy based on the `Importing` module defined in the corresponding adapter.

Examples:

Import all records into the index


Article.import

Set the batch size to 100


Article.import batch_size: 100

Process the response from Elasticsearch


Article.import do |response|
  puts "Got " + response['items'].select { |i| i['index']['error'] }.size.to_s + " errors"
end

Delete and create the index with appropriate settings and mappings


Article.import force: true

Refresh the index after importing all batches


Article.import refresh: true

Import the records into a different index/type than the default one


Article.import index: 'my-new-index', type: 'my-other-type'

Pass an ActiveRecord scope to limit the imported records


Article.import scope: 'published'

Transform records during the import with a lambda


transform = lambda do |a|
  {index: {_id: a.id, _parent: a.author_id, data: a.__elasticsearch__.as_indexed_json}}
end

Article.import transform: transform

Parameters:

  • options (Hash) (defaults to: {})

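    Import options. `:refresh`, `:index`, `:type`, `:transform` and `:force` are consumed by `import` itself; the remaining options are passed to the underlying `__find_in_batches` method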

  • block (Proc)

    Optional block to evaluate for each batch

Yields:

  • (Hash)

    Gives the Hash with the Elasticsearch response to the block

Returns:

  • (Fixnum)

    Number of errors encountered during importing
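
As a rough illustration of how the error count is derived, the sketch below applies the same counting to a hand-written Hash shaped like a bulk response. The sample values (`_id`, `status`, the error string) are made up for the example and are not real output.

```ruby
# Hand-written sample shaped like a bulk response; values are illustrative only.
response = {
  'items' => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400, 'error' => 'MapperParsingException' } }
  ]
}

# Each item is a one-key Hash keyed by the action name, so take its first value
# and keep only the entries that carry an 'error'.
failed      = response['items'].map { |item| item.values.first['error'] }.compact
error_count = failed.length
```

With this sample, only the second item carries an `'error'` key, so it is the only one counted.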



# File 'lib/elasticsearch/model/importing.rb', line 79

def import(options={}, &block)
  errors       = 0
  refresh      = options.delete(:refresh)   || false
  target_index = options.delete(:index)     || index_name
  target_type  = options.delete(:type)      || document_type
  transform    = options.delete(:transform) || __transform

  unless transform.respond_to?(:call)
    raise ArgumentError,
          "Pass an object responding to `call` as the :transform option, #{transform.class} given"
  end

  if options.delete(:force)
    self.create_index! force: true, index: target_index
  end

  __find_in_batches(options) do |batch|
    response = client.bulk \
                 index:   target_index,
                 type:    target_type,
                 body:    __batch_to_bulk(batch, transform)

    yield response if block_given?

    errors += response['items'].map { |item| item.values.first['error'] }.compact.length
  end

  self.refresh_index! if refresh

  return errors
end
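
The batch loop above can be exercised without a running cluster. The following is only a sketch under stated assumptions: `FakeClient` and `import_sketch` are hypothetical, `each_slice` stands in for `__find_in_batches`, and the fake client marks odd ids as failed purely to produce some errors.

```ruby
# Hypothetical client that mimics only the `bulk` call used by `import`.
class FakeClient
  def bulk(index:, type:, body:)
    items = body.map do |action|
      id    = action[:index][:_id]
      entry = { '_id' => id }
      entry['error'] = 'simulated failure' if id.odd? # assumption: odd ids fail
      { 'index' => entry }
    end
    { 'items' => items }
  end
end

# Simplified version of the import loop: batch, bulk, yield, count errors.
def import_sketch(records, client:, index:, type:, batch_size: 2)
  errors    = 0
  transform = ->(r) { { index: { _id: r[:id], data: r } } }

  records.each_slice(batch_size) do |batch|
    response = client.bulk(index: index, type: type, body: batch.map(&transform))
    yield response if block_given?
    errors += response['items'].map { |item| item.values.first['error'] }.compact.length
  end

  errors
end

records      = (1..5).map { |i| { id: i, title: "Article #{i}" } }
batches_seen = 0
error_total  = import_sketch(records, client: FakeClient.new,
                             index: 'articles', type: 'article') do |_response|
  batches_seen += 1
end
```

The block runs once per batch, and the return value is the sum of failed items across all batches, which mirrors how `import` reports errors.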