Class: Fluent::IncrementalOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_incremental.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ IncrementalOutput

Returns a new instance of IncrementalOutput.



11
12
13
14
# File 'lib/fluent/plugin/out_incremental.rb', line 11

# Builds the plugin instance.
#
# Sets Ruby's default internal encoding to UTF-8 and then delegates to the
# parent class initializer.
#
# NOTE(review): Encoding.default_internal is a process-wide setting, so this
# assignment affects every other component in the same process — confirm that
# this is intended.
def initialize
  Encoding.default_internal = "UTF-8"
  super
end

Instance Method Details

#configure(conf) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_incremental.rb', line 16

# Validates the plugin settings after the parent class has processed +conf+,
# then fixes the timestamp format and key used by this plugin.
#
# Checks run in this order and stop at the first failure:
# 1. +@unit+ must be present and one of year / month / day / hour / min.
# 2. +@incremental_file_path+, +@add_tag_prefix+ and +@remove_tag_prefix+
#    must each be present and non-empty.
#
# @param conf the configuration object, handed straight to +super+
# @raise [ConfigError] when any of the settings above is missing or invalid
def configure(conf)
  super

  raise ConfigError, "incremental configure requires 'unit'" if @unit.nil? || @unit.empty?

  known_unit = %w[year month day hour min].any? { |name| @unit == name }
  raise ConfigError, "incremental configure unit allow year or month or day or hour or min" unless known_unit

  # Required string settings, listed in the order they are validated.
  required = [
    ["incremental_file_path", @incremental_file_path],
    ["add_tag_prefix", @add_tag_prefix],
    ["remove_tag_prefix", @remove_tag_prefix]
  ]
  required.each do |name, setting|
    raise ConfigError, "incremental configure requires '#{name}'" if setting.nil? || setting.empty?
  end

  @time_format = "%Y-%m-%d %H:%M:%S"
  @timestamp_key = "last_updated"
end

#emit(tag, es, chain) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_incremental.rb', line 38

# Adds the integer-looking string fields of every record in +es+ to the
# running totals kept for +tag+, persists the totals, and re-emits them under
# a rewritten tag.
#
# Steps:
# 1. Load the current totals via +get_result+ from
#    "<incremental_file_path>.<tag>" (helper defined outside this block —
#    presumably it returns a Hash of previously stored counters).
# 2. For each record, add every String value that is a whole decimal integer
#    (optional sign) to the total stored under the same key. When
#    +@name_key_pattern+ is set, only keys matching that pattern are counted.
# 3. Stamp the totals with the current time, hand them to +write_file+, then
#    drop the stamp again so it is not part of the emitted record.
# 4. Emit the totals with +@remove_tag_prefix+ replaced by +@add_tag_prefix+.
#
# @param tag [String] tag of the incoming event stream
# @param es event stream yielding (time, record) pairs
# @param chain output chain; advanced once so chained outputs keep running
# @return whatever Fluent::Engine.emit returns
def emit(tag, es, chain)
  # Fluentd expects every non-buffered output to advance the chain; without
  # this call any output placed after this one in a chain is never invoked.
  chain.next

  filepath = @incremental_file_path + "." + tag
  result = get_result(filepath)

  # Compile the optional key filter once per call instead of once per field.
  key_pattern = @name_key_pattern.nil? ? nil : Regexp.new(@name_key_pattern)

  es.each do |_time, record|
    record.each do |key, value|
      # Only Strings are inspected: calling =~ on an Integer/Float/boolean
      # raises NoMethodError on Ruby >= 3.2 and returned nil (i.e. "skip")
      # before that, so skipping keeps the legacy result without the crash.
      # \A...\Z anchors the whole value (a trailing newline is tolerated), so
      # multi-line text such as "abc\n12" is no longer taken for a number.
      next unless value.is_a?(String) && value =~ /\A[+-]?\d+\Z/
      next if key_pattern && key !~ key_pattern

      result[key] = (result[key] || 0) + value.to_i
    end
  end

  result[@timestamp_key] = Time.now.strftime(@time_format)
  write_file(result, filepath)
  # The timestamp is stored with the totals but must not be emitted.
  result.delete(@timestamp_key)

  # NOTE(review): gsub replaces every occurrence of @remove_tag_prefix, not
  # only a leading one — confirm whether a prefix-only rewrite is intended.
  Fluent::Engine.emit(tag.gsub(@remove_tag_prefix, @add_tag_prefix), Fluent::Engine.now, result)
end