Class: Fluent::IncrementalOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::IncrementalOutput
- Defined in:
- lib/fluent/plugin/out_incremental.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #initialize ⇒ IncrementalOutput (constructor) — a new instance of IncrementalOutput.
Constructor Details
#initialize ⇒ IncrementalOutput
Returns a new instance of IncrementalOutput.
11 12 13 14 |
# File 'lib/fluent/plugin/out_incremental.rb', line 11
#
# Creates the plugin instance.
#
# Sets Ruby's default internal encoding to UTF-8 and then hands off to the
# parent class constructor. Note that Encoding.default_internal is a global
# setting, so this assignment is not limited to this plugin instance.
def initialize
  Encoding.default_internal = "UTF-8"
  super
end
Instance Method Details
#configure(conf) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_incremental.rb', line 16
#
# Validates the plugin settings after the parent class has processed +conf+,
# then initializes the fixed formatting values used when emitting.
#
# Required settings (each must be present and non-empty):
# * @unit - one of "year", "month", "day", "hour", "min"
# * @incremental_file_path
# * @add_tag_prefix
# * @remove_tag_prefix
#
# @param conf the configuration object, forwarded unchanged to +super+
# @return [String] the value of the last assignment ("last_updated")
# @raise [ConfigError] when a required setting is missing/empty or @unit is
#   not one of the allowed values
def configure(conf)
  super

  if @unit.nil? || @unit.empty?
    raise ConfigError, "incremental configure requires 'unit'"
  end
  unless %w[year month day hour min].include?(@unit)
    raise ConfigError, "incremental configure unit allow year or month or day or hour or min"
  end

  # Checked in this order so the first missing setting reported is the same
  # one the original sequence of `if` statements would have reported.
  required = [
    ["incremental_file_path", @incremental_file_path],
    ["add_tag_prefix", @add_tag_prefix],
    ["remove_tag_prefix", @remove_tag_prefix]
  ]
  required.each do |name, value|
    raise ConfigError, "incremental configure requires '#{name}'" if value.nil? || value.empty?
  end

  @time_format = "%Y-%m-%d %H:%M:%S"
  # NOTE(review): the left-hand side of this assignment was missing from the
  # extracted listing (it read ` = "last_updated"`, which does not parse).
  # The instance variable name below is a reconstruction - confirm it against
  # the plugin source and against any other method that reads it.
  @last_updated_key = "last_updated"
end
#emit(tag, es, chain) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/out_incremental.rb', line 38
#
# Adds the integer-looking string fields of every record in +es+ onto the
# running totals obtained from get_result, writes the totals (stamped with
# the current time) via write_file, and re-emits the totals under a
# rewritten tag.
#
# @param tag [String] tag of the incoming events; also appended to
#   @incremental_file_path (after a ".") to form the path handed to
#   get_result and write_file
# @param es an enumerable yielding (time, record) pairs; each record is
#   iterated as key/value pairs
# @param chain object whose +next+ is called once the totals are re-emitted
# @return whatever +chain.next+ returns
def emit(tag, es, chain)
  filepath = "#{@incremental_file_path}.#{tag}"
  result = get_result(filepath)

  # Compile the optional key filter once instead of once per record field.
  key_filter = @name_key_pattern.nil? ? nil : Regexp.new(@name_key_pattern)

  # NOTE(review): in the extracted listing the key argument was missing
  # (`result[] = ...` / `result.delete()`), which raises ArgumentError on a
  # Hash. The instance variable name is a reconstruction; the literal fallback
  # equals the value configure assigns. Confirm against the plugin source.
  last_updated_key = @last_updated_key || "last_updated"

  es.each do |_time, record|
    record.each do |key, value|
      # Only String values are tested: calling =~ on Integer/Float raises
      # NoMethodError on Ruby >= 3.2 (older Rubies returned nil, i.e. skipped).
      # \A/\z anchor the whole value; ^/$ anchor per line and let a value such
      # as "x\n5" through, which then contributed a spurious 0.
      next unless value.is_a?(String) && value =~ /\A[+-]?\d+\z/
      next if key_filter && key !~ key_filter

      result[key] = (result[key] || 0) + value.to_i
    end
  end

  # The timestamp is passed to write_file but removed again before emitting.
  result[last_updated_key] = Time.now.strftime(@time_format)
  write_file(result, filepath)
  result.delete(last_updated_key)

  # NOTE(review): gsub replaces every occurrence of @remove_tag_prefix in the
  # tag, not only a leading one. Kept as-is to preserve existing routing.
  Fluent::Engine.emit(tag.gsub(@remove_tag_prefix, @add_tag_prefix), Fluent::Engine.now, result)

  # Hand control to the next output in the chain; the original never did.
  chain.next
end