Class: Fluent::Plugin::TimeParserOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_time_parser.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


25
26
27
28
29
# File 'lib/fluent/plugin/out_time_parser.rb', line 25

# Configures the plugin from +conf+.
#
# The configuration is first passed through compat_parameters_convert for the
# :buffer section, then handed to the superclass. Afterwards @chunk_key_tag
# must be truthy (presumably set by the superclass from the buffer chunk
# keys -- confirm against the base class).
#
# @param conf [Object] plugin configuration, forwarded unchanged to super
# @return [nil]
# @raise [Fluent::ConfigError] when @chunk_key_tag is not set after super
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  return if @chunk_key_tag

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end

#filter_record(tag, time, record) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_time_parser.rb', line 48

# Parses the timestamp found in record[key] and stores three derived string
# fields back into +record+: the full timestamp under parsed_time_tag, the
# date under parsed_date_tag and the hour under parsed_hour_tag.
#
# When time_zone is set to a non-empty value, the parsed time is adjusted
# through TZInfo before the fields are derived.
#
# Parsing and time-zone lookup failures are logged as warnings instead of
# being raised; they happen before any field is written, so the record is
# left untouched in those cases.
#
# @param tag [Object] event tag (not used by this method)
# @param time [Object] event time (not used by this method)
# @param record [Hash] event record, modified in place
# @return [Object] the hour string on success, otherwise whatever log.warn returns
def filter_record(tag, time, record)
  record_time = DateTime.parse(record[key])

  if time_zone && time_zone != ""
    tz = TZInfo::Timezone.get(time_zone)

    period = tz.period_for_utc(record_time)
    rational_offset = period.utc_total_offset_rational

    record_time = tz.utc_to_local(record_time).new_offset(rational_offset) -
                  rational_offset
  end

  record[parsed_time_tag] = record_time.to_s
  record[parsed_date_tag] = record_time.to_date.to_s
  record[parsed_hour_tag] = record_time.hour.to_s
rescue ArgumentError, TypeError => error
  # ArgumentError: the value is a String but not a parseable date.
  # TypeError: the field is missing (nil) or not a String at all; without
  # this the exception would escape and abort processing of the whole chunk.
  log.warn("out_time_parser: #{error.message}")
rescue TZInfo::InvalidTimezoneIdentifier
  log.warn("Timezone Not Valid, please refer to http://tzinfo.rubyforge.org/doc/classes/TZInfo/Timezone.html for valid timezones")
end

#shutdown ⇒ Object



35
36
37
# File 'lib/fluent/plugin/out_time_parser.rb', line 35

# Shutdown hook. Adds no behavior of its own; it only delegates to the
# superclass implementation through super.
def shutdown
  super
end

#start ⇒ Object



31
32
33
# File 'lib/fluent/plugin/out_time_parser.rb', line 31

# Start hook. Adds no behavior of its own; it only delegates to the
# superclass implementation through super.
def start
  super
end

#write(chunk) ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/out_time_parser.rb', line 39

# Processes one buffered chunk: every event in it is run through
# filter_record and then re-emitted via router.
#
# The output tag is computed once per chunk by expanding the placeholders in
# @tag against the chunk's metadata.
#
# @param chunk [Object] buffer chunk; must respond to #metadata and
#   #msgpack_each (yielding time and record for each event)
# @return [Object] whatever chunk.msgpack_each returns
def write(chunk)
  tag = extract_placeholders(@tag, chunk.metadata)
  chunk.msgpack_each do |time, record|
    # Each event gets its own copy of the tag string.
    event_tag = tag.dup
    filter_record(event_tag, time, record)
    router.emit(event_tag, time, record)
  end
end