Class: Fluent::PluginHelper::Server::EventHandler::TCPServer

Inherits:
Coolio::TCPSocket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TCPServer

Returns a new instance of TCPServer.

Raises:

  • (ArgumentError)


524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# File 'lib/fluent/plugin_helper/server.rb', line 524

def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)

  @_handler_socket = sock
  super(sock)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Method Details

#closeObject



593
594
595
596
597
598
599
600
# File 'lib/fluent/plugin_helper/server.rb', line 593

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



549
550
551
552
553
554
555
556
557
558
559
# File 'lib/fluent/plugin_helper/server.rb', line 549

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



567
568
569
570
571
572
573
# File 'lib/fluent/plugin_helper/server.rb', line 567

def on_connect
  @callback_connection = TCPCallbackSocket.new(self)
  @connect_callback.call(@callback_connection)
  unless @data_callback
    raise "connection callback must call #data to set data callback"
  end
end

#on_read_with_connection(data) ⇒ Object



584
585
586
587
588
589
590
591
# File 'lib/fluent/plugin_helper/server.rb', line 584

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close(true) rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



575
576
577
578
579
580
581
582
# File 'lib/fluent/plugin_helper/server.rb', line 575

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close(true) rescue nil
  raise if @under_plugin_development
end

#to_ioObject



545
546
547
# File 'lib/fluent/plugin_helper/server.rb', line 545

def to_io
  @_handler_socket
end

#write(data) ⇒ Object



561
562
563
564
565
# File 'lib/fluent/plugin_helper/server.rb', line 561

def write(data)
  @mutex.synchronize do
    super
  end
end