Class: Fluent::StdinInput
- Inherits:
-
Input
- Object
- Input
- Fluent::StdinInput
- Defined in:
- lib/fluent/plugin/in_stdin.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit_event(msg) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/fluent/plugin/in_stdin.rb', line 26 def configure(conf) super @parser = Plugin.new_parser(@format) @parser.configure(conf) end |
#emit_event(msg) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/fluent/plugin/in_stdin.rb', line 70 def emit_event(msg) @parser.parse(msg) { |time, record| unless time && record log.warn "pattern not match: #{msg.inspect}" return end router.emit(@tag, time, record) } rescue => e log.error msg.dump, :error => e, :error_class => e.class log.error_backtrace end |
#run ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/in_stdin.rb', line 42 def run while true begin @buffer << $stdin.sysread(4000) pos = 0 while i = @buffer.index(@delimiter, pos) msg = @buffer[pos...i] emit_event(msg) pos = i + @delimiter.length end @buffer.slice!(0, pos) if pos > 0 rescue IOError, EOFError => e # ignore above exceptions because can't re-open stdin automatically break rescue => e log.error "unexpected error", :error=> e.to_s log.error_backtrace break end end if @stop_at_finished Fluent::Engine.flush! sleep 1 # avoid 'process died within 1 second. exit.' log Fluent::Engine.stop end end |
#shutdown ⇒ Object
38 39 40 |
# File 'lib/fluent/plugin/in_stdin.rb', line 38 def shutdown @thread.join end |
#start ⇒ Object
33 34 35 36 |
# File 'lib/fluent/plugin/in_stdin.rb', line 33 def start @buffer = "".force_encoding('ASCII-8BIT') @thread = Thread.new(&method(:run)) end |