Class: Fluent::SourceOnlyBufferAgent

Inherits:
Agent
  • Object
show all
Defined in:
lib/fluent/source_only_buffer_agent.rb

Constant Summary collapse

BUFFER_DIR_NAME =

Use INSTANCE_ID to use the same base dir as the other workers. This will make recovery easier.

Fluent::INSTANCE_ID

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Agent

#context, #error_collector, #event_router, #filters, #log, #outputs

Instance Method Summary collapse

Methods inherited from Agent

#add_filter, #add_match, #lifecycle, #lifecycle_control_list

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize(log:, system_config:) ⇒ SourceOnlyBufferAgent

Returns a new instance of SourceOnlyBufferAgent.



26
27
28
29
30
31
32
33
# File 'lib/fluent/source_only_buffer_agent.rb', line 26

def initialize(log:, system_config:)
  super(log: log)

  @default_buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME)
  @optional_buffer_config = system_config.source_only_buffer.to_h.transform_keys(&:to_s)
  @base_buffer_dir = nil
  @actual_buffer_dir = nil
end

Instance Method Details

#cleanupObject



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/source_only_buffer_agent.rb', line 60

def cleanup
  unless (Dir.empty?(@actual_buffer_dir) rescue true)
    log.warn "some buffer files remain in #{@base_buffer_dir}." +
             " Please consider recovering or saving the buffer files in the directory." +
             " To recover them, you can set the buffer path manually to system config and" +
             " retry, i.e., restart Fluentd with with-source-only mode and send SIGWINCH again." +
             " Config Example:\n#{config_example_to_recover(@base_buffer_dir)}"
    return
  end

  begin
    FileUtils.remove_dir(@base_buffer_dir)
  rescue Errno::ENOENT
    # This worker doesn't need to do anything. Another worker may remove the dir first.
  rescue => e
    log.warn "failed to remove the buffer directory: #{@base_buffer_dir}", error: e
  end
end

#configure(flush: false) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/source_only_buffer_agent.rb', line 35

def configure(flush: false)
  buffer_config = @optional_buffer_config.compact
  buffer_config['flush_at_shutdown'] = flush ? 'true' : 'false'
  buffer_config['flush_thread_count'] = 0 unless flush
  buffer_config['path'] ||= @default_buffer_path

  super(
    Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [
      Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT'}, [
        Config::Element.new('buffer', '', buffer_config, [])
      ])
    ])
  )

  @base_buffer_dir = buffer_config['path']
  # It can be "#{@base_buffer_dir}/worker#{fluentd_worker_id}/" when using multiple workers
  @actual_buffer_dir = File.dirname(outputs[0].buffer.path)

  unless flush
    log.info "with-source-only: the emitted data will be stored in the buffer files under" +
             " #{@base_buffer_dir}. You can send SIGWINCH to the supervisor process to cancel" +
             " with-source-only mode and process data."
  end
end

#emit_error_event(tag, time, record, error) ⇒ Object



79
80
81
82
# File 'lib/fluent/source_only_buffer_agent.rb', line 79

def emit_error_event(tag, time, record, error)
  error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag, time: time, record: record}
  log.warn "SourceOnlyBufferAgent: dump an error event:", error_info
end

#handle_emits_error(tag, es, error) ⇒ Object



84
85
86
87
88
# File 'lib/fluent/source_only_buffer_agent.rb', line 84

def handle_emits_error(tag, es, error)
  error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag}
  log.warn "SourceOnlyBufferAgent: emit transaction failed:", error_info
  log.warn_backtrace
end