10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 10
# Assembles the output task from the plugin configuration, runs the optional
# pre-load index/template operations, and yields the task to the caller.
#
# `schema` and `count` are accepted but not referenced in this method, and the
# block is invoked through `yield` rather than through `control`.
#
# @param config object queried through `config.param(name, type, default: ...)`
# @param schema unused here
# @param count unused here
# @yield [task] the assembled task Hash
# @return [Hash] an empty Hash (the next config diff)
# @raise [ConfigError] when `mode` is not listed in ENABLE_MODE
def self.transaction(config, schema, count, &control)
  # "index_type" is the only task entry read without a default.
  task = {
    "nodes" => config.param("nodes", :array, default: [{ 'host' => 'localhost', 'port' => 9200 }]),
    "request_timeout" => config.param("request_timeout", :integer, default: 60),
    "index_type" => config.param("index_type", :string),
    "mode" => config.param("mode", :string, default: 'normal'),
    "reload_connections" => config.param("reload_connections", :bool, default: true),
    "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
    "delete_old_index" => config.param("delete_old_index", :bool, default: false),
    "delete_old_alias" => config.param("delete_old_alias", :bool, default: true),
    "id_keys" => config.param("id_keys", :array, default: nil),
    "id_format" => config.param("id_format", :string, default: nil),
    "array_columns" => config.param("array_columns", :array, default: nil),
    "bulk_actions" => config.param("bulk_actions", :integer, default: 1000),
    "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
  }

  mode = task['mode']
  raise ConfigError.new("`mode` must be one of #{ENABLE_MODE.join(', ')}") unless ENABLE_MODE.include?(mode)

  Embulk.logger.info("mode => #{mode}")

  explicit_index = config.param("current_index_name", :string, default: nil)
  index_setting = config.param("index", :string, default: 'logstash-%Y.%m.%d')

  if mode == 'replace'
    # In replace mode the configured name is kept as the alias; the index is
    # either the explicitly named one or a freshly timestamped name.
    task['alias'] = index_setting
    task['index'] = explicit_index ||
                    "#{index_setting}-#{task['index_type']}-#{Time.now.strftime('%Y.%m.%d.%H.%M.%S')}"
  else
    # Otherwise the configured name is treated as a strftime pattern.
    task['index'] = Time.now.strftime(index_setting)
  end

  Embulk.logger.info("nodes => #{task['nodes']}")
  Embulk.logger.info("index => #{task['index']}")
  Embulk.logger.info("index_type => #{task['index_type']}")
  Embulk.logger.info("alias => #{task['alias']}")

  connection = Connection.new(task)

  # Optional pre-run step: delete the target index.
  connection.delete_index(task['index']) if config.param("before_delete_index", :bool, default: false)

  # Optional pre-run step: install a template, only when both name and body are set.
  template_name = config.param("before_template_name", :string, default: nil)
  template_body = config.param("before_template", :hash, default: nil)
  connection.put_template(template_name, template_body) if template_name && template_body

  # The value returned by the block is not used.
  yield(task)

  {}
end
|