Class: Fluent::SelectOutput
- Inherits:
- Output
- Object
- Output
- Fluent::SelectOutput
- Defined in:
- lib/fluent/plugin/out_select.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #do_select(tag, es) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #timeout_block(tag, time, record) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/fluent/plugin/out_select.rb', line 11
#
# Validates the plugin configuration and records how output tags are built.
#
# Delegates to the parent implementation first, then sets @mode to
# "add_prefix" when @add_prefix is set, or to "tag" when only @tag is set.
# (Presumably @add_prefix and @tag are populated from +conf+ by super --
# confirm against the class's parameter declarations.)
#
# @param conf the configuration object, passed through to super unchanged
# @return [String] the selected mode ("add_prefix" or "tag")
# @raise [ConfigError] if neither @add_prefix nor @tag is set
def configure(conf)
  super

  # add_prefix is tested first, so it takes precedence when both are given.
  # When neither is given, raise fires before the assignment, leaving @mode
  # untouched.
  @mode =
    if @add_prefix
      "add_prefix"
    elsif @tag
      "tag"
    else
      raise ConfigError, "Either add_prefix or tag is required"
    end
end
#do_select(tag, es) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_select.rb', line 39
#
# Builds a new event stream containing only the events for which the
# configured @select expression evaluates to a truthy value.
#
# Each event is evaluated inside timeout_block. Events for which the
# expression is falsy are reported through $log.trace and dropped.
#
# @param tag the tag of the incoming events
# @param es the incoming event stream; #each must yield (time, record)
# @return [MultiEventStream] the events that passed the filter
# @raise [SyntaxError] propagated from eval when @select is not valid Ruby;
#   it is not rescued in this method
def do_select(tag, es)
  output_es = MultiEventStream.new
  es.each do |time, record|
    timeout_block(tag, time, record) do
      # SECURITY: @select is executed as arbitrary Ruby code for every event.
      # It must only ever come from trusted configuration.
      #
      # NOTE: eval runs in this block's binding, so the expression can refer
      # to the locals `tag`, `time` and `record` by name. Do not rename them.
      if eval(@select)
        output_es.add(time, record)
      else
        $log.trace { "filtered: #{Time.at(time)} #{tag} #{record.inspect}" }
      end
    end
  end
  output_es
end
#emit(tag, es, chain) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_select.rb', line 22
#
# Filters the incoming event stream with do_select and re-emits the result
# through Fluent::Engine.
#
# The outgoing tag is "<@add_prefix>.<tag>" when @mode is "add_prefix",
# otherwise @tag. chain.next is called once, both on success and when the
# select expression raises SyntaxError. Any other exception is not rescued
# here and propagates to the caller.
#
# @param tag the tag of the incoming events
# @param es the incoming event stream
# @param chain the output chain; must respond to #next
# @return the filtered event stream on success, or the rescued SyntaxError
#   (both return values exist for the benefit of tests)
def emit(tag, es, chain)
  output_es = do_select(tag, es)
  out_tag = @mode == "add_prefix" ? "#{@add_prefix}.#{tag}" : @tag
  Fluent::Engine.emit_stream(out_tag, output_es)
  chain.next
  output_es # for test
rescue SyntaxError => e
  # SyntaxError is not a StandardError, so it has to be named explicitly.
  chain.next
  $log.error "Select command is syntax error: #{@select}"
  e # for test
end
#timeout_block(tag, time, record) ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/fluent/plugin/out_select.rb', line 53
#
# Runs the given block under Timeout.timeout(@timeout). If Timeout::Error is
# raised, it is swallowed and reported through $log.error together with the
# event's time, tag and record.
#
# Exceptions other than Timeout::Error are not rescued and propagate.
#
# @param tag the event's tag (used only in the log message)
# @param time the event's time, formatted with Time.at in the log message
# @param record the event's record (used only in the log message)
# @yield the work to run under the timeout
# @return the block's value, or whatever $log.error returns after a timeout
def timeout_block(tag, time, record)
  Timeout.timeout(@timeout) { yield }
rescue Timeout::Error
  $log.error { "Timeout: #{Time.at(time)} #{tag} #{record.inspect}" }
end