Class: Fluent::PluginHelper::Server::EventHandler::TLSServer

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TLSServer

This class cannot inherit from Coolio::TCPSocket, because Coolio::TCPSocket checks that the underlying socket (the first argument passed to super) is a TCPSocket, whereas this class passes an OpenSSL::SSL::SSLSocket.

Raises:

  • (ArgumentError)


605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
# File 'lib/fluent/plugin_helper/server.rb', line 605

# Builds a TLS connection handler on top of an already accepted TCP socket.
#
# The raw socket is configured through +socket_option_setter+, wrapped in an
# OpenSSL::SSL::SSLSocket created with +context+, and that TLS socket is what
# gets handed to the parent constructor.
#
# @param sock [TCPSocket] accepted TCP socket to wrap
# @param context [OpenSSL::SSL::SSLContext] passed to OpenSSL::SSL::SSLSocket.new
# @param socket_option_setter [#call] invoked with +sock+ before it is wrapped
# @param close_callback [#call] stored and invoked by #close
# @param log [Object] logger used by the event callbacks
# @param under_plugin_development [Boolean] when truthy, read errors are re-raised
# @param connect_callback [#call] stored and invoked once the TLS handshake succeeds
# @raise [ArgumentError] if +sock+ is not a TCPSocket
def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  unless sock.is_a?(TCPSocket)
    raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}"
  end

  socket_option_setter.call(sock)

  tls_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
  @_handler_socket = tls_socket
  tls_socket.sync_close = true
  # String.new without arguments yields an empty, mutable ASCII-8BIT string.
  @_handler_write_buffer = String.new
  @_handler_accepted = false
  super(tls_socket)

  # Collaborators supplied by the caller.
  @log = log
  @under_plugin_development = under_plugin_development

  # Callbacks: the data callback is registered later through #data.
  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  # Connection state.
  @callback_connection = nil
  @closing = false

  # Serializes #write and #close.
  @mutex = Mutex.new
end

Instance Method Details

#close ⇒ Object



726
727
728
729
730
731
732
733
# File 'lib/fluent/plugin_helper/server.rb', line 726

# Closes the connection exactly once.
#
# The first call marks the handler as closing, notifies @close_callback with
# this handler and then delegates to the parent implementation. Later calls
# return nil without doing anything. The whole sequence runs under @mutex,
# the same lock #write uses.
#
# The parent close is run from an +ensure+ clause: because @closing is already
# true at that point, skipping it when the callback raises would leave the
# socket open with no way to close it again.
#
# @return [Object, nil] whatever the parent #close returns, or nil when the
#   handler was already closing
# @raise [StandardError] any error raised by @close_callback (after the parent
#   close has run)
def close
  @mutex.synchronize do
    return if @closing
    @closing = true

    result = nil
    begin
      @close_callback.call(self)
    ensure
      result = super
    end
    result
  end
end

#data(&callback) ⇒ Object



632
633
634
635
636
637
638
639
640
641
642
# File 'lib/fluent/plugin_helper/server.rb', line 632

# Registers the block that receives data read from this connection.
#
# The block's arity selects the read handler installed as the singleton
# method +on_read+: one argument receives only the data, two arguments
# receive the data and the connection object.
#
# @yield [data] or [data, connection]
# @return [Symbol] the result of define_singleton_method (:on_read)
# @raise [RuntimeError] if a data callback was already registered, or if the
#   block does not take exactly 1 or 2 arguments
def data(&callback)
  if singleton_methods.include?(:on_read)
    raise "data callback can be registered just once, but registered twice"
  end

  # Stored before the arity check, exactly as callers have always observed.
  @data_callback = callback

  arg_count = callback.arity
  handler_name =
    if arg_count == 1
      :on_read_without_connection
    elsif arg_count == 2
      :on_read_with_connection
    else
      raise "BUG: callback block must have 1 or 2 arguments"
    end

  define_singleton_method(:on_read, method(handler_name))
end

#on_connect ⇒ Object



676
677
678
# File 'lib/fluent/plugin_helper/server.rb', line 676

# Connection-established hook: makes the first attempt at the TLS handshake.
#
# NOTE(review): presumably invoked by the Coolio event loop once the socket is
# attached — confirm against the parent class.
#
# @return [Boolean] the result of #try_tls_accept (true once the handshake has
#   completed, false otherwise)
def on_connect
  try_tls_accept
end

#on_read_with_connection(data) ⇒ Object



717
718
719
720
721
722
723
724
# File 'lib/fluent/plugin_helper/server.rb', line 717

# Read handler used when the data callback takes two arguments: it passes the
# received data and the connection object (@callback_connection) to it.
#
# Any StandardError raised by the callback is logged together with the peer's
# host and port, and the connection is closed. The error is re-raised only
# when @under_plugin_development is truthy.
#
# @param data [Object] received data, handed to the callback untouched
# @return [Object, nil] the callback's result, or nil after a handled error
def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  # #close takes no arguments. Passing one raises ArgumentError, which the
  # rescue modifier would swallow, leaving the connection open.
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



708
709
710
711
712
713
714
715
# File 'lib/fluent/plugin_helper/server.rb', line 708

# Read handler used when the data callback takes a single argument: it passes
# only the received data to it.
#
# Any StandardError raised by the callback is logged together with the peer's
# host and port, and the connection is closed. The error is re-raised only
# when @under_plugin_development is truthy.
#
# @param data [Object] received data, handed to the callback untouched
# @return [Object, nil] the callback's result, or nil after a handled error
def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  # #close takes no arguments. Passing one raises ArgumentError, which the
  # rescue modifier would swallow, leaving the connection open.
  close rescue nil
  raise if @under_plugin_development
end

#on_readable ⇒ Object



680
681
682
683
684
685
686
# File 'lib/fluent/plugin_helper/server.rb', line 680

# Readable-event hook.
#
# Until the TLS handshake has completed, each readable event is used to
# advance the handshake (#try_tls_accept). Only after that does the event
# reach the parent read handling.
#
# IO::WaitReadable / IO::WaitWritable mean that the TLS layer needs more I/O
# before it can produce data, so they are ignored. Any other
# OpenSSL::SSL::SSLError is logged and the connection is closed instead of
# letting the error escape from the handler.
#
# @return [Object, nil]
def on_readable
  super if try_tls_accept
rescue IO::WaitReadable, IO::WaitWritable
  # Nothing to do yet. The Wait* classes must be matched before SSLError,
  # because OpenSSL's non-blocking wait errors are SSLError subclasses.
rescue OpenSSL::SSL::SSLError => e
  @log.warn "close socket due to unexpected ssl error: #{e}"
  close rescue nil
end

#on_writable ⇒ Object



688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
# File 'lib/fluent/plugin_helper/server.rb', line 688

# Writable-event hook: flushes as much of @_handler_write_buffer as the TLS
# socket accepts right now, then lets the parent handler run.
#
# Only the buffer manipulation is done under @mutex (shared with #write and
# #close). The parent handler is called after the lock has been released:
# #close takes the same non-reentrant mutex, so a close triggered from inside
# the parent handler while the lock was held would raise ThreadError
# (recursive locking).
# NOTE(review): the parent is presumably Coolio::IO#on_writable, which can end
# up calling #close — confirm against cool.io.
#
# Error handling:
# * IO::WaitWritable / IO::WaitReadable / Errno::EINTR - try again on the next event
# * SystemCallError / IOError / SocketError - close the connection
# * OpenSSL::SSL::SSLError - logged at debug level
#
# @return [Object, nil]
def on_writable
  @mutex.synchronize do
    sent_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
    @_handler_write_buffer.slice!(0, sent_bytes)
  end

  # Deliberately outside the synchronized section, see above.
  super
rescue IO::WaitWritable, IO::WaitReadable
  return
rescue Errno::EINTR
  return
rescue SystemCallError, IOError, SocketError
  # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
  close rescue nil
  return
rescue OpenSSL::SSL::SSLError => e
  @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
end

#to_io ⇒ Object



628
629
630
# File 'lib/fluent/plugin_helper/server.rb', line 628

# Returns the IO object of the wrapped TLS socket by delegating to
# @_handler_socket.to_io (@_handler_socket is the OpenSSL::SSL::SSLSocket
# created in #initialize).
#
# NOTE(review): presumably this is the raw TCP socket, so that the event loop
# watches the real file descriptor — confirm against OpenSSL::SSL::SSLSocket#to_io.
def to_io
  @_handler_socket.to_io
end

#try_tls_accept ⇒ Object



652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
# File 'lib/fluent/plugin_helper/server.rb', line 652

# Advances the server-side TLS handshake without blocking.
#
# On the call that completes the handshake, the connection wrapper
# (TLSCallbackSocket) is created and @connect_callback is invoked with it.
# That callback has to register a data callback through #data.
#
# Handshake failures close the connection and return false instead of raising.
# This covers TLS protocol errors as well as the peer going away mid-handshake
# (connection reset, broken pipe, timeout, ...), which would otherwise escape
# from #on_connect / #on_readable.
#
# @return [Boolean] true once the handshake has completed (now or earlier),
#   false when it is still in progress or has failed
# @raise [RuntimeError] if @connect_callback did not register a data callback
def try_tls_accept
  return true if @_handler_accepted

  begin
    @_handler_socket.accept_nonblock # this method call actually try to do handshake via TLS
    @_handler_accepted = true

    @callback_connection = TLSCallbackSocket.new(self)
    @connect_callback.call(@callback_connection)
    unless @data_callback
      raise "connection callback must call #data to set data callback"
    end
    return true
  rescue IO::WaitReadable, IO::WaitWritable
    # retry accept_nonblock: there aren't enough data in underlying socket buffer
  rescue OpenSSL::SSL::SSLError,
         Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
    @log.trace "unexpected error before accepting TLS connection", error: e
    close rescue nil
  end

  false
end

#write(data) ⇒ Object



644
645
646
647
648
649
650
# File 'lib/fluent/plugin_helper/server.rb', line 644

# Queues +data+ for sending and schedules a write event. The actual socket
# write happens later in #on_writable.
#
# The data is appended as binary. @_handler_write_buffer starts out as an
# empty ASCII-8BIT string, and appending a non-ASCII string in another
# encoding (e.g. UTF-8) to it would switch the buffer to that encoding.
# #on_writable trims the buffer with slice!(0, written_bytes), which counts
# characters rather than bytes on such a buffer and would drop unsent data.
# A later binary append could also fail with Encoding::CompatibilityError.
#
# @param data [String] payload to send. The caller's string is not modified.
# @return [Integer] number of bytes queued (data.bytesize)
def write(data)
  # String#b returns a binary copy. Skip the copy when it is already binary.
  payload = data.encoding == Encoding::BINARY ? data : data.b

  @mutex.synchronize do
    @_handler_write_buffer << payload
    schedule_write
    payload.bytesize
  end
end