Class: Fluent::Plugin::FileOutput

Inherits:
Output show all
Defined in:
lib/fluent/plugin/out_file.rb

Defined Under Namespace

Modules: SymlinkBufferMixin

Constant Summary collapse

# Compression names this plugin recognizes.
# NOTE(review): presumably the values accepted by the `compress` parameter —
# the parameter declaration is not visible here; confirm.
SUPPORTED_COMPRESS =
[:text, :gz, :gzip]
# Maps a configured compression name to the internal compression method.
# #configure looks up @compress here to obtain @compress_method
# (nil means no compression; both :gz and :gzip resolve to :gzip).
SUPPORTED_COMPRESS_MAP =
{
  text: nil,
  gz: :gzip,
  gzip: :gzip,
}
# Octal mode used for output files when system_config.file_permission is unset.
FILE_PERMISSION =
0644
# Octal mode used for created directories when system_config.dir_permission is unset.
DIR_PERMISSION =
0755
# Evaluates to 86400.
# NOTE(review): presumably seconds (one day) used as the default buffer timekey — confirm.
DEFAULT_TIMEKEY =
60 * 60 * 24

Constants inherited from Output

Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Output

#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Output

#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #check_slow_flush, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #initialize, #interrupt_flushes, #metadata, #metadata_for_test, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #start, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #plugin_root_dir, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Output

Instance Attribute Details

#last_written_path ⇒ Object

for tests



70
71
72
# File 'lib/fluent/plugin/out_file.rb', line 70

# Reader for @last_written_path, the path assigned at the end of the most
# recent #write call. Exposed for tests.
#
# @return [Object] the stored path, or nil if nothing has been assigned yet
def last_written_path
  @last_written_path
end

Instance Method Details

#compression_suffix(compress) ⇒ Object



239
240
241
242
243
244
245
246
# File 'lib/fluent/plugin/out_file.rb', line 239

# Returns the file-name suffix that corresponds to a compression method.
#
# @param compress [Symbol, nil] :gzip, or nil for no compression
# @return [String] '.gz' for :gzip, '' for nil
# @raise [ArgumentError] for any other value
def compression_suffix(compress)
  return '' if compress.nil?
  return '.gz' if compress == :gzip

  raise ArgumentError, "unknown compression type #{compress}"
end

#configure(conf) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/out_file.rb', line 91

# Validates the plugin configuration and prepares everything #write needs.
#
# Steps, in order:
# 1. Converts compatibility-layer parameters and guarantees a <buffer> element
#    (chunk key 'time') with a usable 'path'.
# 2. Warns about the deprecated 'utc' / 'localtime' parameters.
# 3. Delegates to the parent #configure, then resolves @compress_method.
# 4. Builds @path_template and validates its placeholders; when not acting as
#    a secondary output, also checks that a sample expanded path is writable.
# 5. Creates the formatter, wires up symlink support, and resolves the
#    directory/file permissions and whether file locking is needed.
#
# @param conf configuration element for this plugin (read via #[], #has_key?,
#   #elements and #add_element)
# @raise [Fluent::ConfigError] when the path contains '*' without a time chunk
#   key, when the sample path is not writable, or when symlink_path is set on
#   a secondary output
def configure(conf)
  compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

  # Captured before super so the raw (compat) value can be passed to
  # generate_path_template below.
  configured_time_slice_format = conf['time_slice_format']

  if conf.elements(name: 'buffer').empty?
    conf.add_element('buffer', 'time')
  end
  buffer_conf = conf.elements(name: 'buffer').first
  # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
  if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
    # v0.14 file buffer handles path as directory if '*' is missing
    # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
    # but raise it in this plugin.
    buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
  end

  if conf.has_key?('utc') || conf.has_key?('localtime')
    param_name = conf.has_key?('utc') ? 'utc' : 'localtime'
    log.warn "'#{param_name}' is deperecated for output plugin. This parameter is used for formatter plugin in compatibility layer. If you want to use same feature, use timekey_use_utc parameter in <buffer> directive instead"
  end

  super

  @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

  if @path.include?('*') && !@buffer_config.timekey
    raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
  end

  path_suffix = @add_path_suffix ? @path_suffix : ''
  # A secondary output follows the primary's timekey rather than its own.
  path_timekey = if @chunk_key_time
                   @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                 else
                   nil
                 end
  @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

  if @as_secondary
    # When this plugin is configured as secondary & primary plugin has tag key, but this plugin may not have it.
    # Increment placeholder can make another output file per chunk tag/keys even if original path doesn't include it.
    placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
      v.validate!
    end
  else
    placeholder_validate!(:path, @path_template)

    # Build a dummy tag/record that satisfies every placeholder in the
    # template so a representative path can be expanded and probed.
    max_tag_index = get_placeholders_tag(@path_template).max || 1
    max_tag_index = 1 if max_tag_index < 1
    dummy_tag = (['a'] * max_tag_index).join('.')
    dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
    dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

    # NOTE(review): the helper name was missing from this listing, which left
    # the line syntactically invalid. #metadata_for_test (inherited from
    # Output) is restored here — confirm against lib/fluent/plugin/out_file.rb.
    test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record)
    test_path = extract_placeholders(@path_template, test_meta1)
    unless ::Fluent::FileUtil.writable_p?(test_path)
      raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
    end
  end

  @formatter = formatter_create

  # Symlink support is only wired up for buffers that expose #path.
  if @symlink_path && @buffer.respond_to?(:path)
    if @as_secondary
      raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
    end
    if Fluent.windows?
      log.warn "symlink_path is unavailable on Windows platform. disabled."
      @symlink_path = nil
    else
      @buffer.extend SymlinkBufferMixin
      @buffer.symlink_path = @symlink_path
    end
  end

  @dir_perm = system_config.dir_permission || DIR_PERMISSION
  @file_perm = system_config.file_permission || FILE_PERMISSION
  # With more than one worker, #write asks find_filepath_available to lock.
  @need_lock = system_config.workers > 1
end

#find_filepath_available(path_with_placeholder, with_lock: false) ⇒ Object

for non-append



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/fluent/plugin/out_file.rb', line 280

# Finds the first unused file path for non-append mode and yields it.
#
# The '_**' marker in the template is replaced with '_0', '_1', ... until a
# path that does not exist yet is found. With +with_lock+, a "<path>.lock"
# directory is created via Dir.mkdir as a lock before the path is yielded and
# is removed again once the block has finished (or raised).
#
# @param path_with_placeholder [String] path containing the '_**' index marker
# @param with_lock [Boolean] whether to guard the chosen path with a lock directory
# @yield [path] the available path
# @return [Object] the value returned by the block
# @raise [RuntimeError] if the template has no '_**' marker
def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
  raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
  i = 0
  dir_path = locked = nil
  while true
    path = path_with_placeholder.sub('_**', "_#{i}")
    i += 1
    next if File.exist?(path)

    if with_lock
      dir_path = path + '.lock'
      # Dir.mkdir raises if the directory already exists, i.e. somebody else
      # holds the lock for this index.
      locked = Dir.mkdir(dir_path) rescue false
      next unless locked
      # ensure that other worker doesn't create a file (and release lock)
      # between previous File.exist? and Dir.mkdir
      if File.exist?(path)
        # We own this lock but will not use the path: release it now, because
        # dir_path is overwritten on the next iteration and the ensure clause
        # would otherwise never remove this directory.
        Dir.rmdir(dir_path) rescue nil
        locked = false
        next
      end
    end

    break
  end
  yield path
ensure
  if dir_path && locked && Dir.exist?(dir_path)
    Dir.rmdir(dir_path) rescue nil
  end
end

#format(tag, time, record) ⇒ Object



175
176
177
178
# File 'lib/fluent/plugin/out_file.rb', line 175

# Formats a single event: the record is first passed through
# inject_values_to_record, and the result is handed to @formatter.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) ⇒ Object

Path template examples:

  /path/to/dir/file.*      -> /path/to/dir/file.%Y%m%d
  /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data
  /path/to/dir/file        -> /path/to/dir/file.%Y%m%d.log

  %Y%m%d -> %Y%m%d_** (non append)

  + .gz (gzipped)

TODO: remove time_slice_format when end of support of compat_parameters



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/fluent/plugin/out_file.rb', line 254

# Builds the output path template from the configured path.
#
# A '*' in the path is replaced by the time format (plus the '_**' index
# marker when not appending) and only the compression suffix is added.
# Otherwise the index marker, path_suffix and compression suffix are appended,
# preceded by ".<time format>" when a timekey is given and the path does not
# already carry the time placeholders itself.
#
# @param original [String] configured path
# @param timekey [Integer, nil] buffer timekey, nil when not chunked by time
# @param append [Boolean] when false, the '_**' index marker is inserted
# @param compress [Symbol, nil] compression method passed to compression_suffix
# @param path_suffix [String] suffix added to paths without '*'
# @param time_slice_format [String, nil] explicit time format overriding the
#   one derived from timekey
# @return [String] the path template
# @raise [RuntimeError] if the path has '*' but timekey is missing
# @raise [Fluent::ConfigError] if the path contains only some of the required
#   time placeholders
def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
  comp_suffix = compression_suffix(compress)
  index_placeholder = append ? '' : '_**'

  if original.include?('*')
    raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
    time_part = time_slice_format || timekey_to_timeformat(timekey)
    return original.gsub('*', time_part + index_placeholder) + comp_suffix
  end

  tail = "#{index_placeholder}#{path_suffix}#{comp_suffix}"
  return "#{original}#{tail}" unless timekey
  return "#{original}.#{time_slice_format}#{tail}" if time_slice_format

  time_placeholders = timekey_to_timeformat(timekey)
  # Each two-character token is one strftime placeholder such as "%Y".
  present, missing = time_placeholders.scan(/../).partition { |ph| original.include?(ph) }
  return "#{original}.#{time_placeholders}#{tail}" if present.empty?
  raise Fluent::ConfigError, "insufficient timestamp placeholders in path" unless missing.empty?

  "#{original}#{tail}"
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
# File 'lib/fluent/plugin/out_file.rb', line 171

# Always returns true.
# NOTE(review): presumably consulted by the framework to decide whether this
# plugin may run with multiple workers — confirm against the Output base class.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#timekey_to_timeformat(timekey) ⇒ Object



229
230
231
232
233
234
235
236
237
# File 'lib/fluent/plugin/out_file.rb', line 229

# Picks the strftime format whose granularity matches a timekey.
#
# @param timekey [Integer, nil] timekey length; the range boundaries
#   (60, 3600, 86400) suggest seconds
# @return [String] '' for nil; otherwise the format for the first matching
#   range, falling back to '%Y%m%d' for anything outside all ranges
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  granularities = [
    [0...60,       '%Y%m%d%H%M%S'], # 60 exclusive
    [60...3600,    '%Y%m%d%H%M'],
    [3600...86400, '%Y%m%d%H'],
  ]
  matched = granularities.find { |span, _format| span.cover?(timekey) }
  matched ? matched.last : '%Y%m%d'
end

#write(chunk) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/fluent/plugin/out_file.rb', line 180

# Writes one buffer chunk to its destination file.
#
# The path is obtained by expanding @path_template with the chunk's metadata,
# and its parent directory is created with @dir_perm. The writer is chosen
# from @compress_method: plain output, gzip compression, or — when the buffer
# already holds gzip data and @recompress is not set — a direct copy of the
# gzipped chunk. In append mode the chunk goes straight to the expanded path;
# otherwise find_filepath_available selects an unused indexed path (locking it
# when @need_lock is set).
#
# @param chunk buffer chunk; must respond to #metadata and #write_to
# @return [String] the path actually written, also stored in @last_written_path
# @raise [RuntimeError] if @compress_method is neither nil nor :gzip
def write(chunk)
  # `chunk.metadata` restores a token that was missing from this listing
  # (the line read `chunk.` and did not parse).
  path = extract_placeholders(@path_template, chunk.metadata)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  writer = case
           when @compress_method.nil?
             method(:write_without_compression)
           when @compress_method == :gzip
             if @buffer.compress != :gzip || @recompress
               method(:write_gzip_with_compression)
             else
               method(:write_gzip_from_gzipped_chunk)
             end
           else
             raise "BUG: unknown compression method #{@compress_method}"
           end

  if @append
    writer.call(path, chunk)
  else
    find_filepath_available(path, with_lock: @need_lock) do |actual_path|
      writer.call(actual_path, chunk)
      # Remember the indexed path so @last_written_path reflects the real file.
      path = actual_path
    end
  end

  @last_written_path = path
end

#write_gzip_from_gzipped_chunk(path, chunk) ⇒ Object



223
224
225
226
227
# File 'lib/fluent/plugin/out_file.rb', line 223

# Appends a chunk to +path+ by asking the chunk to write itself with
# `compressed: :gzip`; no compression is applied by this method.
#
# @param path [String] destination file, opened in binary append mode with @file_perm
# @param chunk object responding to #write_to(io, compressed:)
def write_gzip_from_gzipped_chunk(path, chunk)
  File.open(path, "ab", @file_perm) { |file| chunk.write_to(file, compressed: :gzip) }
end

#write_gzip_with_compression(path, chunk) ⇒ Object



215
216
217
218
219
220
221
# File 'lib/fluent/plugin/out_file.rb', line 215

# Appends a chunk to +path+ as a gzip stream, compressing it here.
#
# The chunk is asked to write itself with `compressed: :text` into a
# Zlib::GzipWriter layered over the file. GzipWriter.wrap closes the writer
# even when chunk.write_to raises, so the writer is never left open.
#
# @param path [String] destination file, opened in binary append mode with @file_perm
# @param chunk object responding to #write_to(io, compressed:)
def write_gzip_with_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    # Closing the GzipWriter also closes f; File.open tolerates that.
    Zlib::GzipWriter.wrap(f) do |gz|
      chunk.write_to(gz, compressed: :text)
    end
  end
end

#write_without_compression(path, chunk) ⇒ Object



209
210
211
212
213
# File 'lib/fluent/plugin/out_file.rb', line 209

# Appends a chunk to +path+ as-is, with no compression options passed to
# the chunk.
#
# @param path [String] destination file, opened in binary append mode with @file_perm
# @param chunk object responding to #write_to(io)
def write_without_compression(path, chunk)
  File.open(path, "ab", @file_perm) { |file| chunk.write_to(file) }
end