Class: Fluent::ParameterizedPathOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_parameterized_path.rb

Constant Summary collapse

# Mode passed to FileUtils.mkdir_p when creating output directories (rwxr-xr-x).
DEFAULT_DIR_PERMISSION = 0o755
# Mode passed to File.open when creating output files (rw-r--r--).
DEFAULT_FILE_PERMISSION = 0o644
# Accepted compression names mapped to the canonical algorithm symbol.
SUPPORTED_COMPRESS = { gz: :gz, gzip: :gz }.freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError) — if the 'path_prefix' or 'path_key' parameter is not set


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 23

# Applies the configuration, checks the required parameters, and prepares
# the formatter and the file-name suffix used for the selected compression.
#
# @param conf the configuration object; passed to the superclass and to the
#   formatter's own #configure
# @raise [ConfigError] if @path_prefix or @path_key is not set after the
#   superclass has processed the configuration
# @return [String, nil] the value stored in @suffix
def configure(conf)
  super

  unless @path_prefix
    raise ConfigError, "'path_prefix' parameter is required"
  end

  unless @path_key
    raise ConfigError, "'path_key' parameter is required"
  end

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  # '' when compression is off, '.gz' for gzip; any other value leaves
  # @suffix as nil.
  @suffix =
    if @compress.nil?
      ''
    elsif :gz.equal?(@compress)
      '.gz'
    end
end

#format(tag, time, record) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 40

# Serializes one event as a msgpack-encoded [path, data] pair.
#
# The value stored under @path_key is taken out of the record as the path;
# the rest of the record is rendered by @formatter. The caller's record is
# not modified. A warning is logged when the record has no @path_key entry.
#
# @param tag the event tag, passed through to the formatter
# @param time the event time, passed through to the formatter
# @param record [Hash] the event record
# @return the result of calling #to_msgpack on the [path, data] array
def format(tag, time, record)
  unless record.key?(@path_key)
    log.warn("Undefined key: #{@path_key}")
  end

  path = record[@path_key]
  # Work on a copy so the path entry is removed only from what gets formatted.
  payload = record.dup.tap { |copy| copy.delete(@path_key) }

  [path, @formatter.format(tag, time, payload)].to_msgpack
end

#write(chunk) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_parameterized_path.rb', line 51

# Writes the contents of a buffer chunk to files, one file per generated path.
#
# Each entry in the chunk is a [path, data] pair. The path is expanded with
# generate_path(chunk.key, path), data for the same expanded path is
# concatenated in chunk order, and the result is appended to that file.
# With @compress == :gz the data is appended through a Zlib::GzipWriter;
# with @compress == nil it is appended as-is. For any other @compress value
# the directory is still created but nothing is written.
#
# @param chunk an object responding to #msgpack_each and #key
# @return [Array] the expanded paths, in first-seen order (used by tests)
def write(chunk)
  buffers = {}
  chunk.msgpack_each do |(raw_path, data)|
    target = generate_path(chunk.key, raw_path)

    unless buffers.key?(target)
      buffers[target] = data
      next
    end

    buffers[target] << data
  end

  buffers.each do |target, payload|
    FileUtils.mkdir_p(File.dirname(target), mode: DEFAULT_DIR_PERMISSION)

    plain = @compress.nil?
    # Only nil and :gz trigger a write; other values fall through untouched.
    next unless plain || :gz.equal?(@compress)

    File.open(target, 'ab', DEFAULT_FILE_PERMISSION) do |file|
      if plain
        file.write(payload)
      else
        Zlib::GzipWriter.wrap(file) { |gz| gz.write(payload) }
      end
    end
  end

  buffers.keys # for test
end