Class: Fluent::Plugin::RecordDemuxOutput
- Inherits:
- Output
- Object
- Output
- Fluent::Plugin::RecordDemuxOutput
- Defined in:
- lib/fluent/plugin/out_record_demux.rb
Constant Summary collapse
- NAME = 'record_demux'
Instance Method Summary collapse
Instance Method Details
#configure(conf) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 51

# Validates the plugin settings once the superclass has handled `conf`.
#
# @param conf the configuration object, forwarded implicitly to `super`
# @raise [Fluent::ConfigError] if `tag` is nil or empty
# @raise [Fluent::ConfigError] if both `demux_keys` and `shared_keys` are nil
# @return [true]
def configure(conf)
  super

  tag_missing = tag.nil? || tag.empty?
  raise Fluent::ConfigError, "#{NAME}: `tag` must be specified" if tag_missing

  # At least one of the two key lists must be configured.
  no_key_selection = demux_keys.nil? && shared_keys.nil?
  raise Fluent::ConfigError, 'specify demux_keys or shared_keys' if no_key_selection

  true
end
#multi_workers_ready? ⇒ Boolean
63 64 65 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 63

# Unconditionally reports `true`.
# NOTE(review): presumably the Fluentd hook declaring that this plugin can run
# under multiple workers; confirm against the Fluentd plugin API.
#
# @return [Boolean] always `true`
def multi_workers_ready?
  true
end
#process(_events_tag, events) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/out_record_demux.rb', line 67

# Splits each incoming record into one new event per demultiplexed key and
# emits all of them as a single stream under the configured `tag`.
#
# For every event:
# * keys listed in `remove_keys` are dropped from consideration;
# * the shared keys are the remaining keys that appear in `shared_keys`, or,
#   when `shared_keys` is nil, the remaining keys not listed in `demux_keys`;
# * the demux keys are the remaining keys that appear in `demux_keys`, or,
#   when `demux_keys` is nil, the remaining keys that are not shared;
# * each demux key yields one record built by `format` from the event time,
#   the key, its value, and the hash of shared fields.
#
# @param _events_tag the tag of the incoming events (unused)
# @param events an enumerable yielding `(time, record)` pairs
# @return the result of `router.emit_stream`
def process(_events_tag, events)
  stream = MultiEventStream.new

  events.each do |time, record|
    candidates = record.keys - remove_keys

    shared = shared_keys.nil? ? candidates - demux_keys : candidates & shared_keys
    shared_fields = record.slice(*shared)

    to_demux = demux_keys.nil? ? candidates - shared : candidates & demux_keys

    to_demux.each do |key|
      next unless record.key?(key)

      stream.add(time, format(time, key, record[key], shared_fields))
    end
  end

  router.emit_stream(tag, stream)
end