Module: Elasticsearch::Model::Extensions::BatchUpdating::ClassMethods
- Defined in:
- lib/elasticsearch/model/extensions/batch_updating.rb
Instance Method Summary collapse
- #for_batch_indexing ⇒ Object
- #for_indexing ⇒ Object
- #split_ids_into(chunk_num, min: nil, max: nil) ⇒ Object
- #update_index_in_batch(records, index: nil, type: nil, client: nil) ⇒ Object
- #update_index_in_batches(batch_size: DEFAULT_BATCH_SIZE, where: nil, index: nil, type: nil) ⇒ Object
- #update_index_in_parallel(parallelism:, index: nil, type: nil, min: nil, max: nil, batch_size: DEFAULT_BATCH_SIZE) ⇒ Object
Instance Method Details
#for_batch_indexing ⇒ Object
63 64 65 |
# Scope used for bulk (re)indexing: eager-includes the tables needed to
# build documents and mixes in the batch-update association helpers.
def for_batch_indexing
  scope = with_indexed_tables_included
  scope.extending(::Elasticsearch::Model::Extensions::BatchUpdating::Association::Extension)
end
#for_indexing ⇒ Object
59 60 61 |
# Indexing uses exactly the same scope as batch indexing; this method
# exists as the more general-purpose entry point.
def for_indexing
  for_batch_indexing
end
#split_ids_into(chunk_num, min: nil, max: nil) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# Splits the id space into +chunk_num+ contiguous ranges covering
# min..max, suitable for handing one range to each parallel worker.
#
# @param chunk_num [Integer] number of ranges to produce (>= 1)
# @param min [Integer, nil] lowest id to cover; defaults to minimum(:id)
# @param max [Integer, nil] highest id to cover; defaults to maximum(:id)
# @return [Array<Range>] +chunk_num+ ranges; the last is capped at +max+
def split_ids_into(chunk_num, min: nil, max: nil)
  min ||= minimum(:id)
  max ||= maximum(:id)
  # Loop-invariant; previously recomputed on every iteration.
  chunk_size = ((max - min + 1) / chunk_num.to_f).ceil
  chunk_num.times.map do |i|
    # BUGFIX: ranges used to start at 0 instead of +min+. For tables whose
    # ids do not start near 0, the early chunks covered no rows and the
    # final chunk received nearly all the work, defeating the parallel split.
    first = min + chunk_size * i
    last  = i == chunk_num - 1 ? max : min + chunk_size * (i + 1) - 1
    (first..last)
  end
end
#update_index_in_batch(records, index: nil, type: nil, client: nil) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# Writes +records+ to Elasticsearch: the bulk API when there is more than
# one record, a single index call otherwise (zero records is a no-op).
#
# @param records [Enumerable<#id, #as_indexed_json>] records to index
# @param index [String, nil] target index; defaults to the model's index_name
# @param type [String, nil] document type; defaults to the model's document_type
# @param client [Object, nil] ES client; defaults to the model's client
def update_index_in_batch(records, index: nil, type: nil, client: nil)
  klass = self
  client ||= klass.__elasticsearch__.client
  index ||= klass.index_name
  type ||= klass.document_type

  if records.size > 1
    response = client.bulk \
      index: index,
      type: type,
      body: records.map { |r| { index: { _id: r.id, data: r.as_indexed_json } } }

    # The bulk API responds 200 even when individual operations fail;
    # per-item failures are only surfaced through the "errors" flag.
    if response["errors"]
      message = "One or more error(s) occurred while updating the index #{records} for the type #{type}\n#{JSON.pretty_generate(response)}"
      # BUGFIX: Rails.logger can be nil (e.g. early in boot) even when the
      # Rails constant is defined; fall back to Kernel#warn in that case.
      if defined?(::Rails) && ::Rails.logger
        ::Rails.logger.warn message
      else
        warn message
      end
    end
  else
    records.each do |r|
      client.index index: index, type: type, id: r.id, body: r.as_indexed_json
    end
  end
end
#update_index_in_batches(batch_size: DEFAULT_BATCH_SIZE, where: nil, index: nil, type: nil) ⇒ Object
68 69 70 71 72 73 74 75 76 |
# Pushes every record matched by +where+ (or all records when +where+ is
# nil) to the index, +batch_size+ records at a time, by delegating to the
# relation-level update_index_in_batches.
def update_index_in_batches(batch_size: DEFAULT_BATCH_SIZE, where: nil, index: nil, type: nil)
  scope = for_batch_indexing
  scope = scope.where(where) unless where.nil?
  scope.update_index_in_batches(batch_size: batch_size, index: index, type: type)
end
#update_index_in_parallel(parallelism:, index: nil, type: nil, min: nil, max: nil, batch_size: DEFAULT_BATCH_SIZE) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/elasticsearch/model/extensions/batch_updating.rb', line 47 def update_index_in_parallel(parallelism:, index: nil, type: nil, min: nil, max: nil, batch_size:DEFAULT_BATCH_SIZE) klass = self Parallel.each(klass.split_ids_into(parallelism, min: min, max: max), in_processes: parallelism) do |id_range| @rdb_reconnected ||= klass.connection.reconnect! || true @elasticsearch_reconnected ||= klass.__elasticsearch__.client = Elasticsearch::Client.new(host: klass.elasticsearch_hosts) klass.for_indexing.update_index_for_ids_in_range id_range, index: index, type: type, batch_size: batch_size end klass.connection.reconnect! end |