Class: Fluent::ForwardInput::Handler

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin/in_forward.rb

Constant Summary collapse

# Fallback used when io.peeraddr raises. The constructor destructures it as
# [protocol, port, host, addr], so "?" marks each unknown field.
# The host text (including its spelling) is kept byte-for-byte because it is
# exposed at runtime as the remote host value.
# Frozen so the shared fallback array cannot be modified in place.
PEERADDR_FAILED =
["?", "?", "name resolusion failed", "?"].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, linger_timeout, log, on_connect_callback) ⇒ Handler

Returns a new instance of Handler.



513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/fluent/plugin/in_forward.rb', line 513

# Wraps an accepted socket and records information about the remote peer.
#
# @param io [IO] the accepted socket; handed to the superclass constructor
# @param linger_timeout [Integer] l_linger value applied through SO_LINGER
#   (only when io is a TCPSocket)
# @param log [#trace] logger used by this handler
# @param on_connect_callback [#call] stored and invoked from #on_connect
def initialize(io, linger_timeout, log, on_connect_callback)
  super(io)

  @peeraddr = nil
  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    @peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
    opt = [1, linger_timeout].pack('I!I!')  # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end

  ### TODO: disabling name rev resolv
  # Reuse the address obtained above instead of asking the socket a second
  # time; only non-TCP sockets still need their own lookup here.
  proto, port, host, addr = @peeraddr || (io.peeraddr rescue PEERADDR_FAILED)
  if addr == '?'
    # peeraddr failed: try the raw sockaddr, which needs no name resolution.
    # An explicit begin/rescue is used because the `rescue` modifier combined
    # with multiple assignment parses differently before and after Ruby 2.7.
    begin
      port, addr = Socket.unpack_sockaddr_in(io.getpeername)
    rescue StandardError
      # Best effort only: keep the "?" placeholders.
    end
  end
  @protocol = proto
  @remote_port = port
  @remote_addr = addr
  @remote_host = host
  @writing = false
  @closing = false
  @mutex = Mutex.new # guards @writing / @closing (see #close, #on_write_complete)

  @chunk_counter = 0
  @on_connect_callback = on_connect_callback
  @log = log
  # Block form: the peer lookup below only happens if the logger evaluates
  # the block.
  @log.trace {
    begin
      remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername)
    rescue
      remote_port = nil
      remote_addr = nil
    end
    [ "accepted fluent socket", {address: remote_addr, port: remote_port, instance: self.object_id} ]
  }
end

Instance Attribute Details

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



509
510
511
# File 'lib/fluent/plugin/in_forward.rb', line 509

# Reader for @protocol, the first element of the peer address captured by
# the constructor.
#
# @return [Object] the stored value; nil if it was never assigned
def protocol; @protocol; end

#remote_addr ⇒ Object (readonly)

Returns the value of attribute remote_addr.



509
510
511
# File 'lib/fluent/plugin/in_forward.rb', line 509

# Reader for @remote_addr, the peer address recorded by the constructor.
#
# @return [Object] the stored value; nil if it was never assigned
def remote_addr; @remote_addr; end

#remote_host ⇒ Object (readonly)

Returns the value of attribute remote_host.



509
510
511
# File 'lib/fluent/plugin/in_forward.rb', line 509

# Reader for @remote_host, the peer host entry recorded by the constructor.
#
# @return [Object] the stored value; nil if it was never assigned
def remote_host; @remote_host; end

#remote_port ⇒ Object (readonly)

Returns the value of attribute remote_port.



509
510
511
# File 'lib/fluent/plugin/in_forward.rb', line 509

# Reader for @remote_port, the peer port recorded by the constructor.
#
# @return [Object] the stored value; nil if it was never assigned
def remote_port; @remote_port; end

Instance Method Details

#close ⇒ Object



577
578
579
580
581
582
583
584
585
# File 'lib/fluent/plugin/in_forward.rb', line 577

# Marks the handler as closing and closes it via the superclass unless a
# write is still flagged as in progress. In that case the close is deferred;
# #on_write_complete calls #close again once @writing has been cleared.
#
# @return [Object, nil] nil when deferred, otherwise the superclass result
def close
  still_writing = @mutex.synchronize do
    @closing = true
    @writing
  end
  return if still_writing

  super
end

#on_connect ⇒ Object



550
551
552
# File 'lib/fluent/plugin/in_forward.rb', line 550

# Invokes the callback supplied to the constructor, passing this handler.
#
# @return [Object] whatever the callback returns
def on_connect
  callback = @on_connect_callback
  callback.call(self)
end

#on_data(&callback) ⇒ Object

API to register callback for data arrival



555
556
557
# File 'lib/fluent/plugin/in_forward.rb', line 555

# Registers the block that #on_read calls with each chunk of received data.
# Any previously registered block is replaced.
#
# @yieldparam data [Object] the data passed to #on_read
# @return [Proc, nil] the registered block (nil when no block is given)
def on_data(&handler)
  @on_read_callback = handler
end

#on_read(data) ⇒ Object



559
560
561
562
563
564
565
# File 'lib/fluent/plugin/in_forward.rb', line 559

# Hands received data to the block registered with #on_data. If that raises
# a StandardError, the error and its backtrace are logged and the
# connection is closed.
#
# @param data [Object] the received data
# @return [Object] the callback result, or the result of #close on failure
def on_read(data)
  begin
    @on_read_callback.call(data)
  rescue => err
    @log.error "unexpected error on reading data from client", address: @remote_addr, error: err
    @log.error_backtrace
    close
  end
end

#on_write_complete ⇒ Object



567
568
569
570
571
572
573
574
575
# File 'lib/fluent/plugin/in_forward.rb', line 567

# Clears the in-progress write flag and, if #close was requested in the
# meantime, performs that close now.
#
# @return [Object, nil] the result of #close when one was pending, else nil
def on_write_complete
  close_requested = @mutex.synchronize do
    @writing = false
    @closing
  end
  close if close_requested
end