Class: Embulk::Output::Elasticsearch::Connection
- Inherits: Object
- Inheritance chain: Object → Embulk::Output::Elasticsearch::Connection
- Defined in:
- lib/embulk/output/elasticsearch/connection.rb
Instance Method Summary
- #create_aliases ⇒ Object
- #create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:) ⇒ Object
- #delete_aliases ⇒ Object
- #delete_index(index) ⇒ Object
- #generate_id(template, record, id_keys) ⇒ Object
- #generate_meta(record) ⇒ Object
- #generate_source(record) ⇒ Object
- #initialize(task) ⇒ Connection (constructor): returns a new instance of Connection.
- #put_template(before_template_name, before_template) ⇒ Object
- #send(bulk_message) ⇒ Object
Constructor Details
#initialize(task) ⇒ Connection
Returns a new instance of Connection.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 8
#
# Copies the indexing settings out of the task and builds the
# Elasticsearch client used by the other methods.
#
# @param task [#[]] configuration indexed by string keys. Keys read here:
#   nodes, index_type, id_keys, id_format, array_columns, retry_on_failure,
#   mode, delete_old_index, delete_old_alias, index, alias,
#   reload_connections, reload_on_failure, request_timeout.
def initialize(task)
  @nodes            = task['nodes']
  @index_type       = task['index_type']
  @id_keys          = task['id_keys']
  @id_format        = task['id_format']
  @array_columns    = task['array_columns']
  @retry_on_failure = task['retry_on_failure']
  @mode             = task['mode']
  @delete_old_index = task['delete_old_index']
  @delete_old_alias = task['delete_old_alias']
  @index            = task['index']
  @alias            = task['alias']

  # The bulk action is :update only for mode 'update'; every other mode indexes.
  @action =
    if @mode == 'update'
      :update
    else
      :index
    end

  @client = create_client(
    nodes: task['nodes'],
    reload_connections: task['reload_connections'],
    reload_on_failure: task['reload_on_failure'],
    retry_on_failure: task['retry_on_failure'],
    request_timeout: task['request_timeout']
  )
end
Instance Method Details
#create_aliases ⇒ Object
53 54 55 56 57 58 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 53
#
# Adds the configured alias (@alias) to the configured index (@index)
# through the client's update_aliases call, then logs the result.
def create_aliases
  add_action = { add: { index: @index, alias: @alias } }
  @client.indices.update_aliases(body: { actions: [add_action] })
  Embulk.logger.info("created alias: #{@alias}, index: #{@index}")
end
#create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 31
#
# Builds an Elasticsearch client on top of a Faraday HTTP transport.
#
# @param nodes [Array] one key/value collection per host; keys are
#   converted to symbols before being handed to the transport
# @param reload_connections passed through as the transport option of the same name
# @param reload_on_failure passed through as the transport option of the same name
# @param retry_on_failure passed through as the transport option of the same name
# @param request_timeout used as transport_options[:request][:timeout]
# @return the ::Elasticsearch::Client wrapping the transport
def create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:)
  hosts = nodes.map do |node|
    node.map { |key, value| [key.to_sym, value] }.to_h
  end

  options = {
    reload_connections: reload_connections,
    reload_on_failure: reload_on_failure,
    retry_on_failure: retry_on_failure,
    transport_options: {
      request: { timeout: request_timeout }
    }
  }

  transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new(
    { hosts: hosts, options: options }
  )
  ::Elasticsearch::Client.new(transport: transport)
end
#delete_aliases ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 60
#
# Walks every index currently carrying @alias and, for each one other than
# the current @index, removes the alias (when @delete_old_alias is set)
# and/or deletes the index itself (when @delete_old_index is set).
def delete_aliases
  aliased_indices = @client.indices.get_alias(name: @alias).keys
  aliased_indices.each do |index|
    # Never touch the index this run is writing to.
    next if index == @index

    if @delete_old_alias
      @client.indices.delete_alias(index: index, name: @alias)
      Embulk.logger.info("deleted alias: #{@alias}, index: #{index}")
    end
    delete_index(index) if @delete_old_index
  end
end
#delete_index(index) ⇒ Object
75 76 77 78 79 80 81 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 75
#
# Deletes the named index, but only if it appears in the cat-indices
# listing; logs the deletion when it happens.
#
# @param index the index name to remove
def delete_index(index)
  listing = @client.cat.indices(format: 'json')
  present = listing.any? { |entry| entry['index'] == index }
  return unless present

  @client.indices.delete(index: index)
  Embulk.logger.info("deleted index: #{index}")
end
#generate_id(template, record, id_keys) ⇒ Object
116 117 118 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 116
#
# Builds a document id by filling a format string with record values.
#
# @param template [String] format string applied with String#%
# @param record [#[]] source of the values, looked up by each key
# @param id_keys [Array] keys whose record values fill the template, in order
# @return [String] the formatted id
def generate_id(template, record, id_keys)
  values = id_keys.map { |key| record[key] }
  template % values
end
#generate_meta(record) ⇒ Object
120 121 122 123 124 125 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 120
#
# Builds the action/metadata hash for one record, keyed by the configured
# bulk action (@action).
#
# @param record the record the metadata belongs to; only used to derive
#   the id when @id_keys is set
# @return [Hash] { @action => { _index:, _type: [, _id:] } }
def generate_meta(record)
  meta = {}
  meta[@action] = { _index: @index, _type: @index_type }
  # An explicit _id is only attached when id keys are configured.
  meta[@action][:_id] = generate_id(@id_format, record, @id_keys) unless @id_keys.nil?
  # Return the hash explicitly: the trailing `unless` expression would
  # otherwise yield nil whenever no id keys are configured.
  meta
end
#generate_source(record) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 99
#
# Builds the document body for one record. Values of columns listed in
# @array_columns are split on the column's delimiter (empty pieces are
# dropped) and, when 'is_integer' is set, converted with to_i.
#
# @param record [#each] key/value pairs of the record
# @return [Hash] the converted record, wrapped as { doc: ... } when
#   @mode is 'update'
def generate_source(record)
  result = {}
  record.each do |key, value|
    result[key] = value
    next if value.nil? || !@array_columns
    # Values that cannot be split (e.g. numbers or already-built arrays)
    # are passed through unchanged instead of raising NoMethodError.
    next unless value.respond_to?(:split)

    @array_columns.each do |array_column|
      next unless array_column['name'] == key

      array_value = value.split(array_column['delimiter']).reject(&:empty?)
      array_value = array_value.map(&:to_i) if array_column['is_integer']
      result[key] = array_value
    end
  end
  @mode == 'update' ? { doc: result } : result
end
#put_template(before_template_name, before_template) ⇒ Object
48 49 50 51 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 48
#
# Logs the template name and registers the template through the client.
#
# @param before_template_name name under which the template is stored
# @param before_template template body sent to Elasticsearch
def put_template(before_template_name, before_template)
  Embulk.logger.info "put template => #{before_template_name}"
  @client.indices.put_template(
    name: before_template_name,
    body: before_template
  )
end
#send(bulk_message) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/embulk/output/elasticsearch/connection.rb', line 83
#
# Sends one bulk request, retrying with exponential back-off
# (2**attempt seconds) up to @retry_on_failure times.
#
# NOTE(review): a method named `send` replaces Kernel#send for instances of
# this class; use `__send__` for dynamic dispatch on them.
#
# @param bulk_message [Array] bulk request body; the success log reports
#   size/2 entries (presumably alternating meta/source items — confirm
#   against the caller)
# @raise [RuntimeError] once the retries are exhausted, carrying the last
#   error's message
def send(bulk_message)
  retries = 0
  # A missing setting (nil) means "do not retry" rather than failing the
  # comparison below and hiding the real error.
  max_retries = @retry_on_failure.to_i
  begin
    @client.bulk body: bulk_message
    Embulk.logger.info "bulk: #{bulk_message.size/2} success."
  rescue => e
    if retries < max_retries
      retries += 1
      Embulk.logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
      sleep 2**retries
      retry
    end
    raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
  end
end