Class: Fluent::PluginHelper::Server::EventHandler::TLSServer

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TLSServer

This class cannot inherit from Coolio::TCPSocket, because Coolio::TCPSocket checks that the underlying socket (the 1st argument of super) is a TCPSocket.

Raises:

  • (ArgumentError)


662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
# File 'lib/fluent/plugin_helper/server.rb', line 662

# Wraps an accepted TCP socket in an OpenSSL::SSL::SSLSocket and hands the
# wrapper to the parent class.
#
# @param sock [TCPSocket] the accepted plain socket
# @param context [OpenSSL::SSL::SSLContext] context passed to SSLSocket.new
# @param socket_option_setter [#call] called with +sock+ before it is wrapped
# @param close_callback [#call] stored; called with this object by #close
# @param log [Object] logger used by the other methods of this class
# @param under_plugin_development [Boolean] when truthy, the read handlers
#   re-raise errors coming from the data callback
# @param connect_callback [#call] stored; called by #try_tls_accept once the
#   handshake has finished
# @raise [ArgumentError] if +sock+ is not a TCPSocket
def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  unless sock.is_a?(TCPSocket)
    raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}"
  end

  socket_option_setter.call(sock)

  # All TLS-related state is set up before `super` is invoked.
  tls_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
  tls_socket.sync_close = true
  @_handler_socket = tls_socket
  @_handler_write_buffer = ''.force_encoding('ascii-8bit')
  @_handler_accepted = false
  super(tls_socket)

  # to serialize #write and #close
  @mutex = Mutex.new

  @log = log
  @under_plugin_development = under_plugin_development

  # Connection life-cycle flags.
  @closing = false
  @close_after_write_complete = false

  # Callbacks; the data callback is registered later through #data.
  @close_callback = close_callback
  @connect_callback = connect_callback
  @data_callback = nil
  @callback_connection = nil
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



659
660
661
# File 'lib/fluent/plugin_helper/server.rb', line 659

# Sets the flag that #on_writable checks after a write attempt: when the flag
# is truthy, #on_writable calls #close.
#
# @param value [Object] the new flag value (used only for its truthiness)
def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closing ⇒ Object (readonly)

Returns the value of attribute closing.



658
659
660
# File 'lib/fluent/plugin_helper/server.rb', line 658

# Returns the closing flag: false after construction, true once #close has
# begun tearing the connection down.
def closing
  @closing
end

Instance Method Details

#close ⇒ Object



806
807
808
809
810
811
812
813
# File 'lib/fluent/plugin_helper/server.rb', line 806

# Closes the connection at most once.
#
# Runs under @mutex (shared with #write). The first call marks the handler as
# closing, notifies the close callback with this object and then delegates to
# the parent implementation; later calls do nothing and return nil.
def close
  @mutex.synchronize do
    unless @closing
      @closing = true
      @close_callback.call(self)
      super
    end
  end
end

#data(&callback) ⇒ Object



690
691
692
693
694
695
696
697
698
699
700
# File 'lib/fluent/plugin_helper/server.rb', line 690

# Registers the block that receives incoming data and installs the matching
# +on_read+ singleton method on this object.
#
# A block taking one argument receives only the data; a block taking two
# arguments also receives the callback connection object.
#
# The arity is validated before any state is changed, so a rejected block does
# not leave @data_callback set without an +on_read+ handler installed.
#
# @yieldparam data [String] the received data
# @yieldparam conn [Object] the callback connection (two-argument form only)
# @raise [RuntimeError] if a data callback is already registered, or if the
#   block does not take exactly 1 or 2 arguments
def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)

  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end

  # Only record the callback once it is known to be usable.
  @data_callback = callback
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connect ⇒ Object



744
745
746
# File 'lib/fluent/plugin_helper/server.rb', line 744

# Connection hook: makes the first attempt at the TLS handshake by delegating
# to #try_tls_accept and returns its result.
# NOTE(review): presumably invoked by the event loop once the socket is attached — confirm.
def on_connect
  try_tls_accept
end

#on_read_with_connection(data) ⇒ Object



797
798
799
800
801
802
803
804
# File 'lib/fluent/plugin_helper/server.rb', line 797

# Read handler for data callbacks that take two arguments: passes the received
# data and the callback connection to the registered block.
#
# Any StandardError raised by the block is logged together with the peer host
# and port, the connection is closed on a best-effort basis, and the error is
# re-raised only when @under_plugin_development is truthy.
#
# @param data [String] the received data
def on_read_with_connection(data)
  begin
    @data_callback.call(data, @callback_connection)
  rescue => e
    peer = @callback_connection
    @log.error "unexpected error on reading data", host: peer.remote_host, port: peer.remote_port, error: e
    @log.error_backtrace
    begin
      close
    rescue
      nil
    end
    raise if @under_plugin_development
  end
end

#on_read_without_connection(data) ⇒ Object



788
789
790
791
792
793
794
795
# File 'lib/fluent/plugin_helper/server.rb', line 788

# Read handler for data callbacks that take one argument: passes only the
# received data to the registered block.
#
# Any StandardError raised by the block is logged together with the peer host
# and port, the connection is closed on a best-effort basis, and the error is
# re-raised only when @under_plugin_development is truthy.
#
# @param data [String] the received data
def on_read_without_connection(data)
  begin
    @data_callback.call(data)
  rescue => e
    peer = @callback_connection
    @log.error "unexpected error on reading data", host: peer.remote_host, port: peer.remote_port, error: e
    @log.error_backtrace
    begin
      close
    rescue
      nil
    end
    raise if @under_plugin_development
  end
end

#on_readable ⇒ Object



748
749
750
751
752
753
754
755
756
757
# File 'lib/fluent/plugin_helper/server.rb', line 748

# Readable-event hook. Until the TLS handshake has completed, each call only
# advances the handshake via #try_tls_accept; afterwards it defers to the
# parent implementation.
#
# Would-block conditions are swallowed so the call simply returns. An SSL
# error is logged at warn level and the socket is closed on a best-effort
# basis.
def on_readable
  super if try_tls_accept
rescue IO::WaitReadable, IO::WaitWritable
  # Nothing to do right now; return without doing anything.
rescue OpenSSL::SSL::SSLError => ssl_error
  @log.warn "close socket due to unexpected ssl error: #{ssl_error}"
  begin
    close
  rescue
    nil
  end
end

#on_writable ⇒ Object



759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
# File 'lib/fluent/plugin_helper/server.rb', line 759

# Writable-event hook: tries to push the pending bytes of
# @_handler_write_buffer into the TLS socket and drops whatever was written
# from the front of the buffer.
#
# Would-block conditions and EINTR return quietly. Other system, IO and socket
# errors close the connection on a best-effort basis. An SSL error is only
# logged at debug level.
def on_writable
  @mutex.synchronize do
    # Consider write_nonblock with {exception: false} when IO::WaitWritable error happens frequently.
    sent = @_handler_socket.write_nonblock(@_handler_write_buffer)
    @_handler_write_buffer.slice!(0, sent)
  end

  # `super` is deliberately called outside the synchronized section.
  # TLSServer keeps its own buffer (`@_handler_write_buffer`, see TLSServer#write) and does not use
  # the inner buffer (::IO::Buffer) of Coolio::IO, so the only reason to call `super` is to reach
  # Coolio::IO#disable_write_watcher.
  # Calling it while holding the lock could lock the mutex recursively, because
  # Coolio::IO#on_write_complete eventually calls TLSServer#close, which takes the same lock.
  super

  close if @close_after_write_complete
rescue IO::WaitWritable, IO::WaitReadable, Errno::EINTR
  # Not writable right now, or interrupted: return without doing anything.
  nil
rescue SystemCallError, IOError, SocketError
  # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
  close rescue nil
  nil
rescue OpenSSL::SSL::SSLError => ssl_error
  @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: ssl_error
end

#to_io ⇒ Object



686
687
688
# File 'lib/fluent/plugin_helper/server.rb', line 686

# Returns whatever the wrapped TLS socket (@_handler_socket) returns from its
# own #to_io.
def to_io
  @_handler_socket.to_io
end

#try_tls_accept ⇒ Object



710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
# File 'lib/fluent/plugin_helper/server.rb', line 710

# Advances the TLS handshake by one non-blocking step.
#
# When the handshake completes, the callback connection is created, the
# connect callback is invoked with it, and the callback is required to have
# registered a data callback through #data.
#
# Connection-level errors are logged at trace level and SSL errors at warn
# level; in both cases the socket is closed on a best-effort basis.
#
# @return [Boolean] true once the handshake has completed (now or earlier),
#   false while it is still pending or after it failed
# @raise [RuntimeError] if the connect callback did not register a data callback
def try_tls_accept
  return true if @_handler_accepted

  begin
    # This call is what actually performs the TLS handshake.
    handshake = @_handler_socket.accept_nonblock(exception: false)
    # While pending, the underlying socket buffer does not hold enough data yet;
    # accept_nonblock is simply retried on the next call.
    pending = (handshake == :wait_readable || handshake == :wait_writable)

    unless pending
      @_handler_accepted = true

      @callback_connection = TLSCallbackSocket.new(self)
      @connect_callback.call(@callback_connection)
      raise "connection callback must call #data to set data callback" unless @data_callback

      return true
    end
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
    peer = begin
             @_handler_socket.peeraddr
           rescue
             PEERADDR_FAILED
           end
    @log.trace "unexpected error before accepting TLS connection",
               addr: peer[3], host: peer[2], port: peer[1], error: e
    close rescue nil
  rescue OpenSSL::SSL::SSLError => e
    peer = begin
             @_handler_socket.peeraddr
           rescue
             PEERADDR_FAILED
           end
    # Use same log level as on_readable
    @log.warn "unexpected error before accepting TLS connection by OpenSSL",
              addr: peer[3], host: peer[2], port: peer[1], error: e
    close rescue nil
  end

  false
end

#write(data) ⇒ Object



702
703
704
705
706
707
708
# File 'lib/fluent/plugin_helper/server.rb', line 702

# Queues +data+ for sending and asks the event loop to schedule a write.
#
# The data is appended to @_handler_write_buffer under @mutex (shared with
# #close); the actual socket write happens later in #on_writable.
#
# The bytes are appended as a binary copy (String#b) so the buffer keeps its
# binary encoding. Without it, appending e.g. UTF-8 text to an empty buffer
# makes the buffer adopt UTF-8; a later binary append may then raise
# Encoding::CompatibilityError, and the byte count used by #on_writable to
# trim the buffer would be treated as a character count.
#
# @param data [String] the payload to send
# @return [Integer] the size of +data+ in bytes
def write(data)
  @mutex.synchronize do
    @_handler_write_buffer << data.b
    schedule_write
    data.bytesize
  end
end