Class: LogCourier::ConnectionTcp

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/server_tcp.rb

Overview

Representation of a single connected client

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, fd, peer, options) ⇒ ConnectionTcp

Returns a new instance of ConnectionTcp.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/log-courier/server_tcp.rb', line 217

# Set up the state for one connected client.
#
# @param logger [Object, nil] logger used by the other methods; may be nil
# @param fd [Object] the connected socket; must respond to peer_cert when
#   options[:transport] is 'tls' and peer fields are enabled
# @param peer [Object] identifier of the remote end, exposed through #peer
# @param options [Hash] reads :add_peer_fields and :transport here
def initialize(logger, fd, peer, options)
  @logger = logger
  @fd = fd
  @peer = peer
  @options = options
  @in_progress = false
  @peer_fields = {}

  # Peer fields are opt-in; leave the hash empty otherwise
  return unless @options[:add_peer_fields]

  @peer_fields['peer'] = peer

  # The certificate CN is only recorded for TLS transports
  return unless @options[:transport] == 'tls'

  # get_cn is defined elsewhere in the class
  @peer_fields['peer_ssl_cn'] = get_cn(@fd.peer_cert) unless @fd.peer_cert.nil?
end

Instance Attribute Details

#peer ⇒ Object

Returns the value of attribute peer.



215
216
217
# File 'lib/log-courier/server_tcp.rb', line 215

# Reader for the peer attribute: returns the current value of @peer,
# which the constructor assigns from its peer argument.
def peer
  @peer
end

Instance Method Details

#add_fields(event) ⇒ Object



233
234
235
# File 'lib/log-courier/server_tcp.rb', line 233

# Merge the connection's peer fields into the given event, in place.
#
# @param event [Hash] event to decorate; mutated via merge!
# @return [Hash, nil] the mutated event, or nil when there are no peer
#   fields to add
def add_fields(event)
  # Nothing to add: leave the event untouched
  return if @peer_fields.empty?

  event.merge!(@peer_fields)
end

#process_messages ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/log-courier/server_tcp.rb', line 249

# Read framed messages from the connection until it ends, yielding each one.
#
# Every frame starts with an 8-byte header: a 4-byte signature followed by
# a 4-byte big-endian length. The header is parsed at this level because,
# for TLS, it is the only way to locate frame boundaries.
#
# The loop only ends through an exception. The ones handled here are
# logged (when a logger is set) and swallowed; the socket is always closed
# on the way out.
#
# @yield [signature, data, connection] the frame signature, the frame body
#   ('' for a zero-length frame) and this connection object
# @return [nil] after any of the handled termination conditions
def process_messages
  loop do
    header = recv(8)
    signature, length = header.unpack('A4N')

    # Refuse frames larger than the configured limit before reading them
    if length > @options[:max_packet_size]
      raise ProtocolError, "packet too large (#{length} > #{@options[:max_packet_size]})"
    end

    # From here until the handler returns, an EOF is unexpected: it may
    # happen mid-body or while the handler is sending
    @in_progress = true

    data = length == 0 ? '' : recv(length)

    # Hand the frame over for processing
    yield signature, data, self

    # Frame fully handled; an EOF now is a graceful close
    @in_progress = false
  end
rescue TimeoutError
  # Idle for too long without a ping/pong
  unless @logger.nil?
    @logger.warn('Connection timed out', :peer => @peer)
  end
  nil
rescue EOFError
  unless @logger.nil?
    if @in_progress
      @logger.warn('Unexpected EOF', :peer => @peer)
    else
      @logger.info('Connection closed', :peer => @peer)
    end
  end
  nil
rescue OpenSSL::SSL::SSLError => ssl_error
  # Nothing to do but shut down, which the ensure clause takes care of
  unless @logger.nil?
    @logger.warn('SSL error, connection aborted', :error => ssl_error.message, :peer => @peer)
  end
  nil
rescue IOError, Errno::ECONNRESET => io_error
  # Nothing to do but shut down, which the ensure clause takes care of
  unless @logger.nil?
    @logger.warn('Connection aborted', :error => io_error.message, :peer => @peer)
  end
  nil
rescue ProtocolError => protocol_error
  # The connection is being aborted because of a protocol violation
  unless @logger.nil?
    @logger.warn('Protocol error, connection aborted', :error => protocol_error.message, :peer => @peer)
  end
  nil
ensure
  # Best-effort close; failures while closing are deliberately ignored
  @fd.close rescue nil
end

#run(&block) ⇒ Object



237
238
239
240
241
242
243
244
245
246
247
# File 'lib/log-courier/server_tcp.rb', line 237

# Entry point for a connection: runs process_messages with the given block
# and absorbs the exceptions it lets through.
#
# @yield forwarded unchanged to process_messages
# @return [Object, nil] whatever process_messages returns, or nil when an
#   exception was handled here
def run(&block)
  process_messages(&block)
rescue ShutdownSignal
  # The server is shutting down
  unless @logger.nil?
    @logger.info('Server shutting down, closing connection', :peer => @peer)
  end
  nil
rescue StandardError, NativeException => error
  # Any other, unanticipated problem.
  # NOTE(review): NativeException looks JRuby-specific; presumably it is
  # defined or shimmed elsewhere for other runtimes - confirm.
  unless @logger.nil?
    @logger.warn(error.message, :hint => 'Unknown error, connection aborted', :peer => @peer)
  end
  nil
end

#send(signature, message) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/log-courier/server_tcp.rb', line 306

# Write one framed message to the peer: the signature, a 4-byte big-endian
# length, then the message body.
#
# The frame is assembled, measured and sliced as raw bytes. write_nonblock
# reports how many *bytes* it accepted, so the length header and the resume
# offset must be byte-based too; character-based length/slicing produced a
# wrong header and corrupt partial-write resumption for multi-byte bodies.
#
# @param signature [String] frame signature, written ahead of the length
# @param message [String] frame body
# @return [nil]
# @raise [TimeoutError] when the socket does not become ready before the
#   deadline held in @timeout (epoch seconds)
# @raise [ProtocolError] when the socket accepts zero bytes
def send(signature, message)
  # reset_timeout is defined elsewhere; presumably it refreshes @timeout
  reset_timeout

  # Work on binary copies so neither caller string is re-tagged and
  # concatenation cannot fail on incompatible encodings
  body = message.dup.force_encoding(Encoding::BINARY)
  data = signature.dup.force_encoding(Encoding::BINARY) + [body.bytesize].pack('N') + body
  total = data.bytesize
  done = 0
  loop do
    begin
      written = @fd.write_nonblock(data.byteslice(done, total - done))
    rescue IO::WaitReadable
      # The write needs the socket to become readable first.
      # Clamp at zero: IO.select raises ArgumentError on a negative
      # timeout, which would mask the intended TimeoutError once the
      # deadline has already passed.
      wait = [@timeout - Time.now.to_i, 0].max
      fail TimeoutError if IO.select([@fd], nil, [@fd], wait).nil?
      retry
    rescue IO::WaitWritable
      wait = [@timeout - Time.now.to_i, 0].max
      fail TimeoutError if IO.select(nil, [@fd], [@fd], wait).nil?
      retry
    end
    fail ProtocolError, "write failure (#{done}/#{total})" if written == 0
    done += written
    break if done >= total
  end
  nil
end