Class: Fluent::PluginHelper::Server::EventHandler::TCPServer

Inherits:
Coolio::TCPSocket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TCPServer

Returns a new instance of TCPServer.

Raises:

  • (ArgumentError)


572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
# File 'lib/fluent/plugin_helper/server.rb', line 572

def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)

  @_handler_socket = sock
  super(sock)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



570
571
572
# File 'lib/fluent/plugin_helper/server.rb', line 570

def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closingObject (readonly)

Returns the value of attribute closing.



569
570
571
# File 'lib/fluent/plugin_helper/server.rb', line 569

def closing
  @closing
end

Instance Method Details

#closeObject



647
648
649
650
651
652
653
654
# File 'lib/fluent/plugin_helper/server.rb', line 647

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



598
599
600
601
602
603
604
605
606
607
608
# File 'lib/fluent/plugin_helper/server.rb', line 598

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



621
622
623
624
625
626
627
# File 'lib/fluent/plugin_helper/server.rb', line 621

def on_connect
  @callback_connection = TCPCallbackSocket.new(self)
  @connect_callback.call(@callback_connection)
  unless @data_callback
    raise "connection callback must call #data to set data callback"
  end
end

#on_read_with_connection(data) ⇒ Object



638
639
640
641
642
643
644
645
# File 'lib/fluent/plugin_helper/server.rb', line 638

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



629
630
631
632
633
634
635
636
# File 'lib/fluent/plugin_helper/server.rb', line 629

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_writableObject



616
617
618
619
# File 'lib/fluent/plugin_helper/server.rb', line 616

def on_writable
  super
  close if @close_after_write_complete
end

#to_ioObject



594
595
596
# File 'lib/fluent/plugin_helper/server.rb', line 594

def to_io
  @_handler_socket
end

#write(data) ⇒ Object



610
611
612
613
614
# File 'lib/fluent/plugin_helper/server.rb', line 610

def write(data)
  @mutex.synchronize do
    super
  end
end