Class: Fluent::ForwardInput::Handler
- Inherits: Coolio::Socket
  - Object
    - Coolio::Socket
      - Fluent::ForwardInput::Handler
- Defined in:
- lib/fluent/plugin/in_forward.rb
Constant Summary collapse
- PEERADDR_FAILED =
["?", "?", "name resolusion failed", "?"]
Instance Method Summary collapse
- #initialize(io, linger_timeout, log, on_message) ⇒ Handler (constructor)
  A new instance of Handler.
- #on_close ⇒ Object
- #on_connect ⇒ Object
- #on_read(data) ⇒ Object
- #on_read_json(data) ⇒ Object
- #on_read_msgpack(data) ⇒ Object
- #respond(option) ⇒ Object
Constructor Details
#initialize(io, linger_timeout, log, on_message) ⇒ Handler
Returns a new instance of Handler.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/fluent/plugin/in_forward.rb', line 239
#
# Builds a handler around an accepted connection and stores the state the
# read callbacks use later (@peeraddr, @chunk_counter, @on_message, @log).
#
# @param io [IO] the accepted socket; passed straight to the superclass
# @param linger_timeout [Integer] value packed as l_linger for SO_LINGER
# @param log [Object] logger; +trace+ is called on it with a block
# @param on_message [#call] callback stored in @on_message; the read
#   callbacks call it with (obj, chunk_size, peeraddr)
def initialize(io, linger_timeout, log, on_message)
  super(io)

  @peeraddr = nil
  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    # Best effort: fall back to the placeholder tuple if the lookup raises.
    @peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
    opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end

  @chunk_counter = 0
  @on_message = on_message
  @log = log
  @log.trace {
    # NOTE(review): @_io is not assigned in this method; presumably it is
    # set by the superclass constructor -- confirm against Coolio::Socket.
    begin
      remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
    rescue
      # The address is only used for the log line, so a failure is not fatal.
      remote_port = nil
      remote_addr = nil
    end
    "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
  }
end
Instance Method Details
#on_close ⇒ Object
319 320 321 |
# File 'lib/fluent/plugin/in_forward.rb', line 319
#
# Close callback: writes a trace-level log entry and does nothing else.
#
# @return [Object] whatever @log.trace returns
def on_close
  @log.trace { "closed socket" }
end
#on_connect ⇒ Object
263 264 |
# File 'lib/fluent/plugin/in_forward.rb', line 263
#
# Connect callback with an empty body; nothing is done here.
#
# @return [nil]
def on_connect
end
#on_read(data) ⇒ Object
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/fluent/plugin/in_forward.rb', line 266
#
# First-read dispatcher. Looks at the first character of +data+ to choose a
# wire format: '{' or '[' selects the JSON path, anything else (including an
# empty string, where data[0] is nil) selects the MessagePack path. It then
# replaces #on_read on this instance with the chosen reader, so the format
# check is not repeated on later reads.
#
# @param data [String] the bytes just read
# @return [Object] the return value of the chosen reader
def on_read(data)
  first = data[0]
  if first == '{' || first == '['
    m = method(:on_read_json)
    @serializer = :to_json.to_proc
    @y = Yajl::Parser.new
    @y.on_parse_complete = lambda { |obj|
      option = @on_message.call(obj, @chunk_counter, @peeraddr)
      respond option if option
      # The byte count restarts after each completely parsed object.
      @chunk_counter = 0
    }
  else
    m = method(:on_read_msgpack)
    @serializer = :to_msgpack.to_proc
    @u = Fluent::Engine.msgpack_factory.unpacker
  end

  # Rebind on_read on the singleton class only; other instances keep this
  # dispatching version.
  (class << self; self; end).module_eval do
    define_method(:on_read, m)
  end
  m.call(data)
end
#on_read_json(data) ⇒ Object
289 290 291 292 293 294 295 296 |
# File 'lib/fluent/plugin/in_forward.rb', line 289
#
# JSON read path: adds the byte size of +data+ to @chunk_counter and appends
# +data+ to the parser held in @y. If either step raises a StandardError,
# the error is logged and the connection is closed instead of re-raising.
#
# @param data [String] the bytes just read
def on_read_json(data)
  @chunk_counter += data.bytesize
  @y << data
rescue => e
  @log.error "forward error", error: e, error_class: e.class
  @log.error_backtrace
  close
end
#on_read_msgpack(data) ⇒ Object
298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/fluent/plugin/in_forward.rb', line 298
#
# MessagePack read path: adds the byte size of +data+ to @chunk_counter,
# feeds +data+ to the unpacker in @u, and calls @on_message with
# (obj, @chunk_counter, @peeraddr) for each object the unpacker yields.
# A truthy result is passed to #respond. If a StandardError is raised, it
# is logged and the connection is closed instead of re-raising.
#
# @param data [String] the bytes just read
def on_read_msgpack(data)
  @chunk_counter += data.bytesize
  @u.feed_each(data) do |obj|
    option = @on_message.call(obj, @chunk_counter, @peeraddr)
    respond option if option
    # Reset inside the loop: if one read yields several objects, the later
    # ones are reported with a size of 0.
    @chunk_counter = 0
  end
rescue => e
  @log.error "forward error", error: e, error_class: e.class
  @log.error_backtrace
  close
end
#respond(option) ⇒ Object
311 312 313 314 315 316 317 |
# File 'lib/fluent/plugin/in_forward.rb', line 311
#
# Sends an acknowledgement when +option+ carries a 'chunk' value: builds
# { 'ack' => option['chunk'] }, serializes it with @serializer and writes
# it. Does nothing when +option+ is nil/false or has no truthy 'chunk'.
#
# @param option [Hash, nil] result returned by the on_message callback
# @return [Object, nil] result of @log.trace when a response was written,
#   otherwise nil
def respond(option)
  return unless option && option['chunk']

  res = { 'ack' => option['chunk'] }
  write @serializer.call(res)
  @log.trace { "sent response to fluent socket" }
end