Class: Fluent::Plugin::RecordDemuxOutput
- Inherits: Output
  - Object
    - Output
      - Fluent::Plugin::RecordDemuxOutput
- Defined in:
- lib/fluent/plugin/out_record_demux.rb
Constant Summary collapse
- NAME =
'record_demux'
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 51
# Runs the parent class's configuration step, then checks that a `tag`
# ended up being set.
#
# @param conf the configuration object; it is handed to `super` unchanged
# @raise [Fluent::ConfigError] when `@tag` is still nil after `super` returns
def configure(conf)
  super

  raise Fluent::ConfigError, "#{NAME}: `tag` must be specified" if @tag.nil?
end
#multi_workers_ready? ⇒ Boolean
59 60 61 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 59
# Always answers true.
# NOTE(review): in the Fluentd plugin API this predicate is how a plugin
# declares that it may run under multiple workers — confirm against the
# Fluentd version in use.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#process(_events_tag, events) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 63
# Splits each incoming record into one event per demuxed key and emits the
# whole batch under `tag`.
#
# For every (time, record) pair:
# * keys listed in `remove_keys` are ignored;
# * keys that are also in `shared_keys` are collected into a `shared` hash
#   that is passed along with every event produced from that record;
# * the keys to demux are `demux_keys` when that setting is non-empty,
#   otherwise every remaining non-shared key of the record.
#
# `remove_keys`, `shared_keys`, `demux_keys`, `tag`, `router` and the
# four-argument `format` are defined outside this listing — presumably plugin
# parameters and helpers; verify against the full class.
#
# @param _events_tag the tag of the incoming events (unused)
# @param events an enumerable yielding (time, record) pairs
# @return whatever `router.emit_stream` returns
def process(_events_tag, events)
  # MultiEventStream is the stream class that supports #add; the base
  # Fluent::EventStream does not.
  demux_events = Fluent::MultiEventStream.new

  events.each do |time, record|
    record_keys = record.keys - remove_keys
    record_shared_keys = record_keys.intersection(shared_keys)
    # Keep the sliced hash: it is the `shared` payload given to `format`.
    shared = record.slice(*record_shared_keys)

    # Always bind a list here; a bare trailing-`if` assignment would leave
    # the variable nil whenever `demux_keys` is configured.
    record_demux_keys =
      if !demux_keys || demux_keys.empty?
        record_keys - record_shared_keys
      else
        demux_keys
      end

    record_demux_keys.each do |key|
      # Configured demux keys may be absent from a given record.
      next unless record.key?(key)

      new_record = format(time, key, record[key], shared)
      demux_events.add(time, new_record)
    end
  end

  router.emit_stream(tag, demux_events)
end