Module: Elasticsearch::Model::Importing::ClassMethods

Defined in:
lib/elasticsearch/model/importing.rb

Instance Method Summary

Instance Method Details

#__batch_to_bulk(batch, transform) ⇒ Object



# File 'lib/elasticsearch/model/importing.rb', line 143

def __batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end
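
As a rough illustration of what this mapping produces, the sketch below applies a transform lambda to a batch of stand-in records. `FakeArticle` and the top-level `batch_to_bulk` helper are hypothetical names used only so the snippet runs without a database or the library; the helper simply mirrors the one-line method body above.

```ruby
# Hypothetical stand-in for a model instance; not part of elasticsearch-model.
FakeArticle = Struct.new(:id, :title) do
  def as_indexed_json
    { 'title' => title }
  end
end

# Mirrors the method body above: one bulk action per record in the batch.
def batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end

# A transform is any object responding to `call` that turns one record
# into one bulk action.
transform = lambda do |article|
  { index: { _id: article.id, data: article.as_indexed_json } }
end

batch = [FakeArticle.new(1, 'First'), FakeArticle.new(2, 'Second')]

batch_to_bulk(batch, transform).each { |action| puts action.inspect }
```

The resulting array is what `import` passes as the `body` of the bulk request.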

#import(options = {}, &block) {|Hash| ... } ⇒ Fixnum

Import all model records into the index

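The method picks the correct strategy based on the `Importing` module defined in the corresponding adapter.

To update each batch before it is imported, pass the name of a class method as the `preprocess` option:

Article.import preprocess: :enrich

To get the failed items back instead of the error count, pass `return: 'errors'`:

Article.import return: 'errors'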

Examples:

Import all records into the index


Article.import

Set the batch size to 100


Article.import batch_size: 100

Process the response from Elasticsearch


Article.import do |response|
  puts "Got " + response['items'].select { |i| i['index']['error'] }.size.to_s + " errors"
end
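
To make the error check in the block above concrete, here is a minimal sketch that runs the same kind of selection against a hand-written hash shaped like a bulk response. `SAMPLE_RESPONSE` and `failed_items` are hypothetical names, and the values are illustrative rather than real Elasticsearch output.

```ruby
# Hand-written sample shaped like a bulk response; values are illustrative.
SAMPLE_RESPONSE = {
  'items' => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400,
                   'error' => { 'type' => 'mapper_parsing_exception' } } }
  ]
}.freeze

# Each item is a one-key hash (the action name); the item failed when
# the payload under that key carries an 'error' entry.
def failed_items(response)
  response['items'].select { |item| item.values.first['error'] }
end

puts "Got #{failed_items(SAMPLE_RESPONSE).size} errors"
```

Looking up `item.values.first` instead of `item['index']` keeps the check independent of the action name, which is also how the `import` source listed further down collects errors.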

Delete and create the index with appropriate settings and mappings


Article.import force: true

Refresh the index after importing all batches


Article.import refresh: true

Import the records into a different index/type than the default one


Article.import index: 'my-new-index', type: 'my-other-type'

Pass an ActiveRecord scope to limit the imported records


Article.import scope: 'published'

Pass an ActiveRecord query to limit the imported records


Article.import query: -> { where(author_id: author_id) }

Transform records during the import with a lambda


transform = lambda do |a|
  {index: {_id: a.id, _parent: a.author_id, data: a.__elasticsearch__.as_indexed_json}}
end

Article.import transform: transform

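Update the batch before yielding it (used with `preprocess: :enrich`)


class Article
  # ...
  def self.enrich(batch)
    batch.each do |item|
      # `metadata` and `get_metadata` are placeholder names for illustration
      item.metadata = MyAPI.get_metadata(item.id)
    end
    batch
  end
end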
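
The following is a small, self-contained sketch of how a preprocessing method of this kind behaves on a batch. `Item`, `FakeAPI`, its `lookup` method, and the `metadata` attribute are all hypothetical stand-ins; the real method would live on the model class and call whatever service the application uses.

```ruby
# Hypothetical record and external service, used only for illustration.
Item = Struct.new(:id, :metadata)

module FakeAPI
  def self.lookup(id)
    { 'source' => "record-#{id}" }
  end
end

# Mutates every record in the batch, then returns the batch itself,
# as the `enrich` example above does.
def enrich(batch)
  batch.each { |item| item.metadata = FakeAPI.lookup(item.id) }
  batch
end

enriched = enrich([Item.new(1), Item.new(2)])
enriched.each { |item| puts "#{item.id}: #{item.metadata}" }
```

Note that the method ends by returning `batch`, just like the original example; a method that ended with an assignment or a `puts` would return something other than the records.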

Return an array of error elements instead of the number of errors (`return: 'errors'`), e.g. to try importing these records again
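
As one possible way to act on the returned array, the sketch below pulls the document ids out of failed items. `SAMPLE_ERRORS` is a hand-written stand-in for what `Article.import return: 'errors'` might return, and `failed_ids` is a hypothetical helper, not a library method.

```ruby
# Hand-written stand-in for the array returned with `return: 'errors'`;
# each element has the shape of one failed item from a bulk response.
SAMPLE_ERRORS = [
  { 'index' => { '_id' => '2', 'status' => 400,
                 'error' => { 'type' => 'mapper_parsing_exception' } } },
  { 'index' => { '_id' => '5', 'status' => 400,
                 'error' => { 'type' => 'mapper_parsing_exception' } } }
].freeze

# Collects the document id from each failed item, whatever the action name.
def failed_ids(errors)
  errors.map { |item| item.values.first['_id'] }
end

puts failed_ids(SAMPLE_ERRORS).inspect
```

Assuming an ActiveRecord model, the collected ids could then be fed back through the `query` option shown earlier, for instance with a lambda along the lines of `-> { where(id: ids) }`.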

Parameters:

  • options (Hash) (defaults to: {})

    Options passed to the underlying `__find_in_batches` method

  • block (Proc)

    Optional block to evaluate for each batch

Yields:

  • (Hash)

    Gives the Hash with the Elasticsearch response to the block

Returns:

  • (Fixnum)

    Number of errors encountered during importing



# File 'lib/elasticsearch/model/importing.rb', line 102

def import(options={}, &block)
  errors       = []
  refresh      = options.delete(:refresh)   || false
  target_index = options.delete(:index)     || index_name
  target_type  = options.delete(:type)      || document_type
  transform    = options.delete(:transform) || __transform
  return_value = options.delete(:return)    || 'count'

  unless transform.respond_to?(:call)
    raise ArgumentError,
          "Pass an object responding to `call` as the :transform option, #{transform.class} given"
  end

  if options.delete(:force)
    self.create_index! force: true, index: target_index
  elsif !self.index_exists? index: target_index
    raise ArgumentError,
          "#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
  end

  __find_in_batches(options) do |batch|
    response = client.bulk \
                 index:   target_index,
                 type:    target_type,
                 body:    __batch_to_bulk(batch, transform)

    yield response if block_given?

    errors +=  response['items'].select { |k, v| k.values.first['error'] }
  end

  self.refresh_index! index: target_index if refresh

  case return_value
    when 'errors'
      errors
    else
      errors.size
  end
end
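
To see how the batch loop and the `:return` option interact without a running cluster, here is a simplified simulation. `FakeClient` and `simulate_import` are hypothetical: the client fabricates a bulk-shaped response that fails every odd id, and the loop is a stripped-down model of the method above rather than the library's implementation.

```ruby
# Hypothetical client: fabricates a bulk-shaped response in which every
# document with an odd id fails.
class FakeClient
  def bulk(index:, body:)
    items = body.map do |action|
      id = action[:index][:_id]
      result = { '_id' => id.to_s, 'status' => id.odd? ? 400 : 201 }
      result['error'] = { 'type' => 'illustrative_failure' } if id.odd?
      { 'index' => result }
    end
    { 'items' => items }
  end
end

# Simplified model of the loop: send each batch, yield the response,
# accumulate failed items, then return the items or their count.
def simulate_import(batches, client:, return_value: 'count')
  errors = []
  batches.each do |batch|
    body = batch.map { |id| { index: { _id: id } } }
    response = client.bulk(index: 'articles', body: body)
    yield response if block_given?
    errors += response['items'].select { |item| item.values.first['error'] }
  end
  return_value == 'errors' ? errors : errors.size
end

batches = [[1, 2], [3, 4]]
puts simulate_import(batches, client: FakeClient.new)
puts simulate_import(batches, client: FakeClient.new, return_value: 'errors').inspect
```

Errors accumulate across all batches, so the count or array returned at the end covers the whole import rather than only the last bulk request.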