Module: Fluent::PluginHelper::Server

Includes:
CertOption, EventLoop, SocketOption
Defined in:
lib/fluent/plugin_helper/server.rb

Defined Under Namespace

Modules: EventHandler, ServerTransportParams Classes: CallbackSocket, ServerInfo, TCPCallbackSocket, TLSCallbackSocket, UDPCallbackSocket

Constant Summary collapse

PROTOCOLS =
[:tcp, :udp, :tls, :unix]
CONNECTION_PROTOCOLS =
[:tcp, :tls, :unix]
SERVER_TRANSPORT_PARAMS =
[
  :protocol, :version, :ciphers, :insecure,
  :cert_path, :private_key_path, :private_key_passphrase,
  :ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase,
  :generate_private_key_length,
  :generate_cert_country, :generate_cert_state, :generate_cert_state,
  :generate_cert_locality, :generate_cert_common_name,
  :generate_cert_expiration, :generate_cert_digest,
]

Constants included from SocketOption

Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_LINGER, Fluent::PluginHelper::SocketOption::FORMAT_STRUCT_TIMEVAL

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Class Method Summary collapse

Instance Method Summary collapse

Methods included from CertOption

#cert_option_cert_generation_opts_from_conf, #cert_option_certificates_from_file, #cert_option_create_context, #cert_option_generate_ca_pair_self_signed, #cert_option_generate_pair, #cert_option_generate_server_pair_by_ca, #cert_option_generate_server_pair_self_signed, #cert_option_load, #cert_option_server_validate!

Methods included from SocketOption

#socket_option_set, #socket_option_set_one, #socket_option_validate!

Methods included from EventLoop

#after_shutdown, #close, #event_loop_attach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #start

Methods included from Thread

#after_shutdown, #close, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_serversObject (readonly)

stop : [-] shutdown : detach server event handler from event loop (event_loop) close : close listening sockets terminate: remote all server instances



46
47
48
# File 'lib/fluent/plugin_helper/server.rb', line 46

def _servers
  @_servers
end

Class Method Details

.included(mod) ⇒ Object



294
295
296
# File 'lib/fluent/plugin_helper/server.rb', line 294

def self.included(mod)
  mod.include ServerTransportParams
end

Instance Method Details

#configure(conf) ⇒ Object



305
306
307
308
309
310
311
312
313
# File 'lib/fluent/plugin_helper/server.rb', line 305

def configure(conf)
  super

  if @transport_config
    if @transport_config.protocol == :tls
      cert_option_server_validate!(@transport_config)
    end
  end
end

#initializeObject



298
299
300
301
302
303
# File 'lib/fluent/plugin_helper/server.rb', line 298

def initialize
  super
  @_servers = []
  @_server_connections = []
  @_server_mutex = Mutex.new
end

#server_attach(title, proto, port, bind, shared, server) ⇒ Object



206
207
208
209
# File 'lib/fluent/plugin_helper/server.rb', line 206

def server_attach(title, proto, port, bind, shared, server)
  @_servers << ServerInfo.new(title, proto, port, bind, shared, server)
  event_loop_attach(server)
end

#server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback) ⇒ Object

server_create(:title, @port) do |data|

# ...

end server_create(:title, @port) do |data, conn|

# ...

end server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|

sock.remote_host
sock.remote_port
# ...

end

Raises:

  • (ArgumentError)


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin_helper/server.rb', line 122

def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
  raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

  if proto == :tcp || proto == :tls # default linger_timeout only for server
    socket_options[:linger_timeout] ||= 0
  end

  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
  end

  if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
    raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
  end
  if proto != :udp # UDP options
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
      conn.data(&callback)
    end
  when :udp
    raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
    if socket
      sock = socket
      close_socket = false
    else
      sock = server_create_udp_socket(shared, bind, port)
      socket_option_setter.call(sock)
      close_socket = true
    end
    server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
  when :unix
    raise "not implemented yet"
  else
    raise "BUG: unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block) ⇒ Object

server_create_connection(:title, @port) do |conn|

# on connection
source_addr = conn.remote_host
source_port = conn.remote_port
conn.data do |data|
  # on data
  conn.write resp # ...
  conn.close
end

end

Raises:

  • (ArgumentError)


70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin_helper/server.rb', line 70

def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp

  raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
  raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

  if proto == :tcp || proto == :tls # default linger_timeout only for server
    socket_options[:linger_timeout] ||= 0
  end

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }

  case proto
  when :tcp
    server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  when :tls
    transport_config = if tls_options
                         server_create_transport_section_object(tls_options)
                       elsif @transport_config && @transport_config.protocol == :tls
                         @transport_config
                       else
                         raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
                       end
    server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
  when :unix
    raise "not implemented yet"
  else
    raise "unknown protocol #{proto}"
  end

  server_attach(title, proto, port, bind, shared, server)
end

#server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin_helper/server.rb', line 211

def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    @_server_mutex.synchronize do
      @_server_connections << conn
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/fluent/plugin_helper/server.rb', line 224

def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
  sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(sock)
  close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
  server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
    @_server_mutex.synchronize do
      @_server_connections << conn
    end
  end
  server.listen(backlog) if backlog
  server
end

#server_create_tcp(title, port, **kwargs, &callback) ⇒ Object



188
189
190
# File 'lib/fluent/plugin_helper/server.rb', line 188

def server_create_tcp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tcp, **kwargs, &callback)
end

#server_create_tcp_socket(shared, bind, port) ⇒ Object



348
349
350
351
352
353
354
355
356
357
# File 'lib/fluent/plugin_helper/server.rb', line 348

def server_create_tcp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_tcp(bind, port)
         else
           TCPServer.new(bind, port) # this method call can create sockets for AF_INET6
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_tls(title, port, **kwargs, &callback) ⇒ Object



196
197
198
# File 'lib/fluent/plugin_helper/server.rb', line 196

def server_create_tls(title, port, **kwargs, &callback)
  server_create(title, port, proto: :tls, **kwargs, &callback)
end

#server_create_transport_section_object(opts) ⇒ Object



248
249
250
251
252
253
254
255
256
# File 'lib/fluent/plugin_helper/server.rb', line 248

def server_create_transport_section_object(opts)
  transport_section = configured_section_create(:transport)
  SERVER_TRANSPORT_PARAMS.each do |param|
    if opts.has_key?(param)
      transport_section[param] = opts[param]
    end
  end
  transport_section
end

#server_create_udp(title, port, **kwargs, &callback) ⇒ Object



192
193
194
# File 'lib/fluent/plugin_helper/server.rb', line 192

def server_create_udp(title, port, **kwargs, &callback)
  server_create(title, port, proto: :udp, **kwargs, &callback)
end

#server_create_udp_socket(shared, bind, port) ⇒ Object



359
360
361
362
363
364
365
366
367
368
369
370
371
# File 'lib/fluent/plugin_helper/server.rb', line 359

def server_create_udp_socket(shared, bind, port)
  sock = if shared
           server_socket_manager_client.listen_udp(bind, port)
         else
           family = IPAddr.new(IPSocket.getaddress(bind)).ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
           usock = UDPSocket.new(family)
           usock.bind(bind, port)
           usock
         end
  # close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
  sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  sock
end

#server_create_unix(title, port, **kwargs, &callback) ⇒ Object



200
201
202
# File 'lib/fluent/plugin_helper/server.rb', line 200

def server_create_unix(title, port, **kwargs, &callback)
  server_create(title, port, proto: :unix, **kwargs, &callback)
end

#server_socket_manager_clientObject



340
341
342
343
344
345
346
# File 'lib/fluent/plugin_helper/server.rb', line 340

def server_socket_manager_client
  socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  if Fluent.windows?
    socket_manager_path = socket_manager_path.to_i
  end
  ServerEngine::SocketManager::Client.new(socket_manager_path)
end

#server_wait_until_startObject



48
49
50
# File 'lib/fluent/plugin_helper/server.rb', line 48

def server_wait_until_start
  # event_loop_wait_until_start works well for this
end

#server_wait_until_stopObject



52
53
54
55
# File 'lib/fluent/plugin_helper/server.rb', line 52

def server_wait_until_stop
  sleep 0.1 while @_servers.any?{|si| si.server.attached? }
  @_servers.each{|si| si.server.close rescue nil }
end

#shutdownObject



327
328
329
330
331
332
333
# File 'lib/fluent/plugin_helper/server.rb', line 327

def shutdown
  @_server_connections.each do |conn|
    conn.close rescue nil
  end

  super
end

#stopObject



315
316
317
318
319
320
321
322
323
324
325
# File 'lib/fluent/plugin_helper/server.rb', line 315

def stop
  @_server_mutex.synchronize do
    @_servers.each do |si|
      si.server.detach if si.server.attached?
      # to refuse more connections: (connected sockets are still alive here)
      si.server.close rescue nil
    end
  end

  super
end

#terminateObject



335
336
337
338
# File 'lib/fluent/plugin_helper/server.rb', line 335

def terminate
  @_servers = []
  super
end