Class: Fluent::ForwardInput::Handler
- Inherits:
-
Coolio::Socket
- Object
- Coolio::Socket
- Fluent::ForwardInput::Handler
- Defined in:
- lib/fluent/plugin/in_forward.rb
Constant Summary collapse
- PEERADDR_FAILED =
["?", "?", "name resolusion failed", "?"]
Instance Attribute Summary collapse
-
#protocol ⇒ Object
readonly
Returns the value of attribute protocol.
-
#remote_addr ⇒ Object
readonly
Returns the value of attribute remote_addr.
-
#remote_host ⇒ Object
readonly
Returns the value of attribute remote_host.
-
#remote_port ⇒ Object
readonly
Returns the value of attribute remote_port.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(io, linger_timeout, log, on_connect_callback) ⇒ Handler
constructor
A new instance of Handler.
- #on_connect ⇒ Object
-
#on_data(&callback) ⇒ Object
API to register callback for data arrival.
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
Constructor Details
#initialize(io, linger_timeout, log, on_connect_callback) ⇒ Handler
Returns a new instance of Handler.
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 |
# File 'lib/fluent/plugin/in_forward.rb', line 513
#
# Wraps an accepted connection, applies SO_LINGER to TCP sockets and records
# the peer's protocol, port, host and address for the readers below.
#
# @param io [IO] accepted socket; passed to the superclass constructor
# @param linger_timeout [Integer] value packed as l_linger for SO_LINGER
#   (only applied when io is a TCPSocket)
# @param log [Object] logger; #trace is called here with a block
# @param on_connect_callback [#call] stored for later use by #on_connect
def initialize(io, linger_timeout, log, on_connect_callback)
  super(io)

  @peeraddr = nil
  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    @peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
    opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end

  ### TODO: disabling name rev resolv
  # Reuse the lookup done above when there is one, so the socket is not
  # queried for its peer address a second time.
  proto, port, host, addr = (@peeraddr || (io.peeraddr rescue PEERADDR_FAILED))
  if addr == '?'
    # peeraddr failed; try the raw sockaddr instead. An explicit begin/rescue
    # is used because a modifier `rescue nil` on a multiple assignment is
    # parsed differently before and after Ruby 2.7, which changed whether the
    # "?" placeholders survived a failure. Here they always survive.
    begin
      port, addr = *Socket.unpack_sockaddr_in(io.getpeername)
    rescue StandardError
      # best effort only: keep the "?" placeholders
    end
  end

  @protocol = proto
  @remote_port = port
  @remote_addr = addr
  @remote_host = host
  @writing = false
  @closing = false
  @mutex = Mutex.new
  @chunk_counter = 0
  @on_connect_callback = on_connect_callback
  @log = log

  # The block form defers the getpeername call to the logger.
  @log.trace {
    begin
      remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
    rescue
      remote_port = nil
      remote_addr = nil
    end
    [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ]
  }
end
Instance Attribute Details
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol.
509 510 511 |
# File 'lib/fluent/plugin/in_forward.rb', line 509
#
# Reader for the protocol attribute.
#
# @return [Object] the current value of @protocol
def protocol
  @protocol
end
#remote_addr ⇒ Object (readonly)
Returns the value of attribute remote_addr.
509 510 511 |
# File 'lib/fluent/plugin/in_forward.rb', line 509
#
# Reader for the remote_addr attribute.
#
# @return [Object] the current value of @remote_addr
def remote_addr
  @remote_addr
end
#remote_host ⇒ Object (readonly)
Returns the value of attribute remote_host.
509 510 511 |
# File 'lib/fluent/plugin/in_forward.rb', line 509
#
# Reader for the remote_host attribute.
#
# @return [Object] the current value of @remote_host
def remote_host
  @remote_host
end
#remote_port ⇒ Object (readonly)
Returns the value of attribute remote_port.
509 510 511 |
# File 'lib/fluent/plugin/in_forward.rb', line 509
#
# Reader for the remote_port attribute.
#
# @return [Object] the current value of @remote_port
def remote_port
  @remote_port
end
Instance Method Details
#close ⇒ Object
577 578 579 580 581 582 583 584 585 |
# File 'lib/fluent/plugin/in_forward.rb', line 577
#
# Flags the handler as closing and, unless @writing is set, delegates to the
# superclass #close. While @writing is set the superclass call is skipped;
# #on_write_complete calls #close again once it has cleared that flag.
#
# @return [Object, nil] nil when the superclass close was skipped, otherwise
#   whatever the superclass #close returns
def close
  still_writing = @mutex.synchronize do
    @closing = true
    @writing
  end
  return if still_writing

  super
end
#on_connect ⇒ Object
550 551 552 |
# File 'lib/fluent/plugin/in_forward.rb', line 550
#
# Invokes the callback given to the constructor, passing this handler.
#
# @return [Object] whatever the callback returns
def on_connect
  @on_connect_callback.call(self)
end
#on_data(&callback) ⇒ Object
API to register callback for data arrival
555 556 557 |
# File 'lib/fluent/plugin/in_forward.rb', line 555
#
# API to register callback for data arrival.
# The block is stored in @on_read_callback, which #on_read calls with each
# chunk of data.
#
# @yieldparam data [Object] the data passed to #on_read
# @return [Proc, nil] the stored block (nil when no block is given)
def on_data(&callback)
  @on_read_callback = callback
end
#on_read(data) ⇒ Object
559 560 561 562 563 564 565 |
# File 'lib/fluent/plugin/in_forward.rb', line 559
#
# Passes incoming data to the callback registered through #on_data.
# Any StandardError raised by the callback is logged together with the
# remote address and its backtrace, and then the connection is closed.
#
# @param data [Object] the chunk to pass to the registered callback
# @return [Object] the callback's result, or the result of #close on error
def on_read(data)
  begin
    @on_read_callback.call(data)
  rescue => e
    @log.error("unexpected error on reading data from client", address: @remote_addr, error: e)
    @log.error_backtrace
    close
  end
end
#on_write_complete ⇒ Object
567 568 569 570 571 572 573 574 575 |
# File 'lib/fluent/plugin/in_forward.rb', line 567
#
# Clears the @writing flag and, if @closing was set in the meantime,
# calls #close now that the write has finished.
#
# @return [Object, nil] the result of #close when it is called, otherwise nil
def on_write_complete
  close_requested = @mutex.synchronize do
    @writing = false
    @closing
  end
  close if close_requested
end