Class: Fluent::Plugin::ElasticsearchInput

Inherits:
Input
  • Object
show all
Includes:
ElasticsearchConstants
Defined in:
lib/fluent/plugin/in_elasticsearch.rb

Defined Under Namespace

Classes: UnrecoverableRequestFailure

Constant Summary collapse

DEFAULT_RELOAD_AFTER =
-1
DEFAULT_STORAGE_TYPE =
'local'
METADATA =
"@metadata".freeze

Constants included from ElasticsearchConstants

Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ElasticsearchInput

Returns a new instance of ElasticsearchInput.



60
61
62
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 60

# Builds the plugin instance; all setup is delegated to the superclass.
def initialize
  super()
end

Instance Method Details

#backend_options ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 102

# Maps the client key/cert settings onto the option names expected by the
# configured HTTP backend.
#
# Returns a Hash for the :excon and :typhoeus backends and nil for any other
# backend. Raises Fluent::ConfigError when the backend gem cannot be loaded.
def backend_options
  if @http_backend == :excon
    { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
  elsif @http_backend == :typhoeus
    # Loaded lazily so the gem is only needed when this backend is selected.
    require 'typhoeus'
    { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
  end
rescue LoadError => load_error
  log.error_backtrace(load_error.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{load_error}"
end

#client(host = nil) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 207

# Returns the memoized Elasticsearch client, rebuilding it when the hosts
# resolved for +host+ no longer match the ones the cached client was built for.
def client(host = nil)
  conn_opts = get_connection_options(host)

  # Throw away the cached client if it was created for a different host list.
  @_es = nil unless is_existing_connection(conn_opts[:hosts])
  return @_es if @_es

  # Remember which hosts this client is bound to for the next comparison.
  @current_config = conn_opts[:hosts].clone
  configure_adapter = lambda {|f| f.adapter @http_backend, @backend_options }

  # When reloading is enabled and @reload_after exceeds the default, that
  # value is passed instead of the plain flag.
  reload_setting =
    if @reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      @reload_after
    else
      @reload_connections
    end

  request_headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

  transport_settings = {
    reload_connections: reload_setting,
    reload_on_failure: @reload_on_failure,
    resurrect_after: @resurrect_after,
    logger: @transport_logger,
    transport_options: {
      headers: request_headers,
      request: { timeout: @request_timeout },
      ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
    },
    http: {
      user: @user,
      password: @password
    },
    sniffer_class: @sniffer_class,
  }

  transport = TRANSPORT_CLASS::Transport::HTTP::Faraday.new(
    conn_opts.merge(options: transport_settings), &configure_adapter)
  @_es = Elasticsearch::Client.new transport: transport
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 64

# Validates the plugin configuration and prepares the state used by later
# requests: the timestamp parser, backend options, transport logger,
# sniffer class, search options and base query.
#
# conf - the configuration element handed over by Fluentd.
#
# Raises Fluent::ConfigError when `user` is set without `password`, or when
# the configured sniffer class cannot be resolved.
def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  # Credentials written as %{...} are URL-encoded after stripping the wrapper.
  if @user && (m = @user.match(/%{(?<user>.*)}/))
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && (m = @password.match(/%{(?<password>.*)}/))
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end
  @current_config = nil
  # Specify @sniffer_class before calling #client.
  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, LoadError => ex
    # Deliberately narrower than `rescue Exception`: lookup failures
    # (NameError/TypeError) and autoload failures (LoadError) become config
    # errors, while signals and SystemExit propagate untouched.
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @options = {
    :index => @index_name,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = @query
end

#convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") ⇒ Object



195
196
197
198
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 195

# Formats a numeric timestamp as a string.
#
# numeric_time         - value handed to Fluent::NumericTimeParser (:float mode).
# timestamp_key_format - strftime pattern used for the result.
#
# Returns the formatted String.
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  parsed = Fluent::NumericTimeParser.new(:float).parse(numeric_time)
  Time.at(parsed.to_r).strftime(timestamp_key_format)
end

#create_time_parser ⇒ Object

Once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn’t quite do what we want - it gives

[sec, nsec]

whereas we want something we can call `strftime` on…



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 168

# Builds the callable used to turn a timestamp value into a Time.
#
# Without @timestamp_key_format the value is parsed with DateTime.parse.
# With a format, Strptime is tried first and DateTime.strptime is used when
# constructing the Strptime parser fails. Numeric values are converted to a
# string before parsing in every case.
#
# Returns a Proc taking the raw value.
def create_time_parser
  unless @timestamp_key_format
    return proc { |raw|
      raw = convert_numeric_time_into_string(raw) if raw.is_a?(Numeric)
      DateTime.parse(raw).to_time
    }
  end

  begin
    # Strptime does not understand every format, but it is very fast for
    # the ones it does.
    compiled = Strptime.new(@timestamp_key_format)
    proc { |raw|
      raw = convert_numeric_time_into_string(raw, @timestamp_key_format) if raw.is_a?(Numeric)
      compiled.exec(raw).to_time
    }
  rescue
    # Reached when Strptime rejects the format or is not available at all.
    proc { |raw|
      raw = convert_numeric_time_into_string(raw, @timestamp_key_format) if raw.is_a?(Numeric)
      DateTime.strptime(raw, @timestamp_key_format).to_time
    }
  end
end

#get_connection_options(con_host = nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 127

# Resolves the list of Elasticsearch hosts to connect to.
#
# con_host - optional comma-separated host list; falls back to @hosts, and
#            to the single @host/@port/@scheme triple when neither is set.
#
# Returns a Hash of the form { hosts: [ {host:, port:, scheme:, ...}, ... ] }.
def get_connection_options(con_host=nil)
  host_list = con_host || @hosts

  hosts =
    if host_list
      parsed = host_list.split(',').map do |entry|
        if entry.match(%r{^[^:]+(\:\d+)?$})
          # Legacy format: host:port,host:port,... (port is optional).
          name, port = entry.split(':')
          {
            host:   name,
            port:   (port || @port).to_i,
            scheme: @scheme.to_s
          }
        else
          # URL format such as http://logs.foo.com or https://user:pass@host/path
          uri = URI(get_escaped_userinfo(entry))
          base = { host: uri.host, port: uri.port, scheme: uri.scheme }
          %w(user password path).each_with_object(base) do |key, acc|
            value = uri.public_send(key)
            acc[key.to_sym] = value unless value.nil? || value == ''
          end
        end
      end
      parsed.compact
    else
      [{host: @host, port: @port, scheme: @scheme.to_s}]
    end

  # Fill in plugin-level credentials and path for hosts that lack their own.
  hosts.each do |entry|
    entry.merge!(user: @user, password: @password) if !entry[:user] && @user
    entry.merge!(path: @path) if !entry[:path] && @path
  end

  { hosts: hosts }
end

#get_escaped_userinfo(host_str) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 115

# URL-encodes credentials written as %{user}:%{password} inside a host URL.
#
# host_str - host URL string, possibly containing %{...} placeholders.
#
# Returns the URL with the wrapped user and password form-encoded, or
# host_str unchanged when the placeholder pattern is not present.
def get_escaped_userinfo(host_str)
  parts = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless parts

  escaped_user = URI.encode_www_form_component(parts["user"])
  escaped_password = URI.encode_www_form_component(parts["password"])
  parts["scheme"] + escaped_user + ':' + escaped_password + parts["path"]
end

#is_existing_connection(host) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 245

# Tells whether the cached client was built for exactly the given hosts.
#
# host - Array of host Hashes (each with :host and :port) to compare, in
#        order, against @current_config.
#
# Returns true only when a client exists and every entry has the same :host
# and :port as the entry at the same position in @current_config.
def is_existing_connection(host)
  return false if @_es.nil? || @current_config.nil?
  return false if host.length != @current_config.length

  host.each_with_index.all? do |candidate, idx|
    current = @current_config[idx]
    # Explicit parentheses matter: without them `eql?` swallows the whole
    # `host || port-comparison` expression and the port is never checked.
    candidate[:host].eql?(current[:host]) && candidate[:port] == current[:port]
  end
end

#parse_time(value, event_time, tag) ⇒ Object



200
201
202
203
204
205
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 200

# Parses a timestamp value with the configured timestamp parser.
#
# value      - raw timestamp taken from the document.
# event_time - fallback time used when parsing fails.
# tag        - tag recorded in the error event.
#
# Returns the parser's result; on any StandardError an error event is emitted
# and Time.at(event_time) is returned instead.
def parse_time(value, event_time, tag)
  @timestamp_parser.call(value)
rescue => parse_error
  failure_record = {
    'tag' => tag,
    'time' => event_time,
    'format' => @timestamp_key_format,
    'value' => value
  }
  router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, failure_record, parse_error)
  Time.at(event_time).to_time
end

#process_events(hit, es) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 306

# Converts one search hit into an event and appends it to the event stream.
#
# hit - Hash for a single hit; its "_source" becomes the event record and the
#       keys listed in @docinfo_fields are copied from it when @docinfo is set.
# es  - event stream receiving the record through #add(time, event).
#
# The event time is Fluent::Engine.now unless @parse_timestamp is set and the
# record carries TIMESTAMP_FIELD, in which case that value is parsed.
#
# Raises UnrecoverableError when the existing docinfo target field in the
# record is not a Hash.
def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now
  if @parse_timestamp && event.has_key?(TIMESTAMP_FIELD)
    time = parse_time(event[TIMESTAMP_FIELD], time, @tag)
  end

  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      # The details go into the message itself: a third positional argument
      # to `raise` is interpreted as the backtrace, so passing a Hash there
      # raises TypeError instead of the intended error.
      # NOTE(review): UnrecoverableError is not defined in the visible part
      # of this file - confirm the constant resolves in this namespace.
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got: #{docinfo_target.class} (event: #{event})"
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end
  es.add(time, event)
end

#process_next_scroll_request(es, scroll_id) ⇒ Object



300
301
302
303
304
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 300

# Fetches the next scroll page and feeds each of its hits into the stream.
#
# es        - event stream passed on to #process_events.
# scroll_id - scroll identifier for the page to fetch.
#
# Returns a Hash with 'has_hits' (whether the page contained hits) and
# '_scroll_id' (the identifier reported by the response).
def process_next_scroll_request(es, scroll_id)
  response = process_scroll_request(scroll_id)
  page_hits = response['hits']['hits']
  page_hits.each { |hit| process_events(hit, es) }
  {'has_hits' => page_hits.any?, '_scroll_id' => response['_scroll_id']}
end

#process_scroll_request(scroll_id) ⇒ Object



296
297
298
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 296

# Requests the next page for the given scroll identifier, using the
# configured @scroll value for the request.
def process_scroll_request(scroll_id)
  client.scroll(body: { scroll_id: scroll_id }, scroll: @scroll)
end

#run ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 260

# Runs one fetch cycle.
#
# With @num_slices of 1 or less the query runs inline through #run_slice.
# Otherwise one thread per slice is created, with a warning logged when more
# than 8 slices are requested.
def run
  if @num_slices <= 1
    run_slice
  else
    if @num_slices > 8
      log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices")
    end

    (0...@num_slices).map do |slice_id|
      thread_create(:"in_elasticsearch_thread_#{slice_id}") { run_slice(slice_id) }
    end
  end
end

#run_slice(slice_id = nil) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 272

# Runs the search (optionally restricted to one slice), follows the scroll
# until a page has no hits, emits everything collected as one stream under
# @tag, and finally clears the scroll.
#
# slice_id - slice number to add to the query, or nil for an unsliced search.
def run_slice(slice_id=nil)
  query =
    if slice_id.nil?
      @base_query
    else
      @base_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices})
    end

  response = client.search(@options.merge(:body => Yajl.dump(query) ))
  stream = Fluent::MultiEventStream.new

  first_page = response["hits"]["hits"]
  first_page.each { |hit| process_events(hit, stream) }
  more_hits = first_page.any?
  scroll_id = response['_scroll_id']

  while more_hits && scroll_id
    page = process_next_scroll_request(stream, scroll_id)
    more_hits = page['has_hits']
    scroll_id = page['_scroll_id']
  end

  router.emit_stream(@tag, stream)

  # Clients from 7.0.0 on take the scroll id inside the request body.
  body_style = Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("7.0.0")
  return nil unless scroll_id

  if body_style
    client.clear_scroll(body: {scroll_id: scroll_id})
  else
    client.clear_scroll(scroll_id: scroll_id)
  end
end

#start ⇒ Object



159
160
161
162
163
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 159

# Starts the plugin and registers the timer that invokes #run every
# @interval, repeating according to @repeat.
def start
  super

  run_callback = method(:run)
  timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &run_callback)
end