Class: Fluent::JsonExpanderOutput

Inherits:
MultiOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_json_expander.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#outputsObject (readonly)

Returns the value of attribute outputs.



11
12
13
# File 'lib/fluent/plugin/out_json_expander.rb', line 11

def outputs
  @outputs
end

Instance Method Details

#configure(conf) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_json_expander.rb', line 18

def configure(conf)
  super

  if @remove_prefix
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end
  if @add_prefix
    @added_prefix_string = @add_prefix + '.'
  end

  @outputs = []
  @mutex = Mutex.new
  @mappings = {}
  @invalid_mapping_keys = []

  templates = conf.elements.select{|e| e.name == 'template' }
  if templates.size != 1
    raise Fluent::ConfigError, "Just 1 template must be contained"
  end

  @template = templates.first
  @expand_target_keys = scan_keys(@template)
end

#emit(tag, es, chain) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_json_expander.rb', line 43

def emit(tag, es, chain)
  if @remove_prefix and
      ((tag.start_with?(@removed_prefix_string) and tag.length > @removed_length) or tag == @remove_prefix)
    tag = tag[@removed_length..-1]
  end
  if @add_prefix
    tag = tag.empty? ? @add_prefix : (@added_prefix_string + tag)
  end

  es.each {|time, record|
    output, new_record = *new_output(record)
    if output
      dup_es = Fluent::ArrayEventStream.new([[time, new_record]])
      null_chain = Fluent::NullOutputChain.instance
      output.emit(tag, dup_es, null_chain)
    end
  }
  chain.next
end

#shutdownObject



63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_json_expander.rb', line 63

def shutdown
  super
  @mappings.values.each do |output|
    output.shutdown
  end
  @mappings.clear
end