Module: Fluent::PluginHelper::Inject

Defined in:
lib/fluent/plugin_helper/inject.rb

Defined Under Namespace

Modules: InjectParams

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(mod) ⇒ Object



77
78
79
# File 'lib/fluent/plugin_helper/inject.rb', line 77

# Ruby include hook: runs when this module is mixed into +mod+ and mixes
# InjectParams into the same target.
#
# @param mod [Module] the class or module that is including this helper
# @return [Module] +mod+, as returned by Module#include
def self.included(mod)
  mod.include(InjectParams)
end

Instance Method Details

#configure(conf) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin_helper/inject.rb', line 91

# Prepares the inject helper from the plugin configuration.
#
# Before delegating to +super+, every 'inject' element whose 'utc' value is
# true according to Fluent::Config.bool_value has its 'localtime' entry
# forced to the string 'false'. After +super+, the settings held by
# @inject_config (when present) are cached in the @_inject_* instance
# variables.
#
# @param conf [Object] configuration object; must respond to #elements
# @return [Object, nil] nil when @inject_config is not set, otherwise the
#   value stored in @_inject_enabled
def configure(conf)
  conf.elements('inject').each do |section|
    next unless section.has_key?('utc') && Fluent::Config.bool_value(section['utc'])

    section['localtime'] = 'false'
  end

  # Bare super forwards conf unchanged; @inject_config is only read afterwards.
  super

  return unless @inject_config

  settings = @inject_config

  @_inject_hostname_key = settings.hostname_key
  if @_inject_hostname_key
    explicit_host = settings.hostname
    if explicit_host
      @_inject_hostname = explicit_host
    else
      # No hostname configured: fall back to this machine's hostname and log it.
      @_inject_hostname = Socket.gethostname
      log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname
    end
  end

  @_inject_tag_key = settings.tag_key
  @_inject_time_key = settings.time_key
  if @_inject_time_key
    @_inject_time_formatter =
      case settings.time_type
      when :float
        # microsecond floating point value
        ->(time) { time.to_r.truncate(6).to_f }
      when :unixtime
        ->(time) { time.to_i }
      else
        Fluent::TimeFormatter.new(settings.time_format, settings.localtime, settings.timezone)
      end
  end

  # Truthy (the first configured key) when at least one injection key is set.
  @_inject_enabled = @_inject_hostname_key || @_inject_tag_key || @_inject_time_key
end

#initialize ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin_helper/inject.rb', line 81

# Sets the inject helper's state to its defaults: injection is disabled and
# no hostname/tag/time key, hostname value, or time formatter is set.
def initialize
  super

  # Stays false until configure finds at least one key to inject.
  @_inject_enabled = false

  # Everything else starts out unset.
  @_inject_hostname_key = @_inject_hostname = @_inject_tag_key = @_inject_time_key = @_inject_time_formatter = nil
end

#inject_values_to_event_stream(tag, es) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin_helper/inject.rb', line 41

# Builds a new event stream whose records carry the configured hostname,
# tag and/or time values.
#
# When injection is disabled, +es+ is returned untouched. Otherwise each
# record is duplicated with #dup before values are written, so the records
# in +es+ are not modified by the assignments made here.
#
# @param tag [Object] value stored under @_inject_tag_key
# @param es [#each] event stream yielding (time, record) pairs
# @return [Object] +es+ itself, or a new Fluent::MultiEventStream
def inject_values_to_event_stream(tag, es)
  return es unless @_inject_enabled

  injected = Fluent::MultiEventStream.new
  es.each do |time, record|
    copy = record.dup
    copy[@_inject_hostname_key] = @_inject_hostname if @_inject_hostname_key
    copy[@_inject_tag_key] = tag if @_inject_tag_key
    copy[@_inject_time_key] = @_inject_time_formatter.call(time) if @_inject_time_key
    injected.add(time, copy)
  end
  injected
end

#inject_values_to_record(tag, time, record) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin_helper/inject.rb', line 24

# Returns a copy of +record+ with the configured hostname, tag and/or time
# values added.
#
# When injection is disabled, +record+ itself is returned. Otherwise the
# values are written to a #dup of the record, so +record+ is not modified by
# the assignments made here.
#
# @param tag [Object] value stored under @_inject_tag_key
# @param time [Object] passed to @_inject_time_formatter
# @param record [Object] record to copy; the copy must support #[]=
# @return [Object] +record+ itself, or its modified duplicate
def inject_values_to_record(tag, time, record)
  return record unless @_inject_enabled

  record.dup.tap do |copy|
    copy[@_inject_hostname_key] = @_inject_hostname if @_inject_hostname_key
    copy[@_inject_tag_key] = tag if @_inject_tag_key
    copy[@_inject_time_key] = @_inject_time_formatter.call(time) if @_inject_time_key
  end
end