Class: Elasticsearch::Extensions::Reindex::Reindex
- Inherits: Object
- Defined in: lib/elasticsearch/extensions/reindex.rb
Overview
Copy documents from one index into another
The reindexing process works by “scrolling” the source index and sending each batch of documents to the target index/cluster via the “Bulk” API.
Note that if you want to change the target index settings and/or mappings, you have to do so in advance by using the “Indices Create” API.
Note also that Elasticsearch 2.3.x and higher versions provide a native “Reindex” API, which will be more performant than the Ruby version.
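The scroll-and-bulk flow described above can be sketched without a running cluster. The `InMemoryIndex` class and the `copy_in_batches` method below are hypothetical stand-ins written for illustration and are not part of the library; they only mimic paging through a source and writing each page to a target in one call.

```ruby
# Hypothetical in-memory stand-in for an index; not part of the library.
class InMemoryIndex
  attr_reader :docs

  def initialize(docs = [])
    @docs = docs
  end

  # Returns the batch starting at +offset+, like one page of a scroll.
  def page(offset, size)
    @docs[offset, size] || []
  end

  # Appends a whole batch at once, like a single bulk request.
  def bulk(batch)
    @docs.concat(batch)
  end
end

# Copies every document from +source+ to +target+ in batches of +batch_size+.
# Returns the number of batches written.
def copy_in_batches(source, target, batch_size)
  offset = 0
  batches = 0
  loop do
    batch = source.page(offset, batch_size)
    break if batch.empty? # an empty page means the source is exhausted

    target.bulk(batch)
    offset += batch.size
    batches += 1
  end
  batches
end

source = InMemoryIndex.new((1..5).map { |i| { 'id' => i } })
target = InMemoryIndex.new
batches = copy_in_batches(source, target, 2)
puts "copied #{target.docs.size} documents in #{batches} batches"
# prints "copied 5 documents in 3 batches"
```

The real class follows the same shape, with the `batch_size` and `scroll` arguments controlling the page size and how long each scroll context is kept alive.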
Instance Attribute Summary
- #arguments ⇒ Object (readonly)
  Returns the value of attribute arguments.
Instance Method Summary
- #__store_batch(documents) ⇒ Object
- #initialize(arguments = {}) ⇒ Reindex (constructor)
  A new instance of Reindex.
- #perform ⇒ Hash
  Performs the operation.
Constructor Details
#initialize(arguments = {}) ⇒ Reindex
Returns a new instance of Reindex.
# File 'lib/elasticsearch/extensions/reindex.rb', line 97

def initialize(arguments={})
  [
    [:source, :index],
    [:source, :client],
    [:target, :index]
  ].each do |required_option|
    value = required_option.reduce(arguments) { |sum, o| sum = sum[o] ? sum[o] : {} }

    raise ArgumentError,
          "Required argument '#{Hash[*required_option]}' missing" if \
          value.respond_to?(:empty?) ? value.empty? : value.nil?
  end

  @arguments = {
    batch_size: 1000,
    scroll: '5m',
    refresh: false
  }.merge(arguments)

  arguments[:target][:client] ||= arguments[:source][:client]
end
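The constructor's two steps, checking the nested required options and merging defaults, can be tried in isolation. The following is a minimal sketch, not the library's code: `missing_options`, `REQUIRED_OPTIONS`, and `DEFAULTS` are hypothetical names, the lookup uses `Hash#dig` instead of `reduce`, and the helper returns the missing paths rather than raising `ArgumentError`.

```ruby
# Paths of the nested options the constructor requires.
REQUIRED_OPTIONS = [[:source, :index], [:source, :client], [:target, :index]].freeze

# Default values taken from the constructor listing.
DEFAULTS = { batch_size: 1000, scroll: '5m', refresh: false }.freeze

# Returns the required option paths that are missing from +arguments+.
# Hypothetical helper for illustration; the library raises ArgumentError instead.
def missing_options(arguments)
  REQUIRED_OPTIONS.select do |path|
    value = arguments.dig(*path)
    # Missing means nil, or an empty string/collection.
    value.respond_to?(:empty?) ? value.empty? : value.nil?
  end
end

# Object.new stands in for a client instance here.
arguments = { source: { index: 'test1', client: Object.new }, target: {} }
missing = missing_options(arguments)

# Caller-supplied values override the defaults; the rest are kept.
options = DEFAULTS.merge(batch_size: 500)
```

Here `missing` contains only `[:target, :index]`, and `options` keeps the default `scroll` and `refresh` values while using the supplied `batch_size`.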
Instance Attribute Details
#arguments ⇒ Object (readonly)
Returns the value of attribute arguments.
# File 'lib/elasticsearch/extensions/reindex.rb', line 95

def arguments
  @arguments
end
Instance Method Details
#__store_batch(documents) ⇒ Object
# File 'lib/elasticsearch/extensions/reindex.rb', line 152

def __store_batch(documents)
  body = documents.map do |doc|
    doc['_index'] = arguments[:target][:index]
    arguments[:transform].call(doc) if arguments[:transform]
    doc['data'] = doc['_source']
    doc.delete('_score')
    doc.delete('_source')
    { index: doc }
  end
  arguments[:target][:client].bulk body: body
end
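To see what `__store_batch` hands to the Bulk API, the mapping step can be run on a hand-written hit. This is a sketch under assumptions: `build_bulk_body` is a hypothetical standalone version that takes the target index and the transform as parameters and works on a shallow copy, whereas the library method reads both from `arguments` and mutates the hit in place. The sample hit and the `upcase_title` lambda are made up for illustration.

```ruby
# Builds a bulk request body from search hits, mirroring the steps of
# __store_batch. Hypothetical helper; not part of the library.
def build_bulk_body(hits, target_index, transform = nil)
  hits.map do |hit|
    doc = hit.dup                        # shallow copy: top-level keys of the hit stay untouched
    doc['_index'] = target_index         # redirect the document to the target index
    transform.call(doc) if transform     # optional user hook, run while '_source' is still present
    doc['data'] = doc.delete('_source')  # the source is passed on under the 'data' key
    doc.delete('_score')                 # search metadata is dropped before indexing
    { index: doc }
  end
end

hits = [
  { '_index' => 'test1', '_id' => '1', '_score' => 1.0, '_source' => { 'title' => 'foo' } }
]

# The transform receives the whole hit, so it reaches the fields via '_source'.
upcase_title = ->(doc) { doc['_source']['title'] = doc['_source']['title'].upcase }

body = build_bulk_body(hits, 'test2', upcase_title)
```

Each element of `body` is a single `index` action whose hash carries the metadata (`_index`, `_id`) alongside the document under `data`. Note that the transform runs before `_source` is renamed, which is why it addresses `doc['_source']`.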
#perform ⇒ Hash
Performs the operation
# File 'lib/elasticsearch/extensions/reindex.rb', line 123

def perform
  output = { errors: 0 }

  response = arguments[:source][:client].search(
    index: arguments[:source][:index],
    scroll: arguments[:scroll],
    size: arguments[:batch_size]
  )

  documents = response['hits']['hits']

  unless documents.empty?
    bulk_response = __store_batch(documents)
    output[:errors] += bulk_response['items'].select { |k, v| k.values.first['error'] }.size
  end

  while response = arguments[:source][:client].scroll(scroll_id: response['_scroll_id'], scroll: arguments[:scroll]) do
    documents = response['hits']['hits']
    break if documents.empty?

    bulk_response = __store_batch(documents)
    output[:errors] += bulk_response['items'].select { |k, v| k.values.first['error'] }.size
  end

  arguments[:target][:client].indices.refresh index: arguments[:target][:index] if arguments[:refresh]

  output
end
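The `errors` count that `perform` returns is derived from the `items` array of each bulk response. A minimal sketch of that counting step follows; `count_bulk_errors` is a hypothetical helper name, and `sample_response` is a hand-written, abbreviated response used only for illustration.

```ruby
# Counts the items in a bulk response that carry an 'error' entry.
# Each item is a one-key hash: the action name mapped to that action's result.
def count_bulk_errors(bulk_response)
  bulk_response['items'].count { |item| item.values.first['error'] }
end

# Hand-written sample; real responses contain more fields per item.
sample_response = {
  'items' => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400,
                   'error' => { 'type' => 'mapper_parsing_exception' } } }
  ]
}

errors = count_bulk_errors(sample_response)
puts "errors: #{errors}"
# prints "errors: 1"
```

Since `perform` only reports a total, a caller who needs to know which documents failed would have to inspect the bulk responses separately.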