Class: Lumberjack::Beats::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/lumberjack/beats/server.rb

Overview

class Parser

Constant Summary collapse

READ_SIZE =
16384

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fd, server) ⇒ Connection

Returns a new instance of Connection.



306
307
308
309
310
311
312
# File 'lib/lumberjack/beats/server.rb', line 306

# Sets up the per-client state for one accepted connection.
#
# @param fd [Object] the client socket; other methods of this class call
#   `sysread`, `syswrite`, `peeraddr`, `close` and `closed?` on it
# @param server [Object] the owning server; #run polls its `closed?`
def initialize(fd, server)
  @fd = fd
  @server = server

  # Each connection owns its own parser instance.
  @parser = Parser.new

  # Stays nil until reset_next_ack installs a handler.
  @ack_handler = nil
end

Instance Attribute Details

#server ⇒ Object

Returns the value of attribute server.



304
305
306
# File 'lib/lumberjack/beats/server.rb', line 304

# Reader for the server object this connection was constructed with.
#
# @return [Object] the value stored in @server
def server
  @server
end

Instance Method Details

#ack_if_needed(sequence, &block) ⇒ Object



373
374
375
376
# File 'lib/lumberjack/beats/server.rb', line 373

# Runs the given block, then sends an acknowledgement for `sequence` when
# the current ack handler says one is due.
#
# @param sequence [Object] sequence number handed to the handler's `ack?`
#   and, when an ack is due, to send_ack
# @yield called exactly once, before the ack decision is made
# @return [Object, nil] the result of send_ack, or nil when no ack is sent
def ack_if_needed(sequence, &block)
  block.call
  return unless @ack_handler.ack?(sequence)

  send_ack(sequence)
end

#close ⇒ Object



378
379
380
# File 'lib/lumberjack/beats/server.rb', line 378

# Closes the underlying socket, doing nothing if it is already closed.
#
# @return [Object, nil] whatever the socket's `close` returns, or nil when
#   the socket was already closed
def close
  return if @fd.closed?

  @fd.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


382
383
384
# File 'lib/lumberjack/beats/server.rb', line 382

# Reports whether the underlying socket has been closed.
#
# @return [Boolean] the result of `closed?` on the wrapped socket
def closed?
  @fd.closed?
end

#data(map, &block) ⇒ Object



386
387
388
# File 'lib/lumberjack/beats/server.rb', line 386

# Hands one decoded event to the caller-supplied block, if there is one.
#
# @param map [Object] the decoded event payload, passed through untouched
# @yieldparam map [Object] the same payload
# @return [Object, nil] the block's result, or nil when no block was given
def data(map, &block)
  return unless block

  block.call(map)
end

#identity_stream(map) ⇒ Object



403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/lumberjack/beats/server.rb', line 403

# Builds a string identifying the stream an event belongs to.
#
# When the event carries both `beat.id` and `resource_id`, those two values
# are used; otherwise the identity falls back to `beat.name` and `source`.
# Missing parts are dropped and the remaining ones are joined with "-".
#
# @param map [Hash] decoded event with string keys
# @return [String] the joined identity, or "" when no part is present
def identity_stream(map)
  beat = map.fetch("beat", {})
  # The payload comes off the wire, so "beat" may be nil or a scalar. Treat
  # anything that is not hash-like as absent instead of raising on nil or
  # doing substring lookups on a String.
  beat = {} unless beat.respond_to?(:key?)

  beat_id = beat["id"]
  resource_id = map["resource_id"]

  identity_values =
    if beat_id && resource_id
      [beat_id, resource_id]
    else
      [beat["name"], map["source"]]
    end

  identity_values.compact.join("-")
end

#peer ⇒ Object



314
315
316
# File 'lib/lumberjack/beats/server.rb', line 314

# Formats the remote end of the socket as "address:port".
#
# `peeraddr` is queried only once so that both parts come from the same
# lookup and the socket is not hit twice per call.
#
# @return [String] element 3 and element 1 of the socket's `peeraddr`
#   array, joined by a colon
def peer
  _family, port, _hostname, address = @fd.peeraddr
  "#{address}:#{port}"
end

#read_socket(&block) ⇒ Object

def run



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/lumberjack/beats/server.rb', line 337

# Reads one chunk (up to READ_SIZE bytes) from the socket, feeds it to the
# parser and dispatches every event the parser yields:
#
# * :version     - recorded via #version
# * :window_size - installs a new ack handler via #reset_next_ack
# * :data        - the map is emitted through #data
# * :json        - the payload is emitted through #data; an Array payload
#                  is emitted element by element
#
# Emission for :data and :json happens inside #ack_if_needed. Events with
# any other name are ignored.
#
# @yield forwarded to #data for every emitted map
def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any frames are unacked, IO.select
  # X:   - on timeout, ack everything.
  # X: That would keep slow streams from retransmitting too many events
  # X: after errors.
  chunk = @fd.sysread(READ_SIZE)

  @parser.feed(chunk) do |event, *payload|
    case event
    when :version then version(*payload)
    when :window_size then reset_next_ack(*payload)
    when :data
      seq, record = payload
      ack_if_needed(seq) { data(record, &block) }
    when :json
      # An array payload is fanned out into one event per element; this
      # behaviour was moved from the plugin into the library, see
      # https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      seq, decoded = payload
      records = decoded.is_a?(Array) ? decoded : [decoded]

      ack_if_needed(seq) do
        records.each { |record| data(record, &block) }
      end
    end
  end
end

#reset_next_ack(window_size) ⇒ Object



390
391
392
393
# File 'lib/lumberjack/beats/server.rb', line 390

# Replaces the current ack handler with a fresh one for the given window.
# The handler class depends on the protocol version: AckingProtocolV1 when
# version_1? is true, AckingProtocolV2 otherwise.
#
# @param window_size [Object] passed straight to the handler constructor
# @return [Object] the newly installed ack handler
def reset_next_ack(window_size)
  protocol =
    if version_1?
      AckingProtocolV1
    else
      AckingProtocolV2
    end

  @ack_handler = protocol.new(window_size)
end

#run(&block) ⇒ Object



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/lumberjack/beats/server.rb', line 318

# Serves the connection: keeps calling read_socket until the server reports
# that it is closed, then closes the socket.
#
# EOF and the listed read/write errors end the loop quietly. Any other
# StandardError is re-raised unless the server is already closed. The
# socket is closed in every case, and a failure while closing is ignored.
#
# @yield forwarded to read_socket for every decoded event
def run(&block)
  read_socket(&block) until server.closed?
rescue EOFError,
  OpenSSL::SSL::SSLError,
  IOError,
  Errno::ECONNRESET,
  Errno::EPIPE
  # EOF or another read error: nothing to do here, the shutdown happens in
  # the 'ensure' clause.
rescue StandardError
  # While the server is shutting down any exception can be ignored safely.
  # On Windows this can be a `SystemCallErr`.
  raise unless server.closed?
ensure
  begin
    close
  rescue StandardError
    # Best-effort close; the value below is discarded.
    'Already closed stream'
  end
end

#send_ack(sequence) ⇒ Object



395
396
397
# File 'lib/lumberjack/beats/server.rb', line 395

# Writes the acknowledgement frame for `sequence` to the socket.
#
# @param sequence [Object] sequence number handed to the handler's `ack_frame`
# @return [Object] whatever the socket's `syswrite` returns
def send_ack(sequence)
  frame = @ack_handler.ack_frame(sequence)
  @fd.syswrite(frame)
end

#version(version) ⇒ Object



369
370
371
# File 'lib/lumberjack/beats/server.rb', line 369

# Records the protocol version for this connection; read_socket calls this
# with the arguments of a :version parser event.
#
# @param version [Object] the version value to remember
# @return [Object] the value just stored
def version(version)
  @version = version
end

#version_1? ⇒ Boolean

Returns:

  • (Boolean)


399
400
401
# File 'lib/lumberjack/beats/server.rb', line 399

# Tells whether the recorded protocol version equals
# Parser::PROTOCOL_VERSION_1. reset_next_ack uses this to choose the ack
# handler class.
#
# @return [Boolean] result of comparing @version with the constant
def version_1?
  @version == Parser::PROTOCOL_VERSION_1
end