Class: Fluent::Plugin::RecordDemuxOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_record_demux.rb

Constant Summary collapse

NAME =
'record_demux'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_record_demux.rb', line 51

# Validates this plugin's settings after letting the parent class handle +conf+.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @return [true] when every check passes
# @raise [Fluent::ConfigError] when +tag+ is nil or empty, or when both
#   +demux_keys+ and +shared_keys+ are nil
def configure(conf)
  super

  tag_missing = tag.nil? || tag.empty?
  raise Fluent::ConfigError, "#{NAME}: `tag` must be specified" if tag_missing

  # With neither key list configured there is nothing to derive the other from.
  raise Fluent::ConfigError, 'specify demux_keys or shared_keys' if demux_keys.nil? && shared_keys.nil?

  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/fluent/plugin/out_record_demux.rb', line 63

# Unconditionally answers +true+.
# NOTE(review): presumably the hook Fluentd queries before running a plugin
# under multiple workers; confirm against the Fluentd plugin API docs.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#process(_events_tag, events) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/out_record_demux.rb', line 67

# Fans each incoming record out into one event per demuxed key, then emits the
# collected events as a single stream under the configured +tag+.
#
# For every record, keys listed in +remove_keys+ are ignored. The remaining
# keys are divided into "shared" keys (copied into every output event) and
# "demux" keys (each producing its own output event). Whichever of
# +shared_keys+ / +demux_keys+ is nil is derived as the complement of the other.
#
# @param _events_tag the incoming tag; unused, output always goes to +tag+
# @param events an enumerable yielding +time, record+ pairs
# @return whatever +router.emit_stream+ returns
def process(_events_tag, events)
  stream = MultiEventStream.new

  events.each do |time, record|
    candidates = record.keys - remove_keys

    # Shared fields: everything that is not demuxed, or the explicit list.
    shared_names = shared_keys.nil? ? candidates - demux_keys : candidates & shared_keys
    common = record.slice(*shared_names)

    # Demuxed fields: everything that is not shared, or the explicit list.
    fanout = demux_keys.nil? ? candidates - shared_names : candidates & demux_keys

    fanout.each do |key|
      # `format` here is the plugin's own method (defined outside this listing).
      stream.add(time, format(time, key, record[key], common)) if record.key?(key)
    end
  end

  router.emit_stream(tag, stream)
end