Class: LogCourier::ConnectionTcp

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/server_tcp.rb

Overview

Representation of a single connected client

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, sfd, peer, options) ⇒ ConnectionTcp

Returns a new instance of ConnectionTcp.



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/log-courier/server_tcp.rb', line 228

def initialize(logger, sfd, peer, options)
  @logger = logger
  @fd = sfd
  @peer = peer
  @peer_fields = {}
  @in_progress = false
  @options = options
  @client = 'Unknown'
  @major_version = 0
  @minor_version = 0
  @patch_version = 0
  @version = '0.0.0'
  @client_version = 'Unknown'

  return unless @options[:add_peer_fields]

  @peer_fields['peer'] = peer
  return unless @options[:transport] == 'tls' && !@fd.peer_cert.nil?

  @peer_fields['peer_ssl_cn'] = get_cn(@fd.peer_cert)
end

Instance Attribute Details

#peerObject

Returns the value of attribute peer.



226
227
228
# File 'lib/log-courier/server_tcp.rb', line 226

def peer
  @peer
end

Instance Method Details

#add_fields(event) ⇒ Object



250
251
252
# File 'lib/log-courier/server_tcp.rb', line 250

def add_fields(event)
  event.merge! @peer_fields unless @peer_fields.empty?
end

#receiveObject

Raises:



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/log-courier/server_tcp.rb', line 302

def receive
  # Read message
  # Each message begins with a header
  # 4 byte signature
  # 4 byte length
  # Normally we would not parse this inside transport, but for TLS we have to in order to locate frame boundaries
  signature, length = recv(8).unpack('A4N')

  # Sanity
  raise ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})" if length > @options[:max_packet_size]

  # While we're processing, EOF is bad as it may occur during send
  @in_progress = true

  # Read the message
  data = if length.zero?
           ''
         else
           recv(length)
         end

  # If we EOF next it's a graceful close
  @in_progress = false

  [signature, data]
end

#run(&block) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/log-courier/server_tcp.rb', line 254

def run(&block)
  handshake(&block)

  loop do
    signature, data = receive

    # Send for processing
    yield signature, data, self
  end
rescue TimeoutError
  # Timeout of the connection, we were idle too long without a ping/pong
  @logger&.warn 'Connection timed out', peer: @peer
  nil
rescue EOFError
  if @in_progress
    @logger&.warn 'Unexpected EOF', peer: @peer
  else
    @logger&.info 'Connection closed', peer: @peer
  end
  nil
rescue OpenSSL::SSL::SSLError => e
  # Read errors, only action is to shutdown which we'll do in ensure
  @logger&.warn 'SSL error, connection aborted', error: e.message, peer: @peer
  nil
rescue IOError, SystemCallError => e
  # Read errors, only action is to shutdown which we'll do in ensure
  @logger&.warn 'Connection aborted', error: e.message, peer: @peer
  nil
rescue ProtocolError => e
  # Connection abort request due to a protocol error
  @logger&.warn 'Protocol error, connection aborted', error: e.message, peer: @peer
  nil
rescue ShutdownSignal
  # Shutting down
  @logger&.info 'Server shutting down, closing connection', peer: @peer
  nil
rescue StandardError => e
  # Some other unknown problem
  @logger&.warn e.message, hint: 'Unknown error, connection aborted', peer: @peer
  nil
ensure
  begin
    @fd.close
  rescue OpenSSL::SSL::SSLError, IOError
    # Ignore during close
  end
end

#send(signature, message) ⇒ Object



329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/log-courier/server_tcp.rb', line 329

def send(signature, message)
  reset_timeout
  data = signature + [message.length].pack('N') + message
  done = 0
  loop do
    begin
      written = @fd.write_nonblock(data[done...data.length])
    rescue IO::WaitReadable
      raise TimeoutError if IO.select([@fd], nil, [@fd], @timeout - Time.now.to_i).nil?

      retry
    rescue IO::WaitWritable
      raise TimeoutError if IO.select(nil, [@fd], [@fd], @timeout - Time.now.to_i).nil?

      retry
    end
    raise ProtocolError, "write failure (#{done}/#{data.length})" if written.zero?

    done += written
    break if done >= data.length
  end
  nil
end