Class: Fluent::PluginHelper::Server::EventHandler::TLSServer

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TLSServer

This class cannot be built on Coolio::TCPSocket, because Coolio::TCPSocket checks that the underlying socket (the first argument of super) is a TCPSocket.

Raises:

  • (ArgumentError)


684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/fluent/plugin_helper/server.rb', line 684

# Wraps an accepted TCP socket in an OpenSSL::SSL::SSLSocket and hands that
# TLS socket to the superclass constructor.
#
# @param sock [TCPSocket] the accepted plain TCP socket
# @param context [Object] passed as-is to OpenSSL::SSL::SSLSocket.new
# @param socket_option_setter [#call] invoked with +sock+ before it is wrapped
# @param close_callback [#call] invoked with this handler from #close
# @param log [Object] logger used by the event callbacks
# @param under_plugin_development [Boolean] when truthy, data-callback errors are re-raised
# @param connect_callback [#call] invoked once the TLS handshake has completed
# @raise [ArgumentError] if +sock+ is not a TCPSocket
def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  unless sock.is_a?(TCPSocket)
    raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}"
  end

  # Options are applied to the raw TCP socket, before the TLS wrapper exists.
  socket_option_setter.call(sock)

  tls_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
  @_handler_socket = tls_socket
  tls_socket.sync_close = true
  # Outgoing bytes are queued here (see #write / #on_writable).
  @_handler_write_buffer = ''.force_encoding('ascii-8bit')
  # Becomes true once the TLS handshake has finished (see #try_tls_accept).
  @_handler_accepted = false
  super(@_handler_socket)

  # Callbacks.
  @connect_callback = connect_callback
  @close_callback = close_callback
  @data_callback = nil
  @callback_connection = nil

  # Diagnostics.
  @log = log
  @under_plugin_development = under_plugin_development

  # Shutdown state.
  @closing = false
  @close_after_write_complete = false

  # Serializes #write and #close.
  @mutex = Mutex.new
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



681
682
683
# File 'lib/fluent/plugin_helper/server.rb', line 681

# Sets the flag that #on_writable consults after a write attempt: when it is
# truthy, #on_writable calls #close once that attempt has gone through.
#
# @param value [Boolean] the new flag value
# @return [Object] the assigned value
def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closing ⇒ Object (readonly)

Returns the value of attribute closing.



680
681
682
# File 'lib/fluent/plugin_helper/server.rb', line 680

# Reports whether #close has already been entered. The flag starts out false
# in the constructor and is set to true by #close.
#
# @return [Boolean] the current value of @closing
def closing
  @closing
end

Instance Method Details

#close ⇒ Object



828
829
830
831
832
833
834
835
# File 'lib/fluent/plugin_helper/server.rb', line 828

# Closes the connection at most once.
#
# The first call marks the handler as closing, notifies the close callback
# with this handler, and then delegates to the superclass. Later calls do
# nothing and return nil. The whole sequence runs under @mutex so that it is
# serialized with #write.
#
# @return [Object, nil] the superclass result on the first call, nil afterwards
def close
  @mutex.synchronize do
    unless @closing
      @closing = true
      @close_callback.call(self)
      super
    end
  end
end

#data(&callback) ⇒ Object



712
713
714
715
716
717
718
719
720
721
722
# File 'lib/fluent/plugin_helper/server.rb', line 712

# Registers the block that receives incoming data and installs a singleton
# #on_read that forwards to it.
#
# A one-argument block receives only the data; a two-argument block also
# receives the callback connection object.
#
# @yieldparam data [String] the received bytes
# @yieldparam connection [Object] the callback connection (two-argument form only)
# @raise [RuntimeError] if a data callback was already registered, or if the
#   block's arity is neither 1 nor 2
# @return [Symbol] the result of define_singleton_method
def data(&callback)
  if singleton_methods.include?(:on_read)
    raise "data callback can be registered just once, but registered twice"
  end

  @data_callback = callback

  readers_by_arity = {
    1 => :on_read_without_connection,
    2 => :on_read_with_connection,
  }
  reader_name = readers_by_arity.fetch(callback.arity) do
    raise "BUG: callback block must have 1 or 2 arguments"
  end

  define_singleton_method(:on_read, method(reader_name))
end

#on_connect ⇒ Object



766
767
768
# File 'lib/fluent/plugin_helper/server.rb', line 766

# Connection-established hook: makes a first attempt at the TLS handshake.
# NOTE(review): presumably invoked by the Coolio event loop — confirm.
#
# @return [Boolean] the result of #try_tls_accept
def on_connect
  try_tls_accept
end

#on_read_with_connection(data) ⇒ Object



819
820
821
822
823
824
825
826
# File 'lib/fluent/plugin_helper/server.rb', line 819

# #on_read implementation for two-argument data callbacks: passes the data
# and the callback connection to the registered block.
#
# Any StandardError from the block is logged, the connection is closed on a
# best-effort basis, and the error is re-raised only when the handler was
# created with under_plugin_development set.
#
# @param data [String] the received bytes
# @return [Object, nil] the block's result, or nil after a handled error
def on_read_with_connection(data)
  begin
    @data_callback.call(data, @callback_connection)
  rescue => e
    peer = @callback_connection
    @log.error "unexpected error on reading data", host: peer.remote_host, port: peer.remote_port, error: e
    @log.error_backtrace
    # Errors raised while closing are deliberately ignored.
    begin
      close
    rescue StandardError
      nil
    end
    raise if @under_plugin_development
  end
end

#on_read_without_connection(data) ⇒ Object



810
811
812
813
814
815
816
817
# File 'lib/fluent/plugin_helper/server.rb', line 810

# #on_read implementation for one-argument data callbacks: passes only the
# data to the registered block.
#
# Any StandardError from the block is logged, the connection is closed on a
# best-effort basis, and the error is re-raised only when the handler was
# created with under_plugin_development set.
#
# @param data [String] the received bytes
# @return [Object, nil] the block's result, or nil after a handled error
def on_read_without_connection(data)
  begin
    @data_callback.call(data)
  rescue => e
    peer = @callback_connection
    @log.error "unexpected error on reading data", host: peer.remote_host, port: peer.remote_port, error: e
    @log.error_backtrace
    # Errors raised while closing are deliberately ignored.
    begin
      close
    rescue StandardError
      nil
    end
    raise if @under_plugin_development
  end
end

#on_readable ⇒ Object



770
771
772
773
774
775
776
777
778
779
# File 'lib/fluent/plugin_helper/server.rb', line 770

# Readable-event hook. The superclass read path runs only once the TLS
# handshake has completed; until then each event just advances the handshake.
#
# "Would block" conditions are ignored so the event can be retried later.
# Any other SSL error is logged as a warning and the socket is closed on a
# best-effort basis.
def on_readable
  super if try_tls_accept
rescue IO::WaitReadable
  # Not enough data yet: do nothing and wait for the next event.
rescue IO::WaitWritable
  # Same as above: do nothing and wait for the next event.
rescue OpenSSL::SSL::SSLError => e
  @log.warn "close socket due to unexpected ssl error: #{e}"
  begin
    close
  rescue StandardError
    nil
  end
end

#on_writable ⇒ Object



781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
# File 'lib/fluent/plugin_helper/server.rb', line 781

# Writable-event hook: pushes the pending bytes of @_handler_write_buffer to
# the TLS socket and drops whatever was accepted from the front of the buffer.
#
# "Would block" conditions and EINTR are ignored so the write can be retried.
# Other system, IO and socket errors close the connection on a best-effort
# basis. SSL errors are only logged at debug level.
def on_writable
  @mutex.synchronize do
    # If IO::WaitWritable turns out to be raised frequently, consider
    # write_nonblock with {exception: false} instead.
    sent_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
    @_handler_write_buffer.slice!(0, sent_bytes)
  end

  # `super` is called outside the synchronized section on purpose.
  # TLSServer keeps its own buffer (`@_handler_write_buffer`, see #write)
  # rather than the inner buffer (::IO::Buffer) of Coolio::IO, so the only
  # reason to call `super` is to reach Coolio::IO#disable_write_watcher.
  # Calling it while holding the mutex could lock recursively, because
  # Coolio::IO#on_write_complete eventually calls TLSServer#close, which
  # takes the same lock.
  super

  close if @close_after_write_complete
rescue IO::WaitWritable, IO::WaitReadable, Errno::EINTR
  # Not ready or interrupted: leave the buffer untouched and retry later.
  nil
rescue SystemCallError, IOError, SocketError
  # SystemCallError covers Errno::EPIPE and Errno::ECONNRESET, among others.
  begin
    close
  rescue StandardError
    nil
  end
  nil
rescue OpenSSL::SSL::SSLError => e
  @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
end

#to_io ⇒ Object



708
709
710
# File 'lib/fluent/plugin_helper/server.rb', line 708

# Delegates to the wrapped TLS socket (@_handler_socket) so this handler can
# be used wherever an object responding to #to_io is expected.
# NOTE(review): presumably this yields the underlying TCP socket — confirm
# against OpenSSL::SSL::SSLSocket#to_io.
#
# @return [Object] whatever @_handler_socket.to_io returns
def to_io
  @_handler_socket.to_io
end

#try_tls_accept ⇒ Object



732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
# File 'lib/fluent/plugin_helper/server.rb', line 732

# Advances the TLS handshake by one non-blocking step.
#
# When the handshake completes, the callback connection is created and the
# connect callback is invoked with it; that callback is required to register
# a data callback via #data. Connection-level errors and SSL errors are
# logged and the socket is closed on a best-effort basis.
#
# @return [Boolean] true once the handshake has completed (now or earlier),
#   false when it is still in progress or has failed
# @raise [RuntimeError] if the connect callback did not register a data callback
def try_tls_accept
  return true if @_handler_accepted

  begin
    # accept_nonblock is the call that actually performs the TLS handshake.
    handshake = @_handler_socket.accept_nonblock(exception: false)
    case handshake
    when :wait_readable, :wait_writable
      # Not enough data in the underlying socket buffer yet: retry on a later event.
    else
      @_handler_accepted = true

      @callback_connection = TLSCallbackSocket.new(self)
      @connect_callback.call(@callback_connection)
      raise "connection callback must call #data to set data callback" unless @data_callback

      return true
    end
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
    peer = begin
      @_handler_socket.peeraddr
    rescue StandardError
      PEERADDR_FAILED
    end
    @log.trace "unexpected error before accepting TLS connection",
               addr: peer[3], host: peer[2], port: peer[1], error: e
    begin
      close
    rescue StandardError
      nil
    end
  rescue OpenSSL::SSL::SSLError => e
    peer = begin
      @_handler_socket.peeraddr
    rescue StandardError
      PEERADDR_FAILED
    end
    # Logged at the same level as the SSL error handling in on_readable.
    @log.warn "unexpected error before accepting TLS connection by OpenSSL",
              addr: peer[3], host: peer[2], port: peer[1], error: e
    begin
      close
    rescue StandardError
      nil
    end
  end

  false
end

#write(data) ⇒ Object



724
725
726
727
728
729
730
# File 'lib/fluent/plugin_helper/server.rb', line 724

# Queues +data+ for sending and schedules a write.
#
# The bytes are appended to @_handler_write_buffer and sent later by
# #on_writable. The append and the scheduling run under @mutex so that they
# are serialized with #close.
#
# +data+ is appended as raw bytes. Appending a non-ASCII string in another
# encoding directly would either re-tag an empty buffer with that encoding,
# after which the `slice!(0, written_bytes)` in #on_writable counts
# characters instead of bytes, or raise Encoding::CompatibilityError when
# the buffer already holds high bytes.
#
# @param data [String] the payload to send; it is not modified
# @return [Integer] the number of bytes queued
def write(data)
  # Binary payloads are appended as-is to avoid an extra copy.
  payload = data.encoding == Encoding::BINARY ? data : data.b

  @mutex.synchronize do
    @_handler_write_buffer << payload
    schedule_write
    data.bytesize
  end
end