Class: LogStash::Outputs::File

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/file.rb

Overview

This output writes events to files on disk. You can use fields from the event as parts of the filename and/or path.

By default, this output writes one event per line in json format. You can customise the line format using the ‘line` codec like

source,ruby

output

file {
  path => ...
  codec => line { format => "custom format: %{message"}
}

}

Defined Under Namespace

Classes: Interval

Constant Summary collapse

FIELD_REF =
/%\{[^}]+\}/

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#failure_pathObject (readonly)

Returns the value of attribute failure_path.



26
27
28
# File 'lib/logstash/outputs/file.rb', line 26

def failure_path
  @failure_path
end

Instance Method Details

#closeObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/logstash/outputs/file.rb', line 135

def close
  @flusher.stop unless @flusher.nil?
  @io_mutex.synchronize do
    @logger.debug("Close: closing files")

    @files.each do |path, fd|
      begin
        fd.close
        @logger.debug("Closed file #{path}", :fd => fd)
      rescue Exception => e
        @logger.error("Exception while flushing and closing files.", :exception => e)
      end
    end
  end
end

#multi_receive_encoded(events_and_encoded) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/logstash/outputs/file.rb', line 109

def multi_receive_encoded(events_and_encoded)
  encoded_by_path = Hash.new {|h,k| h[k] = []}

  events_and_encoded.each do |event,encoded|
    file_output_path = event_path(event)
    encoded_by_path[file_output_path] << encoded
  end

  @io_mutex.synchronize do
    encoded_by_path.each do |path,chunks|
      fd = open(path)
      if @write_behavior == "overwrite"
        fd.truncate(0)
        fd.seek(0, IO::SEEK_SET)
        fd.write(chunks.last)
      else
        # append to the file
        chunks.each {|chunk| fd.write(chunk) }
      end
      fd.flush unless @flusher && @flusher.alive?
    end

    close_stale_files
  end
end

#registerObject



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/logstash/outputs/file.rb', line 84

def register
  require "fileutils" # For mkdir_p

  @files = {}
  @io_mutex = Mutex.new

  @path = File.expand_path(path)

  validate_path

  if path_with_field_ref?
    @file_root = extract_file_root
  else
    @file_root = File.dirname(path)
  end
  @failure_path = File.join(@file_root, @filename_failure)

  @flush_interval = @flush_interval.to_i
  if @flush_interval > 0
    @flusher = Interval.start(@flush_interval, -> { flush_pending_files })
  end

  @last_stale_cleanup_cycle = Time.now
end