Class: Fluent::SourceOnlyBufferAgent
- Defined in:
- lib/fluent/source_only_buffer_agent.rb
Constant Summary collapse
- BUFFER_DIR_NAME =
Use INSTANCE_ID to use the same base dir as the other workers. This will make recovery easier.
Fluent::INSTANCE_ID
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Agent
#context, #error_collector, #event_router, #filters, #log, #outputs
Instance Method Summary collapse
- #cleanup ⇒ Object
- #configure(flush: false) ⇒ Object
- #emit_error_event(tag, time, record, error) ⇒ Object
- #handle_emits_error(tag, es, error) ⇒ Object
-
#initialize(log:, system_config:) ⇒ SourceOnlyBufferAgent
constructor
A new instance of SourceOnlyBufferAgent.
Methods inherited from Agent
#add_filter, #add_match, #lifecycle, #lifecycle_control_list
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize(log:, system_config:) ⇒ SourceOnlyBufferAgent
Returns a new instance of SourceOnlyBufferAgent.
26 27 28 29 30 31 32 33 |
# File 'lib/fluent/source_only_buffer_agent.rb', line 26 def initialize(log:, system_config:) super(log: log) @default_buffer_path = File.join(system_config.root_dir || DEFAULT_BACKUP_DIR, 'source-only-buffer', BUFFER_DIR_NAME) @optional_buffer_config = system_config.source_only_buffer.to_h.transform_keys(&:to_s) @base_buffer_dir = nil @actual_buffer_dir = nil end |
Instance Method Details
#cleanup ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/source_only_buffer_agent.rb', line 60 def cleanup unless (Dir.empty?(@actual_buffer_dir) rescue true) log.warn "some buffer files remain in #{@base_buffer_dir}." + " Please consider recovering or saving the buffer files in the directory." + " To recover them, you can set the buffer path manually to system config and" + " retry, i.e., restart Fluentd with with-source-only mode and send SIGWINCH again." + " Config Example:\n#{config_example_to_recover(@base_buffer_dir)}" return end begin FileUtils.remove_dir(@base_buffer_dir) rescue Errno::ENOENT # This worker doesn't need to do anything. Another worker may remove the dir first. rescue => e log.warn "failed to remove the buffer directory: #{@base_buffer_dir}", error: e end end |
#configure(flush: false) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/source_only_buffer_agent.rb', line 35 def configure(flush: false) buffer_config = @optional_buffer_config.compact buffer_config['flush_at_shutdown'] = flush ? 'true' : 'false' buffer_config['flush_thread_count'] = 0 unless flush buffer_config['path'] ||= @default_buffer_path super( Config::Element.new('SOURCE_ONLY_BUFFER', '', {}, [ Config::Element.new('match', '**', {'@type' => 'buffer', '@label' => '@ROOT'}, [ Config::Element.new('buffer', '', buffer_config, []) ]) ]) ) @base_buffer_dir = buffer_config['path'] # It can be "#{@base_buffer_dir}/worker#{fluentd_worker_id}/" when using multiple workers @actual_buffer_dir = File.dirname(outputs[0].buffer.path) unless flush log.info "with-source-only: the emitted data will be stored in the buffer files under" + " #{@base_buffer_dir}. You can send SIGWINCH to the supervisor process to cancel" + " with-source-only mode and process data." end end |
#emit_error_event(tag, time, record, error) ⇒ Object
79 80 81 82 |
# File 'lib/fluent/source_only_buffer_agent.rb', line 79 def emit_error_event(tag, time, record, error) error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag, time: time, record: record} log.warn "SourceOnlyBufferAgent: dump an error event:", error_info end |
#handle_emits_error(tag, es, error) ⇒ Object
84 85 86 87 88 |
# File 'lib/fluent/source_only_buffer_agent.rb', line 84 def handle_emits_error(tag, es, error) error_info = {error: error, location: (error.backtrace ? error.backtrace.first : nil), tag: tag} log.warn "SourceOnlyBufferAgent: emit transaction failed:", error_info log.warn_backtrace end |