Class: Fluent::TaggedCopyOutput
- Inherits: CopyOutput
- Inheritance chain: Object → CopyOutput → Fluent::TaggedCopyOutput
- Defined in:
- lib/fluent/plugin/out_tagged_copy.rb
Instance Method Summary
-
#configure(conf) ⇒ Object
Override to handle filter:tag options.
-
#emit(tag, es, chain) ⇒ Object
Override to use TaggedOutputChain.
-
#initialize ⇒ TaggedCopyOutput
constructor
A new instance of TaggedCopyOutput.
Constructor Details
#initialize ⇒ TaggedCopyOutput
Returns a new instance of TaggedCopyOutput.
7 8 9 10 |
# File 'lib/fluent/plugin/out_tagged_copy.rb', line 7
#
# Builds a new TaggedCopyOutput.
#
# Runs the parent class initializer first, then starts with an empty list of
# tag procs; #configure appends one entry to it per <store> directive.
def initialize
  super
  @tag_procs = []
end
Instance Method Details
#configure(conf) ⇒ Object
Override to handle filter:tag options
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/out_tagged_copy.rb', line 13
#
# Override to handle filter:tag options.
#
# Walks every <store> element of +conf+ and, for each one, registers a tag
# proc (built from the store's optional <filter> element) plus a configured
# output plugin instance. The two are appended to @tag_procs and @outputs in
# the same order, so entries at the same index belong together.
#
# @param conf the configuration element whose <store> children are processed
# @raise [ConfigError] when a <store> element has no 'type' parameter
def configure(conf)
  stores = conf.elements.select { |element| element.name == 'store' }

  stores.each do |store|
    type = store['type']
    raise ConfigError, "Missing 'type' parameter on <store> directive" unless type

    log.debug "adding store type=#{type.dump}"

    # Only the first <filter> child is consulted. Without one, the empty hash
    # makes all three lookups below return nil.
    filter = store.elements.find { |child| child.name == 'filter' } || {}
    @tag_procs << generate_tag_proc(filter['tag'], filter['add_tag_prefix'], filter['remove_tag_prefix'])

    output = Plugin.new_output(type)
    output.configure(store)
    @outputs << output
  end
end
#emit(tag, es, chain) ⇒ Object
Override to use TaggedOutputChain
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/out_tagged_copy.rb', line 34
#
# Override to use TaggedOutputChain.
#
# Hands the event stream to a chain object built from @outputs and
# @tag_procs, then starts the chain by calling +next+ on it.
#
# @param tag the tag of the incoming events
# @param es the event stream to emit
# @param chain the caller's chain, passed through to the new chain object
# @return whatever the chain's +next+ returns
def emit(tag, es, chain)
  # A stream that reports itself as non-repeatable is copied into a
  # MultiEventStream first (presumably so every output can iterate it;
  # confirm against the chain classes).
  unless es.repeatable?
    buffered = MultiEventStream.new
    es.each { |time, record| buffered.add(time, record) }
    es = buffered
  end

  # @deep_copy selects the chain implementation; both take the same arguments.
  chain_class = @deep_copy ? TaggedCopyOutputChain : TaggedOutputChain
  chain_class.new(@outputs, @tag_procs, tag, es, chain).next
end