Class: Embulk::Output::Elasticsearch

Inherits:
OutputPlugin
  • Object
show all
Defined in:
lib/embulk/output/elasticsearch_ruby.rb

Constant Summary collapse

# Accepted values for the `mode` option; .transaction rejects anything else.
# Frozen so the whitelist cannot be mutated at runtime.
ENABLE_MODE = %w[normal update replace].freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cleanup(task, schema, count, task_reports) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 40

# Post-run hook. Only 'replace' mode has work to do here:
# 1. Point the alias task['index'] at the index this run wrote to (get_index).
# 2. Detach that alias from older indices via delete_aliases.
# In every other mode this is a no-op that returns nil.
#
# @param task [Hash] task hash built by .transaction
# @param schema [Object] unused
# @param count [Integer] unused
# @param task_reports [Array] unused
def self.cleanup(task, schema, count, task_reports)
  return unless task['mode'] == 'replace'

  es = create_client(task)
  create_aliases(es, task['index'], get_index(task))
  delete_aliases(es, task)
end

.create_aliases(client, als, index) ⇒ Object



66
67
68
69
70
71
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 66

# Attaches alias +als+ to +index+ with one update_aliases request, then logs it.
#
# @param client [Object] Elasticsearch client
# @param als [String] alias name
# @param index [String] concrete index the alias should point to
# @return the value returned by Embulk.logger.info
def self.create_aliases(client, als, index)
  add_action = { add: { index: index, alias: als } }
  client.indices.update_aliases(body: { actions: [add_action] })
  Embulk.logger.info "created alias: #{als}, index: #{index}"
end

.create_client(task) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 48

# Builds an Elasticsearch client on top of a Faraday HTTP transport.
#
# The keys of each node entry in task['nodes'] are symbolized
# (e.g. 'host' => :host) before being passed as +hosts+. The
# reload/retry flags and the request timeout are forwarded as transport
# options.
#
# @param task [Hash] task hash built by .transaction
# @return [Elasticsearch::Client]
def self.create_client(task)
  hosts = task['nodes'].map do |node|
    node.map { |key, value| [key.to_sym, value] }.to_h
  end

  options = {
    reload_connections: task['reload_connections'],
    reload_on_failure: task['reload_on_failure'],
    retry_on_failure: task['retry_on_failure'],
    transport_options: { request: { timeout: task['request_timeout'] } }
  }

  transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new({ hosts: hosts, options: options })
  ::Elasticsearch::Client.new(transport: transport)
end

.delete_aliases(client, task) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 73

# Detaches alias task['index'] from indices left behind by earlier
# replace-mode runs. When task['delete_old_index'] is set, those indices are
# also deleted.
#
# Candidates are indices that currently carry the alias and whose name starts
# with "<get_index_prefix(task)>-". The index written by this run
# (get_index(task)) is always kept.
#
# @param client [Object] Elasticsearch client (uses indices.get_aliases,
#   indices.delete_alias and indices.delete)
# @param task [Hash] task hash built by .transaction
# @return [Array<String>] the candidate index names that were examined
def self.delete_aliases(client, task)
  als = task['index']
  current_index = get_index(task)
  # Escaping is required: index names typically contain '.' (the default is
  # 'logstash-%Y.%m.%d'). Unescaped, '.' matches any character and could
  # unalias or delete unrelated indices. \A anchors to the start of the
  # string, not the start of a line.
  pattern = /\A#{Regexp.escape(get_index_prefix(task))}-(\d*)/
  candidates = client.indices.get_aliases
                     .select { |_name, info| info['aliases'].include?(als) }
                     .keys
                     .select { |index| pattern =~ index }
  candidates.each do |index|
    next if index == current_index

    client.indices.delete_alias index: index, name: als
    Embulk.logger.info "deleted alias: #{als}, index: #{index}"
    next unless task['delete_old_index']

    client.indices.delete index: index
    Embulk.logger.info "deleted index: #{index}"
  end
end

.get_index(task) ⇒ Object



88
89
90
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 88

# Name of the concrete index to write to.
#
# In 'replace' mode, every run writes to its own index: the index prefix
# suffixed with task['time_value']. In other modes, task['index'] is used
# as-is.
#
# @param task [Hash] task hash built by .transaction
# @return [String]
def self.get_index(task)
  if task['mode'] == 'replace'
    prefix = get_index_prefix(task)
    "#{prefix}-#{task['time_value']}"
  else
    task['index']
  end
end

.get_index_prefix(task) ⇒ Object



92
93
94
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 92

# Shared prefix "<index>-<index_type>" of the indices created in replace mode.
#
# @param task [Hash] task hash built by .transaction
# @return [String]
def self.get_index_prefix(task)
  index_name, type_name = task.values_at('index', 'index_type')
  "#{index_name}-#{type_name}"
end

.transaction(config, schema, count, &control) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 11

# Builds the task hash from the plugin configuration, validates `mode`, and
# hands the task to the block that runs the output tasks.
#
# task['index'] is expanded with strftime at run time. task['time_value'] is
# a timestamp later used as the suffix of replace-mode indices.
#
# @param config [Object] Embulk config; read through `config.param`
# @param schema [Object] unused
# @param count [Integer] unused
# @return [Hash] an empty next-config diff
# @raise [ConfigError] when `mode` is not one of ENABLE_MODE
def self.transaction(config, schema, count, &control)
  task = {
    "nodes" => config.param("nodes", :array, default: [{ 'host' => 'localhost', 'port' => 9200 }]),
    "request_timeout" => config.param("request_timeout", :integer, default: 60),
    "index" => config.param("index", :string, default: 'logstash-%Y.%m.%d'),
    "mode" => config.param("mode", :string, default: 'normal'),
    "reload_connections" => config.param("reload_connections", :bool, default: true),
    "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
    "delete_old_index" => config.param("delete_old_index", :bool, default: false),
    "index_type" => config.param("index_type", :string),
    "id_keys" => config.param("id_keys", :array, default: nil),
    "id_format" => config.param("id_format", :string, default: nil),
    "array_columns" => config.param("array_columns", :array, default: nil),
    "bulk_actions" => config.param("bulk_actions", :integer, default: 1000),
    "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
  }
  # Read the clock once so the strftime-expanded index name and the
  # replace-mode suffix describe the same instant. Two separate Time.now
  # calls could straddle a second, or even a day, boundary.
  now = Time.now
  task['time_value'] = now.strftime('%Y.%m.%d.%H.%M.%S')
  task['index'] = now.strftime(task['index'])

  unless ENABLE_MODE.include?(task['mode'])
    raise ConfigError.new "`mode` must be one of #{ENABLE_MODE.join(', ')}"
  end
  Embulk.logger.info("mode => #{task['mode']}")

  # The task reports returned by the block are not used.
  yield(task)
  {}
end

Instance Method Details

#abort ⇒ Object



143
144
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 143

# Abort hook of the output plugin interface. It is intentionally a no-op
# and returns nil.
def abort
  nil
end

#add(page) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 121

# Converts each record of +page+ into a bulk action/source pair and appends
# both to @bulk_message. Flushes with `send` once @bulk_actions pairs are
# buffered.
#
# NOTE(review): `send` is this class's own flush method (defined outside this
# view). It shadows Object#send.
#
# @param page [Enumerable<Array>] rows whose values line up with schema.names
def add(page)
  # The action depends only on the mode, so resolve it once per page.
  action = (@mode == 'update') ? :update : :index
  page.each do |record|
    hash = Hash[schema.names.zip(record)]
    meta = { action => { _index: @index, _type: @index_type } }
    meta[action][:_id] = generate_id(@id_format, hash, @id_keys) unless @id_keys.nil?
    source = generate_array(hash)
    # The bulk API expects the line after an `update` action to carry the
    # partial document under "doc" (or a script), not a bare document.
    source = { doc: source } if action == :update
    @bulk_message << meta
    @bulk_message << source
    # Each action occupies two entries (meta + source) in the buffer.
    send if @bulk_message.size >= @bulk_actions * 2
  end
end

#close ⇒ Object



118
119
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 118

# Close hook of the output plugin interface. Nothing is released here, and it
# returns nil.
def close
  nil
end

#commit ⇒ Object



146
147
148
149
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 146

# Per-task commit hook. This plugin reports nothing back, so the task report
# is always an empty hash.
def commit
  {}
end

#finish ⇒ Object



137
138
139
140
141
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 137

# Flushes any bulk action/source pairs still buffered in @bulk_message.
# With an empty buffer this does nothing and returns nil.
def finish
  send unless @bulk_message.empty?
end

#init ⇒ Object

# Resumes a transaction with an already-built +task+. It runs the tasks via
# the block, ignores the returned task reports, and returns an empty
# next-config diff.
def self.resume(task, schema, count, &control)
  yield(task)
  {}
end



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/embulk/output/elasticsearch_ruby.rb', line 103

# Per-instance setup. It:
# - copies the task settings used while writing into instance variables;
# - resolves the target index name (.get_index);
# - creates the Elasticsearch client (.create_client).
# @bulk_message starts empty; add appends action/source pairs to it and
# finish flushes whatever remains.
def init
  settings = task

  # Values copied verbatim from the task hash (grouped only for readability).
  @mode = settings["mode"]
  @nodes = settings["nodes"]
  @retry_on_failure = settings["retry_on_failure"]
  @bulk_actions = settings["bulk_actions"]

  @index_type = settings["index_type"]
  @id_keys = settings["id_keys"]
  @id_format = settings["id_format"]
  @array_columns = settings["array_columns"]

  # Derived state.
  @index = self.class.get_index(settings)
  @client = self.class.create_client(settings)
  @bulk_message = []
end