Class: Fluent::Plugin::TimeParserOutput
- Inherits:
- Output
- Inheritance chain: Object → Output → Fluent::Plugin::TimeParserOutput
- Defined in:
- lib/fluent/plugin/out_time_parser.rb
Constant Summary
- DEFAULT_BUFFER_TYPE = "memory"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #filter_record(tag, time, record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 |
# File 'lib/fluent/plugin/out_time_parser.rb', line 25
#
# Configures the plugin: passes +conf+ through
# compat_parameters_convert(conf, :buffer), delegates to the parent
# implementation, then verifies that @chunk_key_tag is set.
#
# @param conf the plugin configuration handed over by the caller
# @raise [Fluent::ConfigError] when @chunk_key_tag is nil or false
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  return if @chunk_key_tag

  raise Fluent::ConfigError, "'tag' in chunk_keys is required."
end
#filter_record(tag, time, record) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/out_time_parser.rb', line 48
#
# Parses the timestamp string stored under record[key] and writes three
# derived fields back into the same record (the record is mutated in place):
#
#   record[parsed_time_tag] - the parsed DateTime rendered with #to_s
#   record[parsed_date_tag] - its date part as a string
#   record[parsed_hour_tag] - its hour as a string
#
# When time_zone is set to a non-empty value, the parsed time is first
# adjusted using the TZInfo timezone of that name.
#
# Failures are logged as warnings and not re-raised; in that case none of the
# derived fields are written.
#
# @param tag    event tag (not used by this method)
# @param time   event time (not used by this method)
# @param record hash-like event record; read via [] and updated via []=
def filter_record(tag, time, record)
  record_time = DateTime.parse(record[key])

  if time_zone && time_zone != ""
    tz = TZInfo::Timezone.get(time_zone)
    period = tz.period_for_utc(record_time)
    rational_offset = period.utc_total_offset_rational
    record_time = tz.utc_to_local(record_time).new_offset(rational_offset) - rational_offset
  end

  record[parsed_time_tag] = record_time.to_s
  record[parsed_date_tag] = record_time.to_date.to_s
  record[parsed_hour_tag] = record_time.hour.to_s
rescue ArgumentError, TypeError => error
  # ArgumentError: the value is not a parseable date string.
  # TypeError: the value is missing (nil) or not a String; without this a
  # single record lacking the key would abort the caller.
  log.warn("out_extract_query_params: #{error.message}")
rescue TZInfo::InvalidTimezoneIdentifier
  log.warn("Timezone Not Valid, please refer to http://tzinfo.rubyforge.org/doc/classes/TZInfo/Timezone.html for valid timezones")
end
#shutdown ⇒ Object
35 36 37 |
# File 'lib/fluent/plugin/out_time_parser.rb', line 35
#
# Shutdown hook. Adds no behavior of its own; it only delegates to the
# parent implementation.
def shutdown
  super
end
#start ⇒ Object
31 32 33 |
# File 'lib/fluent/plugin/out_time_parser.rb', line 31
#
# Start hook. Adds no behavior of its own; it only delegates to the
# parent implementation.
def start
  super
end
#write(chunk) ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_time_parser.rb', line 39
#
# Processes one buffered chunk: resolves the output tag from @tag once per
# chunk via extract_placeholders, then for every (time, record) pair in the
# chunk runs filter_record on the record and emits it through the router
# under the resolved tag.
#
# @param chunk buffer chunk; must respond to #metadata and #msgpack_each
def write(chunk)
  tag = extract_placeholders(@tag, chunk.metadata)

  chunk.msgpack_each do |time, record|
    # Each event gets its own copy of the tag string, so nothing downstream
    # can mutate the tag shared by the rest of the chunk.
    event_tag = tag.dup
    filter_record(event_tag, time, record)
    router.emit(event_tag, time, record)
  end
end