Class: Fluent::Plugin::RecordDemuxPickerOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::RecordDemuxPickerOutput
- Defined in:
- lib/fluent/plugin/out_record_demux_picker.rb
Defined Under Namespace
Classes: KeyMapper
Constant Summary collapse
- NAME = 'record_demux_picker'
- DEMUX_KEY_NORMALIZE_KEY_NAME = 'key'
- DEMUX_KEY_NORMALIZE_VALUE_NAME = 'value'
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #process(_events_tag, events) ⇒ Object
- #process_event(time, event) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 51
#
# Delegates configuration to the parent class, then wraps every entry of
# @demux_keys and @shared_keys in a KeyMapper.
#
# @param conf [Object] configuration object, forwarded implicitly to `super`
# @return [true]
def configure(conf)
  super

  # Each entry is yielded as (key, target); one KeyMapper is built per entry.
  @demux_keys_mappers = @demux_keys.map do |source_key, target_key|
    KeyMapper.new(source_key, target_key)
  end
  @shared_keys_mappers = @shared_keys.map do |source_key, target_key|
    KeyMapper.new(source_key, target_key)
  end

  true
end
#multi_workers_ready? ⇒ Boolean
60 61 62 |
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 60
#
# Unconditionally reports this plugin as ready for multiple workers.
# NOTE(review): presumably queried by Fluentd when running with several
# workers -- confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#process(_events_tag, events) ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 64
#
# Demultiplexes every incoming event through #process_event, collects the
# resulting records into one MultiEventStream (each keeps the time of the
# event it came from) and emits that stream through the router under `tag`.
#
# @param _events_tag [Object] tag of the incoming events; not used here
# @param events [#each] collection yielding (time, event) pairs
# @return [Object] whatever `router.emit_stream` returns
def process(_events_tag, events)
  stream = MultiEventStream.new

  events.each do |time, record|
    process_event(time, record).each do |demuxed|
      stream.add(time, demuxed)
    end
  end

  # The stream is emitted even when no record was produced.
  router.emit_stream(tag, stream)
end
#process_event(time, event) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 73
#
# Produces one record per demux key mapper. Each record is the shared part
# of the event merged with the formatted demux key/value and then with the
# formatted time. A mapper that raises a StandardError is logged as a
# warning and contributes no record; the other mappers are still processed.
#
# @param time [Object] event time, passed to `format_time`
# @param event [Object] source event, passed to each mapper's accessor
# @return [Array] the records built for this event, without nil entries
def process_event(time, event)
  # Computed once, outside the rescue: a failure here propagates to the caller.
  base_record = extract_shared_event(event)

  demuxed = @demux_keys_mappers.map do |key_mapper|
    begin
      picked = key_mapper.accessor.call(event)
      demux_fields = format_demux_key(key_mapper.target, picked)
      base_record.merge(demux_fields).merge(format_time(time))
    rescue StandardError => e
      log.warn "#{NAME} : failure while processing event : #{e}"
      nil
    end
  end

  demuxed.compact
end