Class: Fluent::TimeSlicedOutput

Inherits:
BufferedOutput show all
Defined in:
lib/fluent/output.rb

Direct Known Subclasses

ExecOutput, FileOutput

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Output

#router

Attributes included from PluginLoggerMixin

#log

Instance Method Summary collapse

Methods inherited from BufferedOutput

#before_shutdown, #calc_retry_wait, #flush_secondary, #force_flush, #format_stream, #shutdown, #start, #submit_flush, #try_flush, #write_abort

Methods inherited from Output

#secondary_init, #shutdown, #start

Methods included from PluginLoggerMixin

included

Methods included from PluginId

#plugin_id

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initializeTimeSlicedOutput

Returns a new instance of TimeSlicedOutput.



488
489
490
491
492
# File 'lib/fluent/output.rb', line 488

def initialize
  super
  @localtime = true
  #@ignore_old = false   # TODO
end

Instance Attribute Details

#localtimeObject

Returns the value of attribute localtime.



504
505
506
# File 'lib/fluent/output.rb', line 504

def localtime
  @localtime
end

#time_slicerObject (readonly)

for test



505
506
507
# File 'lib/fluent/output.rb', line 505

def time_slicer
  @time_slicer
end

Instance Method Details

#configure(conf) ⇒ Object



507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
# File 'lib/fluent/output.rb', line 507

def configure(conf)
  super

  if conf['utc']
    @localtime = false
  elsif conf['localtime']
    @localtime = true
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  if @timezone
    @time_slicer = Timezone.formatter(@timezone, @time_slice_format)
  elsif @localtime
    @time_slicer = Proc.new {|time|
      Time.at(time).strftime(@time_slice_format)
    }
  else
    @time_slicer = Proc.new {|time|
      Time.at(time).utc.strftime(@time_slice_format)
    }
  end

  @time_slice_cache_interval = time_slice_cache_interval
  @before_tc = nil
  @before_key = nil

  if @flush_interval
    if conf['time_slice_wait']
      $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}"
    end
    @enqueue_buffer_proc = Proc.new do
      @buffer.keys.each {|key|
        @buffer.push(key)
      }
    end

  else
    @flush_interval = [60, @time_slice_cache_interval].min
    @enqueue_buffer_proc = Proc.new do
      nowslice = @time_slicer.call(Engine.now - @time_slice_wait)
      @buffer.keys.each {|key|
        if key < nowslice
          @buffer.push(key)
        end
      }
    end
  end
end

#emit(tag, es, chain) ⇒ Object



560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
# File 'lib/fluent/output.rb', line 560

def emit(tag, es, chain)
  @emit_count += 1
  formatted_data = {}
  es.each {|time,record|
    begin
      tc = time / @time_slice_cache_interval
      if @before_tc == tc
        key = @before_key
      else
        @before_tc = tc
        key = @time_slicer.call(time)
        @before_key = key
      end
    rescue => e
      @router.emit_error_event(tag, Engine.now, {'time' => time, 'record' => record}, e)
      next
    end

    formatted_data[key] ||= ''
    formatted_data[key] << format(tag, time, record)
  }
  formatted_data.each { |key, data|
    if @buffer.emit(key, data, chain)
      submit_flush
    end
  }
end

#enqueue_buffer(force = false) ⇒ Object



588
589
590
591
592
593
594
595
596
# File 'lib/fluent/output.rb', line 588

def enqueue_buffer(force = false)
  if force
    @buffer.keys.each {|key|
      @buffer.push(key)
    }
  else
    @enqueue_buffer_proc.call
  end
end