Class: Fluent::Plugin::FileOutput

Inherits:
Output show all
Defined in:
lib/fluent/plugin/out_file.rb

Defined Under Namespace

Modules: SymlinkBufferMixin

Constant Summary collapse

# Compression names listed for this plugin.
# NOTE(review): not referenced in the visible methods — presumably the allowed
# values of the `compress` parameter; confirm against the parameter definition.
SUPPORTED_COMPRESS =
[:text, :gz, :gzip]
# Looked up with @compress in #configure to obtain @compress_method:
# :text maps to nil, while :gz and :gzip both map to :gzip.
SUPPORTED_COMPRESS_MAP =
{
  text: nil,
  gz: :gzip,
  gzip: :gzip,
}
# Fallback for @file_perm when system_config.file_permission is not set (see #configure).
FILE_PERMISSION =
0644
# Fallback for @dir_perm when system_config.dir_permission is not set (see #configure).
DIR_PERMISSION =
0755
# 60 * 60 * 24 = 86400.
# NOTE(review): presumably one day in seconds used as the default buffer timekey — confirm.
DEFAULT_TIMEKEY =
60 * 60 * 24

Constants inherited from Output

Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Output

#as_secondary, #buffer, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Output

#acts_as_secondary, #after_shutdown, #after_start, #before_shutdown, #check_slow_flush, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #initialize, #interrupt_flushes, #metadata, #metadata_for_test, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #start, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #plugin_root_dir, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Output

Instance Attribute Details

#last_written_path ⇒ Object

for tests



70
71
72
# File 'lib/fluent/plugin/out_file.rb', line 70

# Reader for @last_written_path, which #write assigns after each chunk is
# written. Exposed for tests.
#
# @return [Object] the stored path, or nil if nothing has been assigned yet
def last_written_path
  @last_written_path
end

Instance Method Details

#compression_suffix(compress) ⇒ Object



234
235
236
237
238
239
240
241
# File 'lib/fluent/plugin/out_file.rb', line 234

# Maps a compression method to the file-name suffix appended to output paths.
#
# @param compress [Symbol, nil] :gzip, or nil for no compression
# @return [String] '.gz' for :gzip, '' for nil
# @raise [ArgumentError] for any other value
def compression_suffix(compress)
  return '' if compress.nil?
  return '.gz' if compress == :gzip

  raise ArgumentError, "unknown compression type #{compress}"
end

#configure(conf) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin/out_file.rb', line 91

# Configures the file output: prepares the <buffer> section, delegates to the
# parent #configure, then derives the path template, validates it, creates the
# formatter and resolves permissions / locking settings.
#
# @param conf [Object] plugin configuration element (must respond to #[],
#   #elements and #add_element as used below)
# @raise [Fluent::ConfigError] when the path contains '*' without a time chunk
#   key, when the resolved test path is not writable, or when symlink_path is
#   used in <secondary>
def configure(conf)
  compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

  # Captured before `super` so the raw configured value can be handed to
  # generate_path_template below.
  configured_time_slice_format = conf['time_slice_format']

  # Ensure a <buffer> section exists; 'time' is the default chunk key.
  if conf.elements(name: 'buffer').empty?
    conf.add_element('buffer', 'time')
  end
  buffer_conf = conf.elements(name: 'buffer').first
  # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
  if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
    # v0.14 file buffer handles path as directory if '*' is missing
    # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
    # but raise it in this plugin.
    buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
  end

  super

  # nil means "no compression"; :gz and :gzip both resolve to :gzip.
  @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

  if @path.include?('*') && !@buffer_config.timekey
    raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
  end

  path_suffix = @add_path_suffix ? @path_suffix : ''
  # As a secondary output, the timekey comes from the primary's buffer config.
  path_timekey = if @chunk_key_time
                   @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                 else
                   nil
                 end
  @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

  if @as_secondary
    # When this plugin is configured as secondary & primary plugin has tag key, but this plugin may not have it.
    # Increment placeholder can make another output file per chunk tag/keys even if original path doesn't include it.
    placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
      v.validate!
    end
  else
    placeholder_validate!(:path, @path_template)

    # Build a dummy tag/record that satisfies every placeholder in the
    # template, so a concrete path can be resolved and checked for writability.
    max_tag_index = get_placeholders_tag(@path_template).max || 1
    max_tag_index = 1 if max_tag_index < 1
    dummy_tag = (['a'] * max_tag_index).join('.')
    dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
    dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

    # `metadata` is the helper inherited from Output; the bare parenthesised
    # argument list that stood here before was not valid Ruby.
    test_meta1 = metadata(dummy_tag, Fluent::Engine.now, dummy_record)
    test_path = extract_placeholders(@path_template, test_meta1)
    unless ::Fluent::FileUtil.writable_p?(test_path)
      raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
    end
  end

  @formatter = formatter_create

  if @symlink_path && @buffer.respond_to?(:path)
    if @as_secondary
      raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
    end
    if Fluent.windows?
      log.warn "symlink_path is unavailable on Windows platform. disabled."
      @symlink_path = nil
    else
      @buffer.extend SymlinkBufferMixin
      @buffer.symlink_path = @symlink_path
    end
  end

  # System-wide settings win; the class constants are the fallback.
  @dir_perm = system_config.dir_permission || DIR_PERMISSION
  @file_perm = system_config.file_permission || FILE_PERMISSION
  # Lock directories are only used by #write when more than one worker runs.
  @need_lock = system_config.workers > 1
end

#find_filepath_available(path_with_placeholder, with_lock: false) ⇒ Object

for non-append



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/fluent/plugin/out_file.rb', line 275

# Finds the first path, obtained by replacing the '_**' index placeholder with
# _0, _1, _2, ..., that does not exist yet, and yields it to the block.
#
# With +with_lock+, a sibling "<path>.lock" directory is created to claim the
# candidate before yielding; it is removed again once the block has finished
# (or raised).
#
# @param path_with_placeholder [String] path containing '_**'
# @param with_lock [Boolean] claim the candidate path with a lock directory
# @yieldparam path [String] the available path
# @return [Object] whatever the block returns
# @raise [RuntimeError] if the path has no '_**' placeholder
def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
  raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
  i = 0
  dir_path = locked = nil
  while true
    path = path_with_placeholder.sub('_**', "_#{i}")
    i += 1
    next if File.exist?(path)

    if with_lock
      dir_path = path + '.lock'
      # NOTE(review): every mkdir failure (not only "already exists") is treated
      # as "locked by someone else" and the next index is tried.
      locked = Dir.mkdir(dir_path) rescue false
      next unless locked
      # ensure that other worker doesn't create a file (and release lock)
      # between previous File.exist? and Dir.mkdir
      if File.exist?(path)
        # We hold the lock for a path that is already taken: release it before
        # moving on, otherwise this lock directory would be left behind because
        # dir_path is overwritten by the next candidate.
        Dir.rmdir(dir_path) rescue nil
        locked = nil
        next
      end
    end

    break
  end
  yield path
ensure
  # Best-effort removal of the lock held for the yielded path.
  if dir_path && locked && Dir.exist?(dir_path)
    Dir.rmdir(dir_path) rescue nil
  end
end

#format(tag, time, record) ⇒ Object



170
171
172
173
# File 'lib/fluent/plugin/out_file.rb', line 170

# Formats one event: the record is first passed through
# inject_values_to_record, and the result is handed to @formatter.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever @formatter.format returns
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) ⇒ Object

Builds the path template from the configured path:

    /path/to/dir/file.*      -> /path/to/dir/file.%Y%m%d
    /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data
    /path/to/dir/file        -> /path/to/dir/file.%Y%m%d.log

Further adjustments:

    %Y%m%d -> %Y%m%d_** (non append)
    + .gz (gzipped)

TODO: remove time_slice_format when end of support of compat_parameters



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/fluent/plugin/out_file.rb', line 249

# Builds the output path template from the configured path.
#
# * A '*' in +original+ is replaced by the time format (plus the '_**' index
#   placeholder when not appending); only the compression suffix is added.
# * Otherwise the time format is appended after a '.', unless +original+
#   already contains the time placeholders itself, followed by the index
#   placeholder, +path_suffix+ and the compression suffix.
#
# @param original [String] configured path
# @param timekey [Object, nil] buffer timekey; nil/false means no time chunk key
# @param append [Boolean] when falsy, the '_**' index placeholder is added
# @param compress [Symbol, nil] passed to compression_suffix
# @param path_suffix [String] suffix used when +original+ has no '*'
# @param time_slice_format [String, nil] explicit time format overriding the
#   one derived from +timekey+
# @return [String] the path template
# @raise [RuntimeError] if +original+ contains '*' but +timekey+ is missing
# @raise [Fluent::ConfigError] if +original+ contains only some of the time
#   placeholders required by +timekey+
def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
  comp_suffix = compression_suffix(compress)
  index_placeholder = append ? '' : '_**'

  if original.index('*')
    raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
    stamp = time_slice_format || timekey_to_timeformat(timekey)
    return original.gsub('*', stamp + index_placeholder) + comp_suffix
  end

  # Everything that follows the (optional) time part when there is no '*'.
  tail = "#{index_placeholder}#{path_suffix}#{comp_suffix}"

  return "#{original}#{tail}" unless timekey
  return "#{original}.#{time_slice_format}#{tail}" if time_slice_format

  stamp = timekey_to_timeformat(timekey)
  # Two-character tokens such as "%Y", "%m", "%d".
  found, absent = stamp.scan(/../).partition { |ph| original.include?(ph) }

  # No placeholder in the path at all: append the whole time format.
  return "#{original}.#{stamp}#{tail}" if found.empty?

  raise Fluent::ConfigError, "insufficient timestamp placeholders in path" unless absent.empty?
  "#{original}#{tail}"
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/fluent/plugin/out_file.rb', line 166

# Always returns true.
# NOTE(review): presumably this tells the framework the plugin may run under
# multiple workers (#configure sets @need_lock when workers > 1) — confirm
# against the Output base class.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#timekey_to_timeformat(timekey) ⇒ Object



224
225
226
227
228
229
230
231
232
# File 'lib/fluent/plugin/out_file.rb', line 224

# Picks the strftime-style format whose resolution matches +timekey+.
#
# @param timekey [Numeric, nil] value compared against the ranges below
# @return [String] '' for nil; otherwise the format of the first matching
#   range, falling back to '%Y%m%d' when none matches
def timekey_to_timeformat(timekey)
  return '' if timekey.nil?

  # Upper bounds are exclusive: 60 already selects the minute format.
  resolutions = [
    [0...60,       '%Y%m%d%H%M%S'],
    [60...3600,    '%Y%m%d%H%M'],
    [3600...86400, '%Y%m%d%H'],
  ]
  _span, time_format = resolutions.find { |span, _| span === timekey }
  time_format || '%Y%m%d'
end

#write(chunk) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/fluent/plugin/out_file.rb', line 175

# Writes one buffer chunk to its destination file.
#
# The path is resolved from @path_template using the chunk's metadata, the
# parent directory is created with @dir_perm, and the writer matching
# @compress_method is invoked. In append mode the resolved path is written
# directly; otherwise find_filepath_available picks the first unused index.
#
# @param chunk [Object] buffer chunk; must respond to #metadata and to
#   whatever the selected writer calls on it
# @return [String] the path actually written (also stored in
#   @last_written_path)
# @raise [RuntimeError] if @compress_method is neither nil nor :gzip
def write(chunk)
  # The member name after `chunk.` was missing here; the chunk's metadata is
  # what resolves the placeholders in the template.
  path = extract_placeholders(@path_template, chunk.metadata)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  writer = case
           when @compress_method.nil?
             method(:write_without_compression)
           when @compress_method == :gzip
             # A chunk that is already gzipped can be copied as-is unless
             # recompression is requested.
             if @buffer.compress != :gzip || @recompress
               method(:write_gzip_with_compression)
             else
               method(:write_gzip_from_gzipped_chunk)
             end
           else
             raise "BUG: unknown compression method #{@compress_method}"
           end

  if @append
    writer.call(path, chunk)
  else
    find_filepath_available(path, with_lock: @need_lock) do |actual_path|
      writer.call(actual_path, chunk)
      # Remember the indexed path that was really used.
      path = actual_path
    end
  end

  @last_written_path = path
end

#write_gzip_from_gzipped_chunk(path, chunk) ⇒ Object



218
219
220
221
222
# File 'lib/fluent/plugin/out_file.rb', line 218

# Appends a chunk to +path+, asking the chunk to write itself with
# `compressed: :gzip`.
#
# The file is opened in binary append mode with @file_perm as the permission
# argument.
#
# @param path [String] destination file
# @param chunk [Object] must respond to #write_to(io, compressed:)
# @return [Object] whatever chunk.write_to returns
def write_gzip_from_gzipped_chunk(path, chunk)
  File.open(path, "ab", @file_perm) { |io| chunk.write_to(io, compressed: :gzip) }
end

#write_gzip_with_compression(path, chunk) ⇒ Object



210
211
212
213
214
215
216
# File 'lib/fluent/plugin/out_file.rb', line 210

# Appends a chunk to +path+ as a gzip member, compressing the chunk's
# `compressed: :text` output with Zlib::GzipWriter.
#
# The file is opened in binary append mode with @file_perm as the permission
# argument. The gzip writer is closed even when the chunk raises while
# writing, so the member written so far is terminated properly and does not
# corrupt data appended to the same file later.
#
# @param path [String] destination file
# @param chunk [Object] must respond to #write_to(io, compressed:)
# @return [File] the (already closed) file object, as before
def write_gzip_with_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    gz = Zlib::GzipWriter.new(f)
    begin
      chunk.write_to(gz, compressed: :text)
    ensure
      # Flushes pending data, writes the gzip trailer and closes f.
      gz.close
    end
    # GzipWriter#close returned the underlying IO; keep that return value.
    f
  end
end

#write_without_compression(path, chunk) ⇒ Object



204
205
206
207
208
# File 'lib/fluent/plugin/out_file.rb', line 204

# Appends a chunk to +path+ without compression by letting the chunk write
# itself to the opened file.
#
# The file is opened in binary append mode with @file_perm as the permission
# argument.
#
# @param path [String] destination file
# @param chunk [Object] must respond to #write_to(io)
# @return [Object] whatever chunk.write_to returns
def write_without_compression(path, chunk)
  File.open(path, "ab", @file_perm) { |io| chunk.write_to(io) }
end