Class: Lumberjack::Beats::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/lumberjack/beats/server.rb

Overview

class Parser

Defined Under Namespace

Classes: ConnectionClosed

Constant Summary collapse

READ_SIZE =
16384
PEER_INFORMATION_NOT_AVAILABLE =
"<PEER INFORMATION NOT AVAILABLE>"
RESCUED_CONNECTION_EXCEPTIONS =
[
  EOFError,
  OpenSSL::SSL::SSLError,
  IOError,
  Errno::ECONNRESET,
  Errno::EPIPE,
  Lumberjack::Beats::Parser::UnsupportedProtocol
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fd, server) ⇒ Connection

Returns a new instance of Connection.



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/lumberjack/beats/server.rb', line 421

# Wraps an accepted client socket and prepares the per-connection state.
#
# @param fd [#peeraddr] the client socket; the other methods of this class
#   read from it, write to it and close it
# @param server [#closed?] the owning server, polled by #run to decide when
#   to stop reading
def initialize(fd, server)
  @parser = Parser.new
  @fd = fd

  @server = server
  # No ack handler exists until #reset_next_ack creates one.
  @ack_handler = nil

  # Fetch the details of the host before reading anything from the socket
  # so we can use that information when debugging connection issues with
  # remote hosts.
  @peer = begin
    # Query the address once so host and port come from the same snapshot.
    peer_address = @fd.peeraddr
    "#{peer_address[3]}:#{peer_address[1]}"
  rescue IOError, SystemCallError
    # This can happen if the connection is dropped or closed before the host
    # details are fetched. A closed stream raises IOError, a disconnected
    # socket raises an Errno::* error (SystemCallError); in both cases fall
    # back to a generic string.
    PEER_INFORMATION_NOT_AVAILABLE
  end
end

Instance Attribute Details

#peer ⇒ Object (readonly)

Returns the value of attribute peer.



419
420
421
# File 'lib/lumberjack/beats/server.rb', line 419

# Reader for the peer description stored by the constructor: an
# "address:port" string built from the socket's peeraddr, or the
# PEER_INFORMATION_NOT_AVAILABLE placeholder when the constructor could not
# read it.
def peer
  @peer
end

#server ⇒ Object

Returns the value of attribute server.



418
419
420
# File 'lib/lumberjack/beats/server.rb', line 418

# Reader for the server object handed to the constructor; #run polls its
# closed? method to decide when to stop reading.
def server
  @server
end

Instance Method Details

#ack_if_needed(sequence, &block) ⇒ Object



501
502
503
504
# File 'lib/lumberjack/beats/server.rb', line 501

# Runs the given block, then acknowledges +sequence+ when the current ack
# handler asks for it.
#
# @param sequence [Object] sequence number of the frame just processed
# @param block [Proc] work to perform before the ack decision is made
# @return [Object, nil] the result of #send_ack, or nil when no ack was sent
def ack_if_needed(sequence, &block)
  block.call
  return unless @ack_handler.ack?(sequence)

  send_ack(sequence)
end

#close ⇒ Object



506
507
508
# File 'lib/lumberjack/beats/server.rb', line 506

# Closes the underlying socket; does nothing when it is already closed.
#
# @return [Object, nil] whatever the socket's close returns, or nil when the
#   socket was already closed
def close
  return if @fd.closed?

  @fd.close
end

#data(map, &block) ⇒ Object



510
511
512
# File 'lib/lumberjack/beats/server.rb', line 510

# Hands one decoded event to the caller's block together with the identity
# stream derived from it. Without a block nothing happens.
#
# @param map [Hash] the decoded event
# @param block [Proc] receives (map, identity_stream(map))
# @return [Object, nil] the block's result, or nil when no block was given
def data(map, &block)
  return unless block_given?

  stream_identity = identity_stream(map)
  block.call(map, stream_identity)
end

#identity_stream(map) ⇒ Object



527
528
529
530
531
532
533
534
535
536
537
538
# File 'lib/lumberjack/beats/server.rb', line 527

# Builds the identifier of the stream an event belongs to.
#
# Uses "<beat.id>-<resource_id>" when both values are present, otherwise
# "<beat.name>-<source>". Missing parts are dropped, so the result may be a
# single value or an empty string.
#
# @param map [Hash] decoded event with string keys
# @return [String] the identity parts joined with "-"
def identity_stream(map)
  # A "beat" key that is present but nil must behave like a missing key;
  # fetch("beat", {}) would hand back the nil and the lookups below would
  # raise NoMethodError.
  beat = map["beat"] || {}
  id = beat["id"]
  resource_id = map["resource_id"]

  identity_values =
    if id && resource_id
      [id, resource_id]
    else
      [beat["name"], map["source"]]
    end

  identity_values.compact.join("-")
end

#normalize_v1_metadata_encoding(map) ⇒ Object



488
489
490
491
492
493
494
495
# File 'lib/lumberjack/beats/server.rb', line 488

# Forces the metadata strings of a v1 frame to UTF-8.
#
# LSF does not enforce an encoding when it sends data to the server, so
# fields such as path or offset can arrive in another encoding. When the data
# is later assigned to an event, the event validates it and crashes if the
# encoding is wrong. Every field except the log line itself is therefore
# relabelled here.
#
# @param map [Hash] key/value pairs of a v1 data frame; its String values are
#   modified in place
# @return [Hash] the same map instance
def normalize_v1_metadata_encoding(map)
  map.each do |key, value|
    next if key == Lumberjack::Beats::LSF_LOG_LINE_FIELD
    # Only strings carry an encoding; leave any other value untouched.
    next unless value.is_a?(String)

    value.force_encoding(Encoding::UTF_8)
  end
  map
end

#read_socket(&block) ⇒ Object

def run



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/lumberjack/beats/server.rb', line 456

# Reads one chunk (up to READ_SIZE bytes) from the socket, feeds it to the
# parser and dispatches every event the parser yields:
#
# * :version     - records the protocol version
# * :window_size - installs a new ack handler
# * :data        - a v1 frame; its metadata is normalized to UTF-8 before it
#                  is handed to the block
# * :json        - a JSON payload; an Array payload is emitted element by element
#
# @param block [Proc] forwarded to #data for every decoded event
# @return [Object] whatever the parser's feed returns
def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any unacked, IO.select
  # X:   - on timeout, ack all.
  # X: Doing so will prevent slow streams from retransmitting
  # X: too many events after errors.
  @parser.feed(@fd.sysread(READ_SIZE)) do |event, *args|
    case event
    when :version
      version(*args)
    when :window_size
      reset_next_ack(*args)
    when :data
      sequence, map = args
      # v1 metadata may arrive in any encoding; normalize it before emitting.
      ack_if_needed(sequence) { data(normalize_v1_metadata_encoding(map), &block) }
    when :json
      # If the payload is an array of items we will emit multiple events
      # this behavior was moved from the plugin to the library.
      # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      sequence, map = args

      ack_if_needed(sequence) do
        if map.is_a?(Array)
          map.each { |e| data(e, &block) }
        else
          data(map, &block)
        end
      end
    end
  end
end

#reset_next_ack(window_size) ⇒ Object



514
515
516
517
# File 'lib/lumberjack/beats/server.rb', line 514

# Installs a fresh ack handler for the announced window size, choosing the
# acking protocol class that matches the recorded protocol version.
#
# @param window_size [Object] window size announced by the client
# @return [Object] the newly created ack handler
def reset_next_ack(window_size)
  @ack_handler =
    if version_1?
      AckingProtocolV1.new(window_size)
    else
      AckingProtocolV2.new(window_size)
    end
end

#run(&block) ⇒ Object



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/lumberjack/beats/server.rb', line 440

# Keeps reading from the socket until the server reports it is closed, and
# always closes the connection on the way out.
#
# @param block [Proc] forwarded to #read_socket for every decoded event
# @return [nil]
# @raise [ConnectionClosed] when one of RESCUED_CONNECTION_EXCEPTIONS is
#   raised while reading; the original error is passed to the constructor
# @raise [StandardError] any other error raised while the server is still open
def run(&block)
  read_socket(&block) until server.closed?
rescue *RESCUED_CONNECTION_EXCEPTIONS => error
  # EOF or other read errors: nothing to do but shut down, which happens in
  # the ensure clause.
  raise ConnectionClosed.new(error)
rescue
  # While the server is shutting down any exception can safely be ignored
  # (on Windows a `SystemCallErr` can show up here).
  raise unless server.closed?
ensure
  begin
    close
  rescue StandardError
    # Closing is best effort; an error here is deliberately swallowed.
    'Already closed stream'
  end
end

#send_ack(sequence) ⇒ Object



519
520
521
# File 'lib/lumberjack/beats/server.rb', line 519

# Writes the acknowledgement frame for +sequence+ to the socket.
#
# @param sequence [Object] sequence number being acknowledged
# @return [Object] whatever the socket's syswrite returns
def send_ack(sequence)
  frame = @ack_handler.ack_frame(sequence)
  @fd.syswrite(frame)
end

#version(version) ⇒ Object



497
498
499
# File 'lib/lumberjack/beats/server.rb', line 497

# Records the protocol version announced by the client; #read_socket calls
# this for every :version event and #version_1? reads the stored value.
#
# @param version [Object] version value reported by the parser
# @return [Object] the value that was stored
def version(version)
  @version = version
end

#version_1? ⇒ Boolean

Returns:

  • (Boolean)


523
524
525
# File 'lib/lumberjack/beats/server.rb', line 523

# Tells whether the version recorded by #version equals
# Parser::PROTOCOL_VERSION_1.
#
# @return [Boolean]
def version_1?
  @version == Parser::PROTOCOL_VERSION_1
end