33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/dyna_model/adapters/elasticsearch/dyna_model_adapter.rb', line 33
# Scans the backing table page by page and yields each non-empty page to the
# caller as an array of bulk-index actions of the form
# `{ index: { _id: record.id, data: record.__elasticsearch__.as_indexed_json } }`.
#
# Between pages the method sleeps in proportion to the capacity the last scan
# reported consuming, measured against half of the table's provisioned read
# capacity units.
#
# @param options [Hash]
# @option options [Integer] :batch_size page size handed to `scan`; defaults
#   to half of the provisioned read capacity units (rounded down, minimum 1)
# @option options [Object] :scan_filter forwarded to `scan` unchanged when present
# @yield [batch_for_bulk] one array of bulk-index actions per non-empty page
# @return [nil]
# @raise [RuntimeError] when the table schema exposes no read capacity units
def __find_in_batches(options={}, &block)
  # Look the throughput up one level at a time so a missing entry reaches the
  # explicit error below instead of failing with NoMethodError on nil.
  throughput = self.dynamo_db_table.table_schema[:provisioned_throughput]
  read_provision = throughput && throughput[:read_capacity_units]
  raise "read_provision not set for class!" unless read_provision

  # Only half of the provisioned read capacity is budgeted for this scan.
  read_budget = read_provision / 2.0
  # A capacity of 1 would floor to 0; never ask `scan` for zero-sized pages.
  default_batch_size = [read_budget.floor, 1].max
  batch_size = options[:batch_size] || default_batch_size
  puts "Indexing via scan with batch size of #{batch_size}..."

  scan_idx = 0
  last_key = nil
  loop do
    puts "Batch iteration #{scan_idx+1}..."
    scan_options = {
      batch: batch_size,
      manual_batching: true,
      return_consumed_capacity: :total
    }
    # Resume from where the previous page stopped.
    scan_options[:exclusive_start_key] = last_key if last_key
    scan_options[:scan_filter] = options[:scan_filter] if options[:scan_filter]

    results_hash = self.scan(scan_options)
    results = results_hash[:results]
    unless results.blank?
      puts "Indexing #{results.size} results..."
      batch_for_bulk = results.map do |record|
        { index: { _id: record.id, data: record.__elasticsearch__.as_indexed_json } }
      end
      yield batch_for_bulk
    end

    # No continuation key means the scan has reached the end.
    last_key = results_hash[:last_evaluated_key]
    break unless last_key

    consumed = results_hash[:consumed_capacity]
    # Skip throttling when the budget is zero, which would divide by 0.0.
    if consumed && read_budget > 0
      sleep_time = consumed[:capacity_units].to_f / read_budget
      puts "Sleeping for #{sleep_time}..."
      sleep(sleep_time)
    end
    scan_idx += 1
  end
  nil
end
|