Method: Ej::Core#copy

Defined in:
lib/ej/core.rb

#copy(source, dest, query, per_size, scroll, dest_index, slice_max) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/ej/core.rb', line 150

def copy(source, dest, query, per_size, scroll, dest_index, slice_max)
  source_client = Elasticsearch::Client.new transport: Util.get_transport(Util.parse_hosts(source))
  dest_client = Elasticsearch::Client.new transport: Util.get_transport(Util.parse_hosts(dest))

  parallel_array = slice_max ? slice_max.times.to_a : [0]
  Parallel.map(parallel_array, :in_processes=>parallel_array.size) do |slice_id|
    scroll_option = get_scroll_option(@index, query, per_size, scroll, slice_id, slice_max)
    r = connect_with_retry { source_client.search(scroll_option) }
    total = r['hits']['total']
    i = 0
    i += bulk_results(r, dest_client, i, total, dest_index, slice_id)

    while r = connect_with_retry { source_client.scroll(scroll_id: r['_scroll_id'], scroll: scroll) } and
      (not r['hits']['hits'].empty?) do
      i += bulk_results(r, dest_client, i, total, dest_index, slice_id)
    end
  end
end