Class: Fluent::BufferedOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/output.rb

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Output

#router

Attributes included from PluginLoggerMixin

#log

Instance Method Summary collapse

Methods inherited from Output

#secondary_init

Methods included from PluginLoggerMixin

included

Methods included from PluginId

#plugin_id

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize ⇒ BufferedOutput

Returns a new instance of BufferedOutput.



180
181
182
183
184
185
186
187
188
189
# File 'lib/fluent/output.rb', line 180

# Prepares the flush/retry bookkeeping for a new buffered output.
#
# All scheduling timestamps start at 0, no errors have been recorded, and
# the secondary retry limit defaults to 8 (configure may override it).
def initialize
  super
  # Scheduling timestamps used by start/try_flush/force_flush.
  @next_flush_time = @last_retry_time = @next_retry_time = 0
  # Counters: consecutive flush errors and number of emit calls.
  @num_errors = 0
  @emit_count = 0
  # Guards updates to the retry state in try_flush and force_flush.
  @num_errors_lock = Mutex.new
  @secondary_limit = 8
end

Instance Method Details

#before_shutdown ⇒ Object



411
412
413
414
415
416
417
418
# File 'lib/fluent/output.rb', line 411

# Forwards the pre-shutdown notification to the buffer, passing this output.
#
# Any StandardError raised by the buffer is logged as a warning (message
# plus backtrace) and swallowed, so it does not propagate to the caller.
def before_shutdown
  @buffer.before_shutdown(self)
rescue => e
  $log.warn "before_shutdown failed", error: e.to_s
  $log.warn_backtrace
end

#calc_retry_wait ⇒ Object



420
421
422
423
424
425
426
427
428
429
430
# File 'lib/fluent/output.rb', line 420

# Computes how long to wait before the next retry.
#
# The wait grows exponentially with @num_errors. Once @num_errors exceeds
# @retry_limit (and the limit is not disabled) the exponent restarts for the
# secondary retry phase. A finite wait is randomised by up to one eighth of
# its value in either direction, and the result is capped at @max_retry_wait
# when that is set.
#
# @return [Numeric] the retry wait
def calc_retry_wait
  # TODO retry pattern
  exponent = if @disable_retry_limit || @num_errors <= @retry_limit
               @num_errors - 1
             else
               # secondary retry
               @num_errors - 2 - @retry_limit
             end
  base = @retry_wait * (2 ** exponent)
  # Jitter is skipped for an infinite wait.
  jittered = base.finite? ? base + (rand * (base / 4.0) - (base / 8.0)) : base
  return jittered unless @max_retry_wait

  [jittered, @max_retry_wait].min
end

#configure(conf) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/fluent/output.rb', line 209

# Configures the output: builds the buffer, the writer threads and, when a
# <secondary> element is present, the secondary output.
#
# @param conf the configuration element for this output
# @raise [ConfigError] if 'secondary_limit' is negative
def configure(conf)
  super

  # calc_retry_wait relies on @retry_wait being a Float.
  @retry_wait = @retry_wait.to_f
  @buffer = Plugin.new_buffer(@buffer_type)
  @buffer.configure(conf)

  # Buffers that support it run in parallel mode unless there is exactly
  # one writer thread.
  @buffer.enable_parallel(@num_threads != 1) if @buffer.respond_to?(:enable_parallel)

  @writers = (1..@num_threads).map do
    OutputThread.new(self).tap { |writer| writer.configure(conf) }
  end

  sconf = conf.elements.find { |element| element.name == 'secondary' }
  if sconf
    # The secondary's type falls back to this output's own type.
    type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type']
    @secondary = Plugin.new_output(type)
    @secondary.router = router
    @secondary.configure(sconf)

    limit = conf['secondary_limit']
    if limit
      @secondary_limit = limit.to_i
      raise ConfigError, "invalid parameter 'secondary_limit #{limit}'" if @secondary_limit < 0
    end

    @secondary.secondary_init(self)
  end

  Status.register(self, "queue_size") { @buffer.queue_size }
  Status.register(self, "emit_count") { @emit_count }
end

#emit(tag, es, chain, key = "") ⇒ Object



265
266
267
268
269
270
271
# File 'lib/fluent/output.rb', line 265

# Formats an event stream and appends it to the buffer under +key+.
#
# Increments @emit_count on every call. When the buffer's emit returns a
# truthy value, a flush is requested via submit_flush.
#
# @param tag   the tag of the event stream
# @param es    the event stream to format
# @param chain passed through to the buffer's emit
# @param key   buffer key (defaults to "")
def emit(tag, es, chain, key="")
  @emit_count += 1
  formatted = format_stream(tag, es)
  submit_flush if @buffer.emit(key, formatted, chain)
end

#enqueue_buffer(force = false) ⇒ Object

def write(chunk) end



293
294
295
296
297
# File 'lib/fluent/output.rb', line 293

# Pushes every key currently held by the buffer.
#
# @param force accepted for interface compatibility; this implementation
#   does not use it
# @return whatever +each+ on the buffer's key collection returns
def enqueue_buffer(force = false)
  pending_keys = @buffer.keys
  pending_keys.each do |buffer_key|
    @buffer.push(buffer_key)
  end
end

#flush_secondary(secondary) ⇒ Object



442
443
444
# File 'lib/fluent/output.rb', line 442

# Pops from the buffer, handing the given secondary output to the buffer
# instead of this output.
#
# @param secondary the secondary output passed to the buffer's pop
# @return the result of the buffer's pop (try_flush treats it as "has next")
def flush_secondary(secondary)
  has_next = @buffer.pop(secondary)
  has_next
end

#force_flush ⇒ Object



403
404
405
406
407
408
409
# File 'lib/fluent/output.rb', line 403

# Requests an immediate flush.
#
# Moves @next_retry_time one second into the past (under the retry lock) so
# a pending retry wait no longer blocks try_flush, then enqueues the buffer
# with force = true and asks a writer to flush.
def force_flush
  @num_errors_lock.synchronize { @next_retry_time = Time.now.to_f - 1 }
  enqueue_buffer(true)
  submit_flush
end

#format_stream(tag, es) ⇒ Object



279
280
281
282
283
284
285
# File 'lib/fluent/output.rb', line 279

# Serialises an event stream by concatenating the result of +format+ for
# each (time, record) pair.
#
# @param tag the tag passed to +format+ for every event
# @param es  the event stream; must respond to +each+ yielding time and record
# @return [String] the concatenated output ('' when the stream is empty)
def format_stream(tag, es)
  buffer = ''
  es.each do |time, record|
    buffer << format(tag, time, record)
  end
  buffer
end

#shutdown ⇒ Object



259
260
261
262
263
# File 'lib/fluent/output.rb', line 259

# Shuts the output down in order: every writer, then the secondary output
# (if one is configured), and finally the buffer.
def shutdown
  @writers.each(&:shutdown)
  @secondary.shutdown if @secondary
  @buffer.shutdown
end

#start ⇒ Object



250
251
252
253
254
255
256
257
# File 'lib/fluent/output.rb', line 250

# Starts the output: schedules the first flush, then starts the buffer, the
# secondary output (if one is configured) and every writer, in that order.
# Finally resets the writer rotation state used by submit_flush.
def start
  now = Time.now.to_f
  @next_flush_time = now + @flush_interval
  @buffer.start
  @secondary.start if @secondary
  @writers.each(&:start)
  # Rotation state used by submit_flush to pick a writer.
  @writer_current_position = 0
  @writers_size = @writers.size
end

#submit_flush ⇒ Object



273
274
275
276
277
# File 'lib/fluent/output.rb', line 273

# Advances the writer rotation by one and asks the selected writer to flush.
#
# No lock is taken on purpose: the position only has to pick a "next"
# writer approximately.
def submit_flush
  @writer_current_position = @writer_current_position.succ.modulo(@writers_size)
  selected = @writers[@writer_current_position]
  selected.submit_flush
end

#try_flush ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/fluent/output.rb', line 299

# Runs one flush attempt and returns the timestamp (epoch seconds) at which
# the next attempt should be made.
#
# Outline:
# 1. If the queue is empty and the flush interval has elapsed, enqueue the
#    buffer (inside @buffer.synchronize) and schedule the next flush.
# 2. If the queue is still empty, return time + @try_flush_interval.
# 3. If previous attempts failed (@num_errors > 0), only proceed once
#    @next_retry_time has passed; the retry state is updated under
#    @num_errors_lock so that a single thread performs the retry.
# 4. Pop one chunk, using the secondary output once @num_errors exceeds
#    @retry_limit (when a secondary exists and the limit is not disabled).
# 5. On success reset @num_errors; on failure log the error, possibly fall
#    back to the secondary output or discard the buffer via write_abort, and
#    return @next_retry_time.
#
# @return [Numeric] when to call try_flush again
def try_flush
  time = Time.now.to_f

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Time.now.to_f)
    @buffer.synchronize do
      # re-check inside the buffer lock before enqueueing
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    # nothing queued: check again after the polling interval
    return time + @try_flush_interval
  end

  begin
    retrying = !@num_errors.zero?

    if retrying
      @num_errors_lock.synchronize do
        if retrying = !@num_errors.zero? # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + @try_flush_interval
          end
          # assume the next retry fails; the counters are
          # cleared again if it succeeds
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time += calc_retry_wait
        end
      end
    end

    if @secondary && !@disable_retry_limit && @num_errors > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    # success
    if retrying
      @num_errors = 0
      # Note: don't notify to other threads to prevent
      #       burst to recovered server
      $log.warn "retry succeeded.", plugin_id: plugin_id
    end

    if has_next
      # more chunks are queued: come back after the short queued-chunk interval
      return Time.now.to_f + @queued_chunk_flush_interval
    else
      return time + @try_flush_interval
    end

  rescue => e
    if retrying
      error_count = @num_errors
    else
      # first error
      error_count = 0
      @num_errors_lock.synchronize do
        # only the first failing thread initialises the retry state
        if @num_errors.zero?
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if @disable_retry_limit || error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        # `retry` re-runs the begin block above.
        # NOTE(review): on re-entry the @next_retry_time guard may return
        # early instead of flushing to the secondary right away -- confirm.
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn_backtrace e.backtrace
      else
        # error_class is logged as a String here, like in every other branch,
        # so all failure records share the same shape.
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        # give up: discard the buffered data and reset the error counter
        write_abort
        @num_errors = 0
      end

    else
      $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      # no secondary configured: discard the buffered data and start over
      write_abort
      @num_errors = 0
    end

    return @next_retry_time
  end
end

#write_abort ⇒ Object



432
433
434
435
436
437
438
439
440
# File 'lib/fluent/output.rb', line 432

# Discards everything held by the buffer.
#
# Logs an error announcing the data loss, then clears the buffer. A
# StandardError raised while clearing is logged (message plus backtrace)
# and swallowed.
def write_abort
  $log.error "throwing away old logs."
  begin
    @buffer.clear!
  rescue => error
    $log.error "unexpected error while aborting", error: error.to_s
    $log.error_backtrace
  end
end