Class: Fluent::SelectOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_select.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/fluent/plugin/out_select.rb', line 11

# Validates the plugin configuration and records which re-tagging mode to use.
#
# After delegating to the parent implementation, @mode is set to
# "add_prefix" when @add_prefix is set (it takes precedence over @tag),
# otherwise to "tag" when @tag is set.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [ConfigError] when neither @add_prefix nor @tag is set
def configure(conf)
  super

  # At least one way of choosing the outgoing tag must be configured.
  raise ConfigError, "Either add_prefix or tag is required " unless @add_prefix || @tag

  @mode = @add_prefix ? "add_prefix" : "tag"
end

#do_select(tag, es) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_select.rb', line 39

# Builds a new event stream holding only the records accepted by @select.
#
# Every (time, record) pair of +es+ is processed inside #timeout_block.
# Records for which the select expression is truthy are added to the
# result; the others are reported at trace level.
#
# NOTE(review): @select is run through +eval+ for every record, so it
# executes arbitrary Ruby — presumably it comes from trusted plugin
# configuration; confirm. The expression is evaluated in this method's
# binding, so the local names +tag+, +es+, +output_es+, +time+ and
# +record+ are visible to it and must not be renamed.
#
# @param tag the tag of the incoming events
# @param es an enumerable yielding (time, record) pairs
# @return [MultiEventStream] the accepted events
def do_select(tag, es)
  output_es = MultiEventStream.new
  es.each do |time, record|
    timeout_block(tag, time, record) do
      next output_es.add(time, record) if eval(@select)

      $log.trace { "filtered: #{Time.at(time)} #{tag} #{record.inspect}" }
    end
  end
  output_es
end

#emit(tag, es, chain) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/fluent/plugin/out_select.rb', line 22

# Filters the incoming stream with #do_select and re-emits the result.
#
# The outgoing tag is "<@add_prefix>.<tag>" when @mode is "add_prefix",
# and @tag otherwise. +chain.next+ is called once on both the success path
# and the SyntaxError path.
#
# @param tag the tag of the incoming events
# @param es the incoming event stream
# @param chain an object responding to +next+
# @return the selected event stream, or the rescued SyntaxError (both
#   return values exist for the benefit of tests)
def emit(tag, es, chain)
  selected = do_select(tag, es)
  destination = @mode == "add_prefix" ? @add_prefix + "." + tag : @tag
  Fluent::Engine.emit_stream(destination, selected)
  chain.next
  selected # for test
rescue SyntaxError => syntax_error
  # A malformed select expression must not stall the chain.
  chain.next
  $log.error "Select command is syntax error: #{@select}"
  syntax_error # for test
end

#timeout_block(tag, time, record) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_select.rb', line 53

# Runs the given block under Timeout.timeout with a limit of @timeout.
#
# When Timeout::Error is raised, it is swallowed and an error describing
# the event (time, tag, record) is logged instead.
#
# @param tag the event tag, used only in the log message
# @param time the event time, rendered with Time.at in the log message
# @param record the event record, rendered with #inspect in the log message
# @return the block's value, or the result of the logging call on timeout
def timeout_block(tag, time, record)
  Timeout.timeout(@timeout) { yield }
rescue Timeout::Error
  $log.error { "Timeout: #{Time.at(time)} #{tag} #{record.inspect}" }
end