Class: Embulk::Output::Elasticsearch
- Inherits:
-
OutputPlugin
- Object
- OutputPlugin
- Embulk::Output::Elasticsearch
- Defined in:
- lib/embulk/output/elasticsearch_ruby.rb
Constant Summary collapse
- ENABLE_MODE =
%w[normal update replace]
Class Method Summary collapse
- .cleanup(task, schema, count, task_reports) ⇒ Object
- .create_aliases(client, als, index) ⇒ Object
- .create_client(task) ⇒ Object
- .delete_aliases(client, task) ⇒ Object
- .get_index(task) ⇒ Object
- .get_index_prefix(task) ⇒ Object
- .transaction(config, schema, count, &control) ⇒ Object
Instance Method Summary collapse
- #abort ⇒ Object
- #add(page) ⇒ Object
- #close ⇒ Object
- #commit ⇒ Object
- #finish ⇒ Object
-
#init ⇒ Object
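Initializes per-task instance state from the task hash: copies settings into instance variables, resolves the target index, builds the Elasticsearch client, and starts an empty bulk buffer.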
Class Method Details
.cleanup(task, schema, count, task_reports) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 40
# Post-run hook. Only 'replace' mode has work to do here:
# 1. point the alias task['index'] at the index written by this run, then
# 2. detach that alias from the other matching indices (see .delete_aliases).
#
# @param task [Hash] task hash built by .transaction
# @param schema unused; part of the Embulk cleanup hook signature
# @param count unused; part of the Embulk cleanup hook signature
# @param task_reports unused; part of the Embulk cleanup hook signature
# @return result of .delete_aliases in 'replace' mode, otherwise nil
def self.cleanup(task, schema, count, task_reports)
  return unless task['mode'] == 'replace'

  es = create_client(task)
  create_aliases(es, task['index'], get_index(task))
  delete_aliases(es, task)
end
.create_aliases(client, als, index) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 66
# Attaches alias +als+ to +index+ with a single update_aliases request and
# logs the change.
#
# @param client Elasticsearch client (as built by .create_client)
# @param als [String] alias name
# @param index [String] concrete index name
def self.create_aliases(client, als, index)
  add_action = { add: { index: index, alias: als } }
  client.indices.update_aliases(body: { actions: [add_action] })
  Embulk.logger.info "created alias: #{als}, index: #{index}"
end
.create_client(task) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 48
# Builds an Elasticsearch client on top of the Faraday HTTP transport.
#
# @param task [Hash] reads 'nodes', 'reload_connections', 'reload_on_failure',
#   'retry_on_failure' and 'request_timeout'
# @return [::Elasticsearch::Client]
def self.create_client(task)
  # Node entries arrive from config with string keys ('host', 'port', ...);
  # they are symbolized before being handed to the transport.
  host_list = task['nodes'].map { |node| node.map { |key, value| [key.to_sym, value] }.to_h }
  connection_options = {
    reload_connections: task['reload_connections'],
    reload_on_failure: task['reload_on_failure'],
    retry_on_failure: task['retry_on_failure'],
    transport_options: { request: { timeout: task['request_timeout'] } }
  }
  transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new(
    { hosts: host_list, options: connection_options }
  )
  ::Elasticsearch::Client.new(transport: transport)
end
.delete_aliases(client, task) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 73
# Detaches the alias task['index'] from every index of this prefix except the
# one written by the current run. When task['delete_old_index'] is true, the
# detached indices are deleted as well.
#
# Candidates are indices that (1) currently carry the alias and (2) are named
# "<prefix>-<digit>...", the shape .get_index produces in 'replace' mode.
#
# @param client Elasticsearch client
# @param task [Hash] reads 'index', 'delete_old_index' (plus what .get_index /
#   .get_index_prefix read)
# @return [Array<String>] the candidate index names that were examined
def self.delete_aliases(client, task)
  alias_name = task['index']
  current_index = get_index(task)
  # Escape the prefix: index names usually contain '.' (the default
  # 'logstash-%Y.%m.%d' expands to dotted dates), which would otherwise act as
  # a regex wildcard and match unrelated indices. Require at least one digit
  # after the dash, matching the strftime-based suffix.
  generation = /\A#{Regexp.escape(get_index_prefix(task))}-\d/

  aliased = client.indices.get_aliases
                  .select { |_name, info| info['aliases'].include?(alias_name) }
                  .keys
  candidates = aliased.grep(generation)

  candidates.each do |index|
    next if index == current_index

    client.indices.delete_alias index: index, name: alias_name
    Embulk.logger.info "deleted alias: #{alias_name}, index: #{index}"
    next unless task['delete_old_index']

    client.indices.delete index: index
    Embulk.logger.info "deleted index: #{index}"
  end
end
.get_index(task) ⇒ Object
88 89 90 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 88
# Name of the index this run writes to.
# In 'replace' mode every run writes a fresh "<prefix>-<time_value>" index;
# in any other mode it writes to task['index'] directly.
#
# @param task [Hash]
# @return [String]
def self.get_index(task)
  return task['index'] unless task['mode'] == 'replace'

  "#{get_index_prefix(task)}-#{task['time_value']}"
end
.get_index_prefix(task) ⇒ Object
92 93 94 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 92
# Prefix "<index>-<index_type>" shared by the indices created in 'replace' mode.
#
# @param task [Hash] reads 'index' and 'index_type'
# @return [String]
def self.get_index_prefix(task)
  base = task['index']
  doc_type = task['index_type']
  "#{base}-#{doc_type}"
end
.transaction(config, schema, count, &control) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 11
# Embulk entry point. Reads the plugin configuration into a task hash,
# validates it, and yields the task to the framework.
#
# Besides the configured values, the task is stamped with:
#   'time_value' - run timestamp ('%Y.%m.%d.%H.%M.%S'), used by 'replace' mode
#                  to name the new index;
#   'index'      - the configured index with strftime patterns expanded.
#
# @param config Embulk config source (must respond to #param)
# @param schema unused here; part of the Embulk hook signature
# @param count unused here; part of the Embulk hook signature
# @raise [ConfigError] when 'mode' is not one of ENABLE_MODE
# @return [Hash] empty next-config diff
def self.transaction(config, schema, count, &control)
  task = {
    "nodes" => config.param("nodes", :array, default: [{ 'host' => 'localhost', 'port' => 9200 }]),
    "request_timeout" => config.param("request_timeout", :integer, default: 60),
    "index" => config.param("index", :string, default: 'logstash-%Y.%m.%d'),
    "mode" => config.param("mode", :string, default: 'normal'),
    "reload_connections" => config.param("reload_connections", :bool, default: true),
    "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
    "delete_old_index" => config.param("delete_old_index", :bool, default: false),
    "index_type" => config.param("index_type", :string),
    "id_keys" => config.param("id_keys", :array, default: nil),
    "id_format" => config.param("id_format", :string, default: nil),
    "array_columns" => config.param("array_columns", :array, default: nil),
    "bulk_actions" => config.param("bulk_actions", :integer, default: 1000),
    "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
  }

  # Read the clock once so 'time_value' and the expanded 'index' describe the
  # same instant; two separate Time.now calls could straddle a second (or
  # midnight) boundary and disagree.
  started_at = Time.now
  task['time_value'] = started_at.strftime('%Y.%m.%d.%H.%M.%S')
  task['index'] = started_at.strftime(task['index'])

  unless ENABLE_MODE.include?(task['mode'])
    raise ConfigError.new("`mode` must be one of #{ENABLE_MODE.join(', ')}")
  end

  Embulk.logger.info("mode => #{task['mode']}")

  yield(task) # the returned task reports are not used by this plugin
  {} # next config diff: nothing to carry over
end
Instance Method Details
#abort ⇒ Object
143 144 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 143
# Embulk abort hook. Intentionally a no-op: this plugin performs no rollback.
def abort
  nil
end
#add(page) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 121
# Buffers every record of +page+ as an Elasticsearch bulk pair (action/metadata
# entry followed by the document entry). Once @bulk_actions pairs have
# accumulated, it flushes them through #send.
#
# Mode 'update' emits `update` actions whose document is wrapped as
# { doc: ... }: the bulk API's update action expects a partial document (or a
# script) on the following line, not a bare source document. All other modes
# emit `index` actions with the document as-is.
#
# @param page enumerable of records, each an array of values in schema order
def add(page)
  page.each do |record|
    hash = Hash[schema.names.zip(record)]
    action = @mode == 'update' ? :update : :index

    meta = { action => { _index: @index, _type: @index_type } }
    meta[action][:_id] = generate_id(@id_format, hash, @id_keys) if @id_keys

    source = generate_array(hash)
    source = { doc: source } if action == :update

    @bulk_message << meta << source
    # Two buffer entries per record, hence the factor of 2.
    # NOTE(review): `send` is this plugin's own flush method (defined elsewhere
    # in the file) shadowing Object#send; a clearer name would avoid confusion.
    send if @bulk_message.size >= @bulk_actions * 2
  end
end
#close ⇒ Object
118 119 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 118
# Embulk close hook. Intentionally a no-op: nothing is released here.
def close
  nil
end
#commit ⇒ Object
146 147 148 149 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 146
# Embulk commit hook. Returns this task's report, which carries no data here.
#
# @return [Hash] a new, empty task report
def commit
  {}
end
#finish ⇒ Object
137 138 139 140 141 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 137
# Embulk finish hook. Flushes whatever is still buffered in @bulk_message.
# NOTE(review): `send` resolves to the plugin's own flush method (not visible
# in this view), which shadows Object#send.
#
# @return the flush result, or nil when the buffer is empty
def finish
  send unless @bulk_message.empty?
end
#init ⇒ Object
# Resumable-output hook: yields the task to the framework, ignores the task
# reports it returns, and answers an empty next-config diff.
# NOTE(review): in the rendered docs this appears as #init's docstring, which
# suggests it is commented out in the source; confirm before relying on it.
def self.resume(task, schema, count, &control)
  yield(task)
  {}
end
103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 103
# Per-task setup. Copies task settings into instance variables, resolves the
# target index via .get_index, builds the client via .create_client, and
# starts with an empty bulk buffer.
def init
  conf = task

  @mode             = conf["mode"]
  @nodes            = conf["nodes"]
  @index_type       = conf["index_type"]
  @id_keys          = conf["id_keys"]
  @id_format        = conf["id_format"]
  @bulk_actions     = conf["bulk_actions"]
  @array_columns    = conf["array_columns"]
  @retry_on_failure = conf["retry_on_failure"]

  @index  = self.class.get_index(conf)
  @client = self.class.create_client(conf)

  @bulk_message = []
end