Class: Fluent::StdinInput
- Inherits:
-
Input
- Object
- Input
- Fluent::StdinInput
- Defined in:
- lib/fluent/plugin/in_stdin.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit_event(msg) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 30 |
# File 'lib/fluent/plugin/in_stdin.rb', line 25 def configure(conf) super @parser = Plugin.new_parser(@format) @parser.configure(conf) end |
#emit_event(msg) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/in_stdin.rb', line 64 def emit_event(msg) @parser.parse(msg) { |time, record| unless time && record log.warn "pattern not match: #{msg.inspect}" return end router.emit(@tag, time, record) } rescue => e log.error msg.dump, :error => e, :error_class => e.class log.error_backtrace end |
#run ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/fluent/plugin/in_stdin.rb', line 41 def run while true begin @buffer << $stdin.sysread(4000) pos = 0 while i = @buffer.index(@delimiter, pos) msg = @buffer[pos...i] emit_event(msg) pos = i + @delimiter.length end @buffer.slice!(0, pos) if pos > 0 rescue IOError, EOFError => e # ignore above exceptions because can't re-open stdin automatically break rescue => e log.error "unexpected error", :error=> e.to_s log.error_backtrace break end end end |
#shutdown ⇒ Object
37 38 39 |
# File 'lib/fluent/plugin/in_stdin.rb', line 37 def shutdown @thread.join end |
#start ⇒ Object
32 33 34 35 |
# File 'lib/fluent/plugin/in_stdin.rb', line 32 def start @buffer = "".force_encoding('ASCII-8BIT') @thread = Thread.new(&method(:run)) end |