Class: Fluent::Plugin::RecordDemuxPickerOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_record_demux_picker.rb

Defined Under Namespace

Classes: KeyMapper

Constant Summary collapse

NAME =
'record_demux_picker'
DEMUX_KEY_NORMALIZE_KEY_NAME =
'key'
DEMUX_KEY_NORMALIZE_VALUE_NAME =
'value'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 51

# Configures the plugin and pre-builds the key mappers used at processing time.
#
# Delegates to the parent implementation first (bare +super+ forwards +conf+),
# then wraps every (key, target) pair found in +@demux_keys+ and
# +@shared_keys+ in a KeyMapper.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   class (presumably a Fluentd config element - confirm against the caller)
# @return [true]
def configure(conf)
  super

  # Both key collections are converted the same way: one KeyMapper per pair.
  build_mappers = lambda do |pairs|
    pairs.map { |key, target| KeyMapper.new(key, target) }
  end

  @demux_keys_mappers = build_mappers.call(@demux_keys)
  @shared_keys_mappers = build_mappers.call(@shared_keys)

  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 60

# Reports whether this plugin may run when Fluentd uses multiple workers.
#
# The answer is unconditional: this implementation never inspects any state.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#process(_events_tag, events) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 64

# Demultiplexes every incoming event and re-emits the results as one stream.
#
# Each (time, event) pair is handed to #process_event; every record it
# returns is added to a fresh MultiEventStream with the time of the event it
# came from. The stream is then emitted through the router under +tag+.
#
# @param _events_tag the tag of the incoming events; not used, the output is
#   emitted under +tag+ instead (resolved outside this method - presumably a
#   plugin configuration parameter, confirm in the class definition)
# @param events an enumerable yielding (time, event) pairs
# @return [Object] whatever +router.emit_stream+ returns
def process(_events_tag, events)
  out_stream = MultiEventStream.new

  events.each do |time, event|
    process_event(time, event).each do |demuxed|
      out_stream.add(time, demuxed)
    end
  end

  router.emit_stream(tag, out_stream)
end

#process_event(time, event) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/out_record_demux_picker.rb', line 73

# Splits one event into one record per configured demux key.
#
# The shared part of the event is extracted once. Then, for each mapper in
# +@demux_keys_mappers+, the mapper's accessor is called on the event and the
# shared part is merged with the formatted demux key/value and the formatted
# time. A mapper whose processing raises a StandardError is logged as a
# warning and contributes no record; the remaining mappers are still handled.
#
# @param time the event time, passed to +format_time+ for every record
# @param event the source record the accessors read from
# @return [Array] the records built successfully, in mapper order
def process_event(time, event)
  base_record = extract_shared_event(event)

  records = @demux_keys_mappers.map do |mapper|
    picked = mapper.accessor.call(event)
    demux_fields = format_demux_key(mapper.target, picked)
    with_demux = base_record.merge(demux_fields)
    # format_time stays inside the block so its failures are rescued per mapper.
    with_demux.merge(format_time(time))
  rescue StandardError => e
    log.warn "#{NAME} : failure while processing event : #{e}"
    nil
  end

  # Drop the nil placeholders left by mappers that failed.
  records.compact
end