Class: Embulk::Output::Elasticsearch::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/elasticsearch/connection.rb

Instance Method Summary

Constructor Details

#initialize(task) ⇒ Connection

Returns a new instance of Connection.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/embulk/output/elasticsearch/connection.rb', line 8

# Builds a connection from the plugin task hash.
#
# Every setting is copied from +task+ into an instance variable for the
# document-building and index/alias maintenance methods, and an
# Elasticsearch client is created through #create_client.
#
# @param task [Hash] plugin task; read with string keys such as 'nodes',
#   'index', 'alias', 'mode', 'id_keys' and 'retry_on_failure'
def initialize(task)
  @mode = task['mode']
  @index = task['index']
  @alias = task['alias']
  @index_type = task['index_type']
  @nodes = task['nodes']

  @id_keys = task['id_keys']
  @id_format = task['id_format']
  @array_columns = task['array_columns']

  @retry_on_failure = task['retry_on_failure']
  @delete_old_index = task['delete_old_index']
  @delete_old_alias = task['delete_old_alias']

  # Only 'update' mode emits :update bulk actions; every other mode emits :index.
  @action =
    if @mode == 'update'
      :update
    else
      :index
    end

  client_settings = {
    nodes: task['nodes'],
    reload_connections: task['reload_connections'],
    reload_on_failure: task['reload_on_failure'],
    retry_on_failure: task['retry_on_failure'],
    request_timeout: task['request_timeout']
  }
  @client = create_client(**client_settings)
end

Instance Method Details

#create_aliases ⇒ Object



53
54
55
56
57
58
# File 'lib/embulk/output/elasticsearch/connection.rb', line 53

# Points the configured alias (@alias) at the configured index (@index)
# with a single update_aliases request, then logs the change.
def create_aliases
  alias_actions = [{ add: { index: @index, alias: @alias } }]
  @client.indices.update_aliases(body: { actions: alias_actions })
  Embulk.logger.info("created alias: #{@alias}, index: #{@index}")
end

#create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/embulk/output/elasticsearch/connection.rb', line 31

# Creates an Elasticsearch client backed by the Faraday HTTP transport.
#
# @param nodes [Array<Hash>] host entries; their keys are converted to
#   symbols before being handed to the transport as +hosts+
# @param reload_connections passed through to the transport options
# @param reload_on_failure passed through to the transport options
# @param retry_on_failure passed through to the transport options
# @param request_timeout used as the Faraday request timeout
# @return [::Elasticsearch::Client]
def create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:)
  hosts = nodes.map do |node|
    node.each_with_object({}) { |(key, value), host| host[key.to_sym] = value }
  end

  transport_args = {
    hosts: hosts,
    options: {
      reload_connections: reload_connections,
      reload_on_failure: reload_on_failure,
      retry_on_failure: retry_on_failure,
      transport_options: { request: { timeout: request_timeout } }
    }
  }
  transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new(transport_args)
  ::Elasticsearch::Client.new(transport: transport)
end

#delete_aliases ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/embulk/output/elasticsearch/connection.rb', line 60

# Cleans up every index that currently carries @alias, except @index itself.
#
# For each such index the alias is removed when @delete_old_alias is set,
# and the index itself is removed via #delete_index when @delete_old_index
# is set.
#
# NOTE(review): get_alias is called unconditionally; whether a missing alias
# raises here depends on the client/server — presumably callers ensure the
# alias exists first. Confirm.
#
# @return [Array<String>] every index name that carried the alias
def delete_aliases
  indices = @client.indices.get_alias(name: @alias).keys

  indices.reject { |name| name == @index }.each do |stale_index|
    if @delete_old_alias
      @client.indices.delete_alias index: stale_index, name: @alias
      Embulk.logger.info "deleted alias: #{@alias}, index: #{stale_index}"
    end
    delete_index(stale_index) if @delete_old_index
  end

  indices
end

#delete_index(index) ⇒ Object



75
76
77
78
79
80
81
# File 'lib/embulk/output/elasticsearch/connection.rb', line 75

# Deletes +index+ if it is present in the cluster's index listing.
#
# Existence is checked by fetching the full _cat/indices listing as JSON and
# scanning it for a row whose 'index' equals +index+.
#
# @param index [String] name of the index to delete
# @return [nil] when the index is not listed; otherwise the logger's result
def delete_index(index)
  listed = @client.cat.indices(format: 'json')
  return unless listed.any? { |row| row['index'] == index }

  @client.indices.delete index: index
  Embulk.logger.info "deleted index: #{index}"
end

#generate_id(template, record, id_keys) ⇒ Object



116
117
118
# File 'lib/embulk/output/elasticsearch/connection.rb', line 116

# Builds a document id by formatting +template+ with the record's id values.
#
# The values of +id_keys+ are looked up in +record+ (in the given order) and
# fed to String#%, so each format directive consumes the next value.
#
# @param template [String] format string, e.g. "%s-%s"
# @param record [Hash] column name => value
# @param id_keys [Array] keys whose values make up the id
# @return [String]
def generate_id(template, record, id_keys)
  key_values = id_keys.map { |id_key| record[id_key] }
  template % key_values
end

#generate_meta(record) ⇒ Object



120
121
122
123
124
125
# File 'lib/embulk/output/elasticsearch/connection.rb', line 120

# Builds the bulk action/metadata line for +record+.
#
# The result is keyed by @action (:index or :update) and names the target
# @index and @index_type. An _id is included only when @id_keys is not nil;
# it is rendered from @id_format by #generate_id.
#
# @param record [Hash] column name => value
# @return [Hash] e.g. { index: { _index: ..., _type: ..., _id: ... } }
def generate_meta(record)
  action_meta = { _index: @index, _type: @index_type }
  action_meta[:_id] = generate_id(@id_format, record, @id_keys) unless @id_keys.nil?
  { @action => action_meta }
end

#generate_source(record) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/embulk/output/elasticsearch/connection.rb', line 99

# Builds the bulk document body for +record+.
#
# Values are copied as-is. A non-nil value whose column is named in
# @array_columns is instead split on that entry's 'delimiter', with empty
# pieces dropped. When the entry's 'is_integer' is set, each piece is
# converted with to_i, so non-numeric text becomes 0. If several entries
# name the same column, the last one wins.
# NOTE(review): split assumes such columns hold String values — confirm.
#
# In 'update' mode the document is wrapped as { doc: ... }.
#
# @param record [Hash] column name => value
# @return [Hash]
def generate_source(record)
  source = {}

  record.each do |column, raw_value|
    source[column] = raw_value
    next if raw_value.nil? || !@array_columns

    @array_columns.each do |spec|
      next unless spec['name'] == column

      pieces = raw_value.split(spec['delimiter']).reject(&:empty?)
      pieces = pieces.map(&:to_i) if spec['is_integer']
      source[column] = pieces
    end
  end

  if @mode == 'update'
    { doc: source }
  else
    source
  end
end

#put_template(before_template_name, before_template) ⇒ Object



48
49
50
51
# File 'lib/embulk/output/elasticsearch/connection.rb', line 48

# Registers an index template under +before_template_name+.
#
# @param before_template_name [String] template name
# @param before_template [Hash] template body sent as-is
# @return whatever the client's put_template call returns
def put_template(before_template_name, before_template)
  Embulk.logger.info("put template => #{before_template_name}")
  template_request = { name: before_template_name, body: before_template }
  @client.indices.put_template(**template_request)
end

#send(bulk_message) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/embulk/output/elasticsearch/connection.rb', line 83

# Sends one bulk request, retrying with exponential backoff on errors.
#
# A raised error is retried up to @retry_on_failure times, sleeping
# 2**attempt seconds before each retry. After that, a RuntimeError is raised;
# the original error is attached as its #cause.
#
# The bulk API can also succeed at the HTTP level while rejecting individual
# documents. It then reports 'errors' => true and per-item 'error' entries.
# Those documents are reported with a warning instead of being logged as a
# success. They are not retried, because resending the same document usually
# fails the same way.
#
# NOTE(review): this method shadows Object#send on this class. Use
# __send__/public_send for reflective calls on a Connection.
#
# @param bulk_message [Array] alternating action/metadata and source entries
#   (so bulk_message.size / 2 is the document count)
# @raise [RuntimeError] when the request still fails after all retries
def send(bulk_message)
  # A missing setting means "no retries" instead of a comparison error
  # raised inside the rescue clause.
  max_retries = @retry_on_failure || 0
  retries = 0
  begin
    response = @client.bulk body: bulk_message
  rescue => e
    if retries < max_retries
      retries += 1
      Embulk.logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
      sleep 2**retries
      retry
    end
    raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
  end

  # Inspect the response outside the rescue, so a reporting problem can never
  # cause the whole bulk request to be re-sent.
  document_count = bulk_message.size / 2
  failed_items =
    if response.is_a?(Hash) && response['errors']
      Array(response['items'])
        .map { |item| item.respond_to?(:values) ? item.values.first : nil }
        .select { |result| result.is_a?(Hash) && result['error'] }
    else
      []
    end

  if failed_items.empty?
    Embulk.logger.info "bulk: #{document_count} success."
  else
    first_failure = failed_items.first
    Embulk.logger.warn "bulk: #{failed_items.size} of #{document_count} documents failed. " \
                       "first error (status #{first_failure['status']}): #{first_failure['error']}"
  end
end