Class: Fluent::SocketUtil::TcpHandler
- Inherits:
- Coolio::Socket
- Inheritance hierarchy (as listed): Object → Coolio::Socket → Fluent::SocketUtil::TcpHandler
- Defined in:
- lib/fluent/plugin/socket_util.rb
Constant Summary collapse
- PEERADDR_FAILED =
["?", "?", "name resolusion failed", "?"]
Instance Method Summary collapse
- #initialize(io, log, delimiter, callback) ⇒ TcpHandler (constructor): returns a new instance of TcpHandler.
- #on_close ⇒ Object
- #on_connect ⇒ Object
- #on_read(data) ⇒ Object
Constructor Details
#initialize(io, log, delimiter, callback) ⇒ TcpHandler
Returns a new instance of TcpHandler.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/socket_util.rb', line 55
#
# Builds a handler around an accepted socket.
#
# For a TCPSocket, the peer address is captured (falling back to
# PEERADDR_FAILED when the lookup raises) and SO_LINGER is enabled on the
# socket. The delimiter, callback and logger are stored, the acceptance is
# traced, and an empty binary receive buffer is created.
#
# @param io [Object] the socket passed on to the superclass constructor
# @param log [Object] logger; must respond to +trace+ with a block
# @param delimiter [String] message separator stored for later use
# @param callback [#call] stored for later use
def initialize(io, log, delimiter, callback)
  super(io)

  if io.is_a?(TCPSocket)
    @addr =
      begin
        io.peeraddr
      rescue
        PEERADDR_FAILED
      end

    # struct linger { int l_onoff; int l_linger; }
    # NOTE(review): @timeout is not assigned anywhere in the visible code; if
    # it is nil here, to_i yields 0 (linger time of zero) - confirm intended.
    linger = [1, @timeout.to_i].pack('I!I!')
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
  end

  @delimiter = delimiter
  @callback = callback
  @log = log
  @log.trace do
    "accepted fluent socket object_id=#{object_id}"
  end
  @buffer = "".force_encoding('ASCII-8BIT')
end
Instance Method Details
#on_close ⇒ Object
88 89 90 |
# File 'lib/fluent/plugin/socket_util.rb', line 88
#
# Emits a trace-level log entry identifying this handler by its object id.
# The message is built lazily inside the block passed to the logger.
def on_close
  @log.trace do
    "closed fluent socket object_id=#{object_id}"
  end
end
#on_connect ⇒ Object
70 71 |
# File 'lib/fluent/plugin/socket_util.rb', line 70
#
# Performs no work and returns nil.
def on_connect; end
#on_read(data) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/fluent/plugin/socket_util.rb', line 73
#
# Appends the received chunk to the receive buffer and passes every complete,
# delimiter-terminated message to the callback together with @addr. Bytes
# after the last delimiter remain buffered until a later read completes them.
#
# Any StandardError raised while processing (including one raised by the
# callback) is logged through @log.error and the connection is closed.
#
# @param data [String] chunk of data to append to the buffer
def on_read(data)
  # An empty delimiter matches at every offset without advancing the scan
  # position, which would make the loop below spin forever. Fail fast so the
  # rescue clause logs the problem and closes the connection instead.
  raise ArgumentError, "delimiter must not be empty" if @delimiter.empty?

  @buffer << data
  pos = 0
  while i = @buffer.index(@delimiter, pos)
    msg = @buffer[pos...i]
    @callback.call(msg, @addr)
    pos = i + @delimiter.length
  end
  # Drop everything that was delivered; keep the trailing partial message.
  @buffer.slice!(0, pos) if pos > 0
rescue => e
  @log.error "unexpected error", error: e, error_class: e.class
  close
end