Module: Elasticsearch::Model::Extensions::BatchUpdating::ClassMethods

Defined in:
lib/elasticsearch/model/extensions/batch_updating.rb

Instance Method Summary

Instance Method Details

#for_batch_indexing ⇒ Object



# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 63

def for_batch_indexing
  with_indexed_tables_included.extending(::Elasticsearch::Model::Extensions::BatchUpdating::Association::Extension)
end
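
A minimal usage sketch, assuming a hypothetical Article model that includes this extension; the returned relation is extended with the batch-updating association helpers and can be chained like any ActiveRecord relation (published is an assumed column used only for illustration):

scope = Article.for_batch_indexing
# Narrow the scope before reindexing it in batches
scope.where(published: true).update_index_in_batches(batch_size: 100)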

#for_indexing ⇒ Object



# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 59

def for_indexing
  for_batch_indexing
end

#split_ids_into(chunk_num, min: nil, max: nil) ⇒ Object



# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 30

def split_ids_into(chunk_num, min:nil, max:nil)
  min ||= minimum(:id)
  max ||= maximum(:id)
  chunk_num.times.inject([]) do |r,i|
    chunk_size = ((max-min+1)/chunk_num.to_f).ceil
    first = chunk_size * i

    last = if i == chunk_num - 1
             max
           else
             chunk_size * (i + 1) - 1
           end

    r << (first..last)
  end
end
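
A worked sketch of the range computation, assuming a hypothetical Article model whose ids run from 0 to 99:

Article.split_ids_into(4, min: 0, max: 99)
# chunk_size = ((99 - 0 + 1) / 4.0).ceil = 25
# => [0..24, 25..49, 50..74, 75..99]

Note that the lower bounds are computed as chunk_size * i, so they count up from zero rather than from min; min and max only affect the chunk size, and the final range always ends at max.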

#update_index_in_batch(records, index: nil, type: nil, client: nil) ⇒ Object

Parameters:

  • records (Array)


# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 79

def update_index_in_batch(records, index: nil, type: nil, client: nil)
  klass = self

  client ||= klass.__elasticsearch__.client
  index ||= klass.index_name
  type ||= klass.document_type

  if records.size > 1
    response = client.bulk \
                   index:   index,
                   type:    type,
                   body:    records.map { |r| { index: { _id: r.id, data: r.as_indexed_json } } }

    one_or_more_errors_occurred = response["errors"]

    if one_or_more_errors_occurred
      if defined? ::Rails
        ::Rails.logger.warn "One or more error(s) occurred while updating the index #{records} for the type #{type}\n#{JSON.pretty_generate(response)}"
      else
        warn "One or more error(s) occurred while updating the index #{records} for the type #{type}\n#{JSON.pretty_generate(response)}"
      end
    end
  else
    records.each do |r|
      client.index index: index, type: type, id: r.id, body: r.as_indexed_json
    end
  end
end
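
A minimal sketch of calling the bulk helper directly, assuming a hypothetical Article model; index, type and client fall back to the model's Elasticsearch settings when omitted:

records = Article.where(id: 1..500).to_a
Article.update_index_in_batch(records)

# Or with explicit settings, e.g. when writing to a separate index
# ('articles_v2' and the host below are assumptions for illustration):
Article.update_index_in_batch(
  records,
  index:  'articles_v2',
  client: Elasticsearch::Client.new(host: 'localhost:9200')
)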

#update_index_in_batches(batch_size: DEFAULT_BATCH_SIZE, where: nil, index: nil, type: nil) ⇒ Object

Parameters:

  • batch_size (Fixnum) (defaults to: DEFAULT_BATCH_SIZE)


# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 68

def update_index_in_batches(batch_size: DEFAULT_BATCH_SIZE, where: nil, index: nil, type: nil)
  records_in_scope = if where.nil?
                       for_batch_indexing
                     else
                       for_batch_indexing.where(where)
                     end

  records_in_scope.update_index_in_batches(batch_size: batch_size, index: index, type: type)
end
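
A usage sketch for the same hypothetical Article model; the optional where clause narrows the scope before it is handed to the relation-level update_index_in_batches provided by the association extension:

Article.update_index_in_batches
Article.update_index_in_batches(batch_size: 500, where: { published: true })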

#update_index_in_parallel(parallelism:, index: nil, type: nil, min: nil, max: nil, batch_size: DEFAULT_BATCH_SIZE) ⇒ Object



# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 47

def update_index_in_parallel(parallelism:, index: nil, type: nil, min: nil, max: nil, batch_size:DEFAULT_BATCH_SIZE)
  klass = self

  Parallel.each(klass.split_ids_into(parallelism, min: min, max: max), in_processes: parallelism) do |id_range|
    @rdb_reconnected ||= klass.connection.reconnect! || true
    @elasticsearch_reconnected ||= klass.__elasticsearch__.client = Elasticsearch::Client.new(host: klass.elasticsearch_hosts)
    klass.for_indexing.update_index_for_ids_in_range id_range, index: index, type: type, batch_size: batch_size
  end

  klass.connection.reconnect!
end
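
A usage sketch, assuming the Parallel gem is loaded and the hypothetical Article model responds to elasticsearch_hosts; each worker process reconnects its database connection and builds a fresh Elasticsearch client before indexing its id range:

Article.update_index_in_parallel(parallelism: 4)
Article.update_index_in_parallel(parallelism: 8, batch_size: 200, min: 1, max: 1_000_000)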