Class: Fluent::ParameterizedPathOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::ParameterizedPathOutput
- Defined in:
- lib/fluent/plugin/out_parameterized_path.rb
Constant Summary collapse
- DEFAULT_DIR_PERMISSION =
0755
- DEFAULT_FILE_PERMISSION =
0644
- SUPPORTED_COMPRESS =
{ gz: :gz, gzip: :gz }.freeze
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 23 def configure(conf) super raise ConfigError, "'path_prefix' parameter is required" unless @path_prefix raise ConfigError, "'path_key' parameter is required" unless @path_key @formatter = Plugin.new_formatter(@format) @formatter.configure(conf) @suffix = case @compress when nil '' when :gz '.gz' end end |
#format(tag, time, record) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 40 def format(tag, time, record) log.warn("Undefined key: #{@path_key}") unless record.key?(@path_key) path = record[@path_key] dup = record.dup dup.delete(@path_key) data = @formatter.format(tag, time, dup) [path, data].to_msgpack end |
#write(chunk) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 51 def write(chunk) paths = {} chunk.msgpack_each do |(path, data)| path = generate_path(chunk.key, path) if paths.key?(path) paths[path] << data else paths[path] = data end end paths.each do |path, data| FileUtils.mkdir_p(File.dirname(path), mode: DEFAULT_DIR_PERMISSION) case @compress when nil File.open(path, 'ab', DEFAULT_FILE_PERMISSION) { |f| f.write(data) } when :gz File.open(path, 'ab', DEFAULT_FILE_PERMISSION) do |f| Zlib::GzipWriter.wrap(f) { |gz| gz.write(data) } end end end paths.keys # for test end |