Method: Fluent::Plugin::OpenSearchOutput#configure

Defined in:
lib/fluent/plugin/out_opensearch.rb

#configure(conf) ⇒ Object

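Validates the plugin configuration and prepares the runtime state derived from it (time parser, index templates, AWS credentials, serializer, sniffer/selector classes, request-splitting strategy). The summary originally shown here, "AWS credential part is ended.", appears to be a stray source comment picked up by the documentation generator rather than a description of this method.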

Raises:

  • (Fluent::ConfigError)


264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/fluent/plugin/out_opensearch.rb', line 264

# Validates the plugin configuration and initializes the instance state that is
# derived from it: time parser, backend/TLS options, key lists, template
# installation, cache/credential timers, JSON serializer, sniffer/selector
# classes, detected OpenSearch major version and the request-splitting strategy.
#
# @param conf the plugin configuration passed in by Fluentd; it is handed to
#   +compat_parameters_convert+ and +super+, and read here via
#   <tt>conf['@log_level']</tt> / <tt>conf['log_level']</tt>.
# @raise [Fluent::ConfigError] when neither 'tag' nor '_index' is a chunk key
#   (and no template placeholder substitution is needed), when a retry limit is
#   negative, when host placeholders, template installation and
#   +verify_os_version_at_startup+ are combined, when +user+ is given without
#   +password+, when a sniffer/selector class cannot be loaded, or when the
#   client library major version does not match the detected OpenSearch version.
def configure(conf)
  compat_parameters_convert(conf, :buffer)

  super

  if @endpoint
    # Override the default value of reload_connections to false because
    # AWS Elasticsearch Service doesn't return addresses of nodes and the Elasticsearch client
    # fails to reload connections properly. This ends up with the "temporarily failed to flush
    # the buffer" error repeating forever. See this discussion for details:
    # https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get-new-connection-from-pool-error/36252
    @reload_connections = false
  end

  # Chunk keys are only validated when no template placeholder substitution is needed.
  unless placeholder_substitution_needed_for_template?
    chunk_keys = @buffer_config.chunk_keys
    if !chunk_keys.include?("tag") && !chunk_keys.include?("_index")
      raise Fluent::ConfigError, "'tag' or '_index' in chunk_keys is required."
    end
  end

  @time_parser = create_time_parser
  @backend_options = backend_options
  @ssl_version_options = set_tls_minmax_version_config(@ssl_version, @ssl_max_version, @ssl_min_version)

  # Normalize string-valued key options into arrays.
  @remove_keys = @remove_keys.split(/\s*,\s*/) if @remove_keys

  if @target_index_key && @target_index_key.is_a?(String)
    @target_index_key = @target_index_key.split('.')
  end

  if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
    @remove_keys_on_update = @remove_keys_on_update.split(',')
  end

  raise Fluent::ConfigError, "'max_retry_putting_template' must be greater than or equal to zero." if @max_retry_putting_template < 0
  raise Fluent::ConfigError, "'max_retry_get_os_version' must be greater than or equal to zero." if @max_retry_get_os_version < 0

  # Dump log when using host placeholders and template features at same time.
  valid_host_placeholder = placeholder?(:host_placeholder, @host)
  if valid_host_placeholder && (@template_name && @template_file || @templates)
    if @verify_os_version_at_startup
      raise Fluent::ConfigError, "host placeholder, template installation, and verify OpenSearch version at startup are exclusive feature at same time. Please specify verify_os_version_at_startup as `false` when host placeholder and template installation are enabled."
    end
    log.info "host placeholder and template installation makes your OpenSearch cluster a bit slow down(beta)."
  end

  @template_names = []
  unless dry_run?
    if @template_name && @template_file
      if @logstash_format || placeholder_substitution_needed_for_template?
        # Do not install now; route later #template_installation calls on this
        # instance to the real implementation instead.
        class << self
          alias_method :template_installation, :template_installation_actual
        end
      else
        template_installation_actual(@template_name, @customize_template, @application_name, @index_name)
      end
    end

    if @templates
      retry_operate(@max_retry_putting_template,
                    @fail_on_putting_template_retry_exceed,
                    @catch_transport_exception_on_retry) do
        templates_hash_install(@templates, @template_overwrite)
      end
    end
  end

  @truncate_mutex = Mutex.new
  if @truncate_caches_interval
    timer_execute(:out_opensearch_truncate_caches, @truncate_caches_interval) do
      log.info('Clean up the indices and template names cache')

      @truncate_mutex.synchronize { @template_names.clear }
    end
  end

  # If AWS credentials is set, consider to expire credentials information forcibly before expired.
  @credential_mutex = Mutex.new
  if @endpoint
    @_aws_credentials = aws_credentials(@endpoint)

    if @endpoint.refresh_credentials_interval
      timer_execute(:out_opensearch_expire_credentials, @endpoint.refresh_credentials_interval) do
        log.debug('Recreate the AWS credentials')

        @credential_mutex.synchronize do
          # NOTE(review): @_os is presumably the memoized client; it is reset
          # here before the credentials are re-fetched - confirm against #client.
          @_os = nil
          begin
            @_aws_credentials = aws_credentials(@endpoint)
          rescue => e
            # Keep the previous credentials; the failure is only logged.
            log.error("Failed to get new AWS credentials: #{e}")
          end
        end
      end
    end
  end

  # Prefer Oj for JSON dumping when it is installed; otherwise fall back to Yajl.
  @serializer_class = nil
  begin
    require 'oj'
    @dump_proc = Oj.method(:dump)
    if @prefer_oj_serializer
      @serializer_class = Fluent::Plugin::Serializer::Oj
      OpenSearch::API.settings[:serializer] = Fluent::Plugin::Serializer::Oj
    end
  rescue LoadError
    @dump_proc = Yajl.method(:dump)
  end

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  # Values written as %{...} are unwrapped and form-encoded.
  if @user && (m = @user.match(/%{(?<user>.*)}/))
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && (m = @password.match(/%{(?<password>.*)}/))
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end

  # Specify @sniffer_class before calling #client.
  # #detect_os_major_version uses #client.
  #
  # Only StandardError (e.g. NameError) and ScriptError (e.g. LoadError raised
  # while autoloading) are converted into a ConfigError; signals, SystemExit
  # and NoMemoryError must propagate untouched.
  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, ScriptError => ex
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @selector_class = nil
  begin
    @selector_class = Object.const_get(@selector_class_name) if @selector_class_name
  rescue StandardError, ScriptError => ex
    raise Fluent::ConfigError, "Could not load selector class #{@selector_class_name}: #{ex}"
  end

  # Fall back to the configured default when no major version is reported.
  @last_seen_major_version = handle_last_seen_os_major_version || @default_opensearch_version

  if @validate_client_version && !dry_run?
    if @last_seen_major_version != client_library_version.to_i
      raise Fluent::ConfigError, <<-EOC
        Detected OpenSearch #{@last_seen_major_version} but you use OpenSearch client #{client_library_version}.
        Please consider to use #{@last_seen_major_version}.x series OpenSearch client.
      EOC
    end
  end

  if @last_seen_major_version >= 1
    case @ssl_version
    when :SSLv23, :TLSv1, :TLSv1_1
      if @scheme == :https
        log.warn "Detected OpenSearch 1.x or above and enabled insecure security:
                  You might have to specify `ssl_version TLSv1_2` in configuration."
      end
    end
  end

  if @ssl_version && @scheme == :https && !@http_backend_excon_nonblock
    log.warn "TLS handshake will be stucked with block connection.
                Consider to set `http_backend_excon_nonblock` as true"
  end

  # Consider missing the prefix of "$." in nested key specifiers.
  @id_key = convert_compat_id_key(@id_key) if @id_key
  @parent_key = convert_compat_id_key(@parent_key) if @parent_key
  @routing_key = convert_compat_id_key(@routing_key) if @routing_key

  @routing_key_name = configure_routing_key_name
  @meta_config_map = create_meta_config_map
  @current_config = nil
  @compressable_connection = false

  # Resolve the configured exception class names; unknown names are logged and dropped.
  @ignore_exception_classes = @ignore_exceptions.map do |exception|
    if Object.const_defined?(exception)
      Object.const_get(exception)
    else
      log.warn "Cannot find class #{exception}. Will ignore it."

      nil
    end
  end.compact

  # Bind #split_request? on this instance to the size-checking or the
  # non-checking implementation; a negative threshold selects the latter.
  if @bulk_message_request_threshold < 0
    class << self
      alias_method :split_request?, :split_request_size_uncheck?
    end
  else
    class << self
      alias_method :split_request?, :split_request_size_check?
    end
  end
end