Class: Lumberjack::Beats::Connection
- Inherits:
-
Object
- Object
- Lumberjack::Beats::Connection
show all
- Defined in:
- lib/lumberjack/beats/server.rb
Overview
Defined Under Namespace
Classes: ConnectionClosed
Constant Summary
collapse
- READ_SIZE =
16384
- PEER_INFORMATION_NOT_AVAILABLE =
"<PEER INFORMATION NOT AVAILABLE>"
- RESCUED_CONNECTION_EXCEPTIONS =
[
EOFError,
OpenSSL::SSL::SSLError,
IOError,
Errno::ECONNRESET,
Errno::EPIPE,
Lumberjack::Beats::Parser::UnsupportedProtocol
]
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(fd, server) ⇒ Connection
Returns a new instance of Connection.
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
|
# File 'lib/lumberjack/beats/server.rb', line 421
def initialize(fd, server)
@parser = Parser.new
@fd = fd
@server = server
@ack_handler = nil
begin
@peer = "#{@fd.peeraddr[3]}:#{@fd.peeraddr[1]}"
rescue IOError
@peer = PEER_INFORMATION_NOT_AVAILABLE
end
end
|
Instance Attribute Details
#peer ⇒ Object
Returns the value of attribute peer.
419
420
421
|
# File 'lib/lumberjack/beats/server.rb', line 419
def peer
@peer
end
|
#server ⇒ Object
Returns the value of attribute server.
418
419
420
|
# File 'lib/lumberjack/beats/server.rb', line 418
def server
@server
end
|
Instance Method Details
#ack_if_needed(sequence, &block) ⇒ Object
501
502
503
504
|
# File 'lib/lumberjack/beats/server.rb', line 501
def ack_if_needed(sequence, &block)
block.call
send_ack(sequence) if @ack_handler.ack?(sequence)
end
|
#close ⇒ Object
506
507
508
|
# File 'lib/lumberjack/beats/server.rb', line 506
def close
@fd.close unless @fd.closed?
end
|
#data(map, &block) ⇒ Object
510
511
512
|
# File 'lib/lumberjack/beats/server.rb', line 510
def data(map, &block)
block.call(map, identity_stream(map)) if block_given?
end
|
#identity_stream(map) ⇒ Object
527
528
529
530
531
532
533
534
535
536
537
538
|
# File 'lib/lumberjack/beats/server.rb', line 527
def identity_stream(map)
id = map.fetch("beat", {})["id"]
if id && map["resource_id"]
identity_values = [id, map["resource_id"]]
else
identity_values = [map.fetch("beat", {})["name"],
map["source"]]
end
identity_values.compact.join("-")
end
|
488
489
490
491
492
493
494
495
|
# File 'lib/lumberjack/beats/server.rb', line 488
def normalize_v1_metadata_encoding(map)
map.each { |k, v| map[k].force_encoding(Encoding::UTF_8) unless k == Lumberjack::Beats::LSF_LOG_LINE_FIELD }
map
end
|
#read_socket(&block) ⇒ Object
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
|
# File 'lib/lumberjack/beats/server.rb', line 456
def read_socket(&block)
@parser.feed(@fd.sysread(READ_SIZE)) do |event, *args|
case event
when :version
version(*args)
when :window_size
reset_next_ack(*args)
when :data
sequence, map = args
ack_if_needed(sequence) { data(normalize_v1_metadata_encoding(map), &block) }
when :json
sequence, map = args
ack_if_needed(sequence) do
if map.is_a?(Array)
map.each { |e| data(e, &block) }
else
data(map, &block)
end
end
end
end
end
|
#reset_next_ack(window_size) ⇒ Object
514
515
516
517
|
# File 'lib/lumberjack/beats/server.rb', line 514
def reset_next_ack(window_size)
klass = version_1? ? AckingProtocolV1 : AckingProtocolV2
@ack_handler = klass.new(window_size)
end
|
#run(&block) ⇒ Object
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
|
# File 'lib/lumberjack/beats/server.rb', line 440
def run(&block)
while !server.closed?
read_socket(&block)
end
rescue *RESCUED_CONNECTION_EXCEPTIONS => e
raise ConnectionClosed.new(e)
rescue
raise unless server.closed?
ensure
close rescue 'Already closed stream'
end
|
#send_ack(sequence) ⇒ Object
519
520
521
|
# File 'lib/lumberjack/beats/server.rb', line 519
def send_ack(sequence)
@fd.syswrite(@ack_handler.ack_frame(sequence))
end
|
#version(version) ⇒ Object
497
498
499
|
# File 'lib/lumberjack/beats/server.rb', line 497
def version(version)
@version = version
end
|