Class: Elasticsearch::Extensions::Reindex::Reindex
- Inherits: Object
- Inheritance chain: Object, Elasticsearch::Extensions::Reindex::Reindex
- Defined in: lib/elasticsearch/extensions/reindex.rb
Overview
Copies documents from one index into another.
The reindexing process works by “scrolling” the source index and sending batches of documents via the “Bulk” API to the destination index/cluster.
Be aware that if you want to change the destination index settings and/or mappings, you have to do so in advance by using the “Indices Create” API.
Note that there is a native “Reindex” API in Elasticsearch 2.3.x and higher versions, which will be more performant than the Ruby version.
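As a rough illustration of the arguments this class expects, the sketch below assembles the Hash that the constructor validates. The helper name `build_reindex_arguments`, the `StubClient` stand-in, and the index names are assumptions made for this example; only the keys (`:source`, `:dest`, `:index`, `:client`, `:batch_size`, `:scroll`, `:refresh`, `:transform`) come from the source listed on this page.

```ruby
# Hypothetical helper (not part of the gem): builds the arguments Hash
# in the shape the Reindex constructor checks for.
def build_reindex_arguments(client, source_index, dest_index, overrides = {})
  {
    source: { index: source_index, client: client }, # both required
    dest:   { index: dest_index },                   # :client falls back to the source client
    batch_size: 500,                                 # overrides the default of 1000
    refresh: true                                    # refresh the destination index when done
  }.merge(overrides)
end

# Stand-in for a real client object; with the gems installed this would
# typically be an Elasticsearch::Client instance.
StubClient = Struct.new(:name)

arguments = build_reindex_arguments(StubClient.new('local'), 'books', 'books_v2')

# With the gem loaded, the Hash would be handed to the class like this:
#   Elasticsearch::Extensions::Reindex::Reindex.new(arguments).perform
```

Keeping the Hash construction separate from the call makes it easy to inspect or test the options before any request is sent.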
Instance Attribute Summary
-
#arguments ⇒ Object
readonly
Returns the value of attribute arguments.
Instance Method Summary
- #__store_batch(documents) ⇒ Object
-
#initialize(arguments = {}) ⇒ Reindex
constructor
A new instance of Reindex.
-
#perform ⇒ Hash
Performs the operation.
Constructor Details
#initialize(arguments = {}) ⇒ Reindex
Returns a new instance of Reindex.
    # File 'lib/elasticsearch/extensions/reindex.rb', line 114

    def initialize(arguments={})
      [
        [:source, :index],
        [:source, :client],
        [:dest, :index]
      ].each do |required_option|
        value = required_option.reduce(arguments) { |sum, o| sum = sum[o] ? sum[o] : {} }

        raise ArgumentError,
              "Required argument '#{Hash[*required_option]}' missing" if \
              value.respond_to?(:empty?) ? value.empty? : value.nil?
      end

      @arguments = {
        batch_size: 1000,
        scroll: '5m',
        refresh: false
      }.merge(arguments)

      arguments[:dest][:client] ||= arguments[:source][:client]
    end
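The constructor does two things: it walks each required key path through the nested Hash, and it merges the caller's options over a set of defaults. The following is a minimal standalone sketch of that logic, not the gem's code; the names `REQUIRED_OPTIONS`, `DEFAULTS`, `missing_options`, and `with_defaults` are hypothetical.

```ruby
REQUIRED_OPTIONS = [[:source, :index], [:source, :client], [:dest, :index]].freeze
DEFAULTS = { batch_size: 1000, scroll: '5m', refresh: false }.freeze

# Returns the required key paths that are absent or empty in `arguments`.
def missing_options(arguments)
  REQUIRED_OPTIONS.select do |path|
    # Walk the path; a missing level is replaced by {} so the walk can continue.
    value = path.reduce(arguments) { |memo, key| memo[key] ? memo[key] : {} }
    # Hashes and Strings count as missing when empty; other objects when nil.
    value.respond_to?(:empty?) ? value.empty? : value.nil?
  end
end

# Applies the defaults and lets the destination reuse the source client.
# The merge is shallow, so the caller's nested :dest Hash is modified too.
def with_defaults(arguments)
  merged = DEFAULTS.merge(arguments)
  merged[:dest][:client] ||= merged[:source][:client]
  merged
end

client = Object.new
complete = { source: { index: 'books', client: client }, dest: { index: 'books_v2' } }

missing_options({})        # all three required paths
missing_options(complete)  # no paths
with_defaults(complete)    # adds batch_size, scroll, refresh and dest client
```

Because an empty String responds to `empty?`, a blank index name is treated the same way as an absent one.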
Instance Attribute Details
#arguments ⇒ Object (readonly)
Returns the value of attribute arguments.
    # File 'lib/elasticsearch/extensions/reindex.rb', line 112

    def arguments
      @arguments
    end
Instance Method Details
#__store_batch(documents) ⇒ Object
    # File 'lib/elasticsearch/extensions/reindex.rb', line 169

    def __store_batch(documents)
      body = documents.map do |doc|
        doc['_index'] = arguments[:dest][:index]

        arguments[:transform].call(doc) if arguments[:transform]

        doc['data'] = doc['_source']
        doc.delete('_score')
        doc.delete('_source')

        { index: doc }
      end

      arguments[:dest][:client].bulk body: body
    end
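To make the shape of the bulk payload concrete, here is a small sketch of the same per-document steps as a standalone function. `build_bulk_body` is a hypothetical name used only for this illustration, and the sample hit and the `upcase_title` lambda are invented data; the steps themselves follow the method listed above.

```ruby
# Hypothetical standalone version of the mapping done per scroll hit.
# Like the listed method, it modifies each hit in place.
def build_bulk_body(hits, dest_index, transform = nil)
  hits.map do |doc|
    doc['_index'] = dest_index            # retarget the document
    transform.call(doc) if transform      # optional user hook, sees '_source'
    doc['data'] = doc.delete('_source')   # the document body moves under 'data'
    doc.delete('_score')                  # search metadata is dropped
    { index: doc }                        # one "index" action per document
  end
end

hit = { '_index' => 'books', '_id' => '1', '_score' => 1.0,
        '_source' => { 'title' => 'dune' } }

# The transform runs before '_source' is moved, so it edits '_source'.
upcase_title = ->(doc) { doc['_source']['title'] = doc['_source']['title'].upcase }

body = build_bulk_body([hit], 'books_v2', upcase_title)
# body is [{ index: { '_index' => 'books_v2', '_id' => '1', 'data' => { 'title' => 'DUNE' } } }]
```

The resulting Array is what would be passed as `body:` to the destination client's `bulk` call.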
#perform ⇒ Hash
Performs the operation and returns a Hash whose :errors key holds the number of bulk items that reported an error.
    # File 'lib/elasticsearch/extensions/reindex.rb', line 140

    def perform
      output = { errors: 0 }

      response = arguments[:source][:client].search(
        index: arguments[:source][:index],
        scroll: arguments[:scroll],
        size: arguments[:batch_size]
      )

      documents = response['hits']['hits']

      unless documents.empty?
        bulk_response = __store_batch(documents)
        output[:errors] += bulk_response['items'].select { |k, v| k.values.first['error'] }.size
      end

      while response = arguments[:source][:client].scroll(scroll_id: response['_scroll_id'], scroll: arguments[:scroll]) do
        documents = response['hits']['hits']
        break if documents.empty?

        bulk_response = __store_batch(documents)
        output[:errors] += bulk_response['items'].select { |k, v| k.values.first['error'] }.size
      end

      arguments[:dest][:client].indices.refresh index: arguments[:dest][:index] if arguments[:refresh]

      output
    end
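The scroll-then-bulk loop and the error count can be tried without a cluster by swapping in in-memory stand-ins. Everything named below (`FakeSource`, `FakeDest`, `copy_index`, the sample documents) is hypothetical and only imitates the response shapes that the listed method reads: `hits.hits`, `_scroll_id`, and `items`.

```ruby
# In-memory stand-in for a source client: pages through documents.
class FakeSource
  def initialize(docs)
    @docs = docs
  end

  def search(index:, scroll:, size:)
    @pages = @docs.each_slice(size).to_a
    next_page
  end

  def scroll(scroll_id:, scroll:)
    next_page
  end

  private

  def next_page
    { '_scroll_id' => 'fake', 'hits' => { 'hits' => @pages.shift || [] } }
  end
end

# In-memory stand-in for a destination client: rejects selected ids.
class FakeDest
  attr_reader :stored

  def initialize(reject_ids = [])
    @reject_ids = reject_ids
    @stored = []
  end

  def bulk(body:)
    items = body.map do |action|
      doc = action[:index]
      if @reject_ids.include?(doc['_id'])
        { 'index' => { '_id' => doc['_id'], 'status' => 400, 'error' => 'rejected' } }
      else
        @stored << doc
        { 'index' => { '_id' => doc['_id'], 'status' => 201 } }
      end
    end
    { 'items' => items }
  end
end

# Simplified sketch of the loop: fetch a page, bulk-store it, count errors.
def copy_index(source:, dest:, dest_index:, batch_size: 2, scroll: '5m')
  errors = 0
  response = source.search(index: 'books', scroll: scroll, size: batch_size)
  loop do
    hits = response['hits']['hits']
    break if hits.empty?
    body = hits.map { |h| { index: { '_index' => dest_index, '_id' => h['_id'], 'data' => h['_source'] } } }
    errors += dest.bulk(body: body)['items'].count { |item| item.values.first['error'] }
    response = source.scroll(scroll_id: response['_scroll_id'], scroll: scroll)
  end
  { errors: errors }
end

docs = (1..5).map { |i| { '_id' => i.to_s, '_source' => { 'n' => i } } }
dest = FakeDest.new(['3'])
result = copy_index(source: FakeSource.new(docs), dest: dest, dest_index: 'books_v2')
# result is { errors: 1 } and dest.stored holds the four accepted documents
```

Each bulk item is a single-key Hash keyed by the action name, which is why the count inspects `item.values.first`.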