Class: Lumberjack::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/lumberjack/server.rb

Overview

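Handles a single client connection: reads raw bytes from the socket, feeds them to a Parser, hands decoded events to the caller's block, and writes acknowledgement frames back to the client.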

Constant Summary collapse

READ_SIZE =
16384

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fd, server) ⇒ Connection

Returns a new instance of Connection.



305
306
307
308
309
310
311
# File 'lib/lumberjack/server.rb', line 305

# Sets up the state for one client connection.
#
# @param fd [Object] the connection's I/O object; the other methods call
#   sysread, syswrite, close and closed? on it
# @param server [Object] the owning server; #run polls its closed? flag
def initialize(fd, server)
  @fd = fd
  @server = server
  # No ack handler exists yet; reset_next_ack assigns one later.
  @ack_handler = nil
  @parser = Parser.new
end

Instance Attribute Details

#server ⇒ Object

Returns the value of attribute server.



303
304
305
# File 'lib/lumberjack/server.rb', line 303

# Reader for the server this connection was constructed with.
#
# @return [Object] the value stored in @server
def server
  @server
end

Instance Method Details

#ack_if_needed(sequence, &block) ⇒ Object



362
363
364
365
# File 'lib/lumberjack/server.rb', line 362

# Runs the given block, then acknowledges +sequence+ when the current ack
# handler says an ack is due.
#
# @param sequence [Object] sequence number passed to the ack handler
# @param block [Proc] work to perform before the ack decision is made
# @return [Object, nil] the result of send_ack, or nil when no ack was sent
def ack_if_needed(sequence, &block)
  # The block always runs first, so an ack is only sent after it returns.
  block.call
  return unless @ack_handler.ack?(sequence)

  send_ack(sequence)
end

#close ⇒ Object



367
368
369
# File 'lib/lumberjack/server.rb', line 367

# Closes the underlying I/O object unless it already reports itself closed.
#
# @return [Object, nil] the result of @fd.close, or nil if already closed
def close
  return if @fd.closed?

  @fd.close
end

#data(map, &block) ⇒ Object



371
372
373
# File 'lib/lumberjack/server.rb', line 371

# Passes one decoded event to the caller's block, if a block was supplied.
#
# @param map [Object] the decoded event payload
# @param block [Proc, nil] optional consumer of the event
# @return [Object, nil] the block's result, or nil when no block was given
def data(map, &block)
  return unless block_given?

  block.call(map)
end

#read_socket(&block) ⇒ Object

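Performs one read of at most READ_SIZE bytes from the socket, feeds the bytes to the parser, and dispatches every event the parser emits.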



326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/lumberjack/server.rb', line 326

# Performs a single sysread of at most READ_SIZE bytes and feeds the result
# to the parser, dispatching each event the parser yields:
#
# * :version     -> version(*args)
# * :window_size -> reset_next_ack(*args)
# * :data        -> one event, delivered inside ack_if_needed
# * :json        -> one event, or each element when the payload is an Array,
#                   delivered inside ack_if_needed
#
# Any other event type is ignored.
#
# @param block [Proc, nil] consumer that receives each decoded event
# @return [Object] whatever @parser.feed returns
def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any unacked, IO.select
  # X:   - on timeout, ack all.
  # X: Doing so will prevent slow streams from retransmitting
  # X: too many events after errors.
  chunk = @fd.sysread(READ_SIZE)

  @parser.feed(chunk) do |kind, *payload|
    case kind
    when :version then version(*payload)
    when :window_size then reset_next_ack(*payload)
    when :data
      seq, event = payload
      ack_if_needed(seq) { data(event, &block) }
    when :json
      # If the payload is an array of items we will emit multiple events
      # this behavior was moved from the plugin to the library.
      # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      seq, decoded = payload

      ack_if_needed(seq) do
        case decoded
        when Array
          decoded.each { |item| data(item, &block) }
        else
          data(decoded, &block)
        end
      end
    end
  end
end

#reset_next_ack(window_size) ⇒ Object



375
376
377
378
# File 'lib/lumberjack/server.rb', line 375

# Replaces the ack handler with a fresh one for the announced window size.
# The handler class depends on the protocol version stored in @version:
# AckingProtocolV1 when it equals Parser::PROTOCOL_VERSION_1, otherwise
# AckingProtocolV2.
#
# @param window_size [Object] window size passed to the handler constructor
# @return [Object] the newly created ack handler
def reset_next_ack(window_size)
  handler_class =
    if @version == Parser::PROTOCOL_VERSION_1
      AckingProtocolV1
    else
      AckingProtocolV2
    end

  @ack_handler = handler_class.new(window_size)
end

#run(&block) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/lumberjack/server.rb', line 313

# Keeps calling read_socket until the server reports itself closed, then
# closes this connection.
#
# EOFError, OpenSSL::SSL::SSLError, IOError and Errno::ECONNRESET end the
# loop quietly. Errno::EAGAIN restarts the loop unless the server has been
# closed in the meantime. Any other exception propagates to the caller. In
# every case close is attempted, and a StandardError raised by close is
# discarded.
#
# @param block [Proc, nil] consumer forwarded to read_socket
# @return [nil]
def run(&block)
  read_socket(&block) until server.closed?
rescue EOFError, OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET
  # EOF or other read errors, only action is to shutdown which we'll do in
  # 'ensure'
rescue Errno::EAGAIN
  retry unless server.closed?
ensure
  begin
    close
  rescue StandardError
    'Already closed stream'
  end
end

#send_ack(sequence) ⇒ Object



380
381
382
# File 'lib/lumberjack/server.rb', line 380

# Writes the ack frame for +sequence+ to the connection.
#
# @param sequence [Object] sequence number handed to the ack handler
# @return [Object] whatever @fd.syswrite returns
def send_ack(sequence)
  frame = @ack_handler.ack_frame(sequence)
  @fd.syswrite(frame)
end

#version(version) ⇒ Object



358
359
360
# File 'lib/lumberjack/server.rb', line 358

# Records the protocol version; reset_next_ack reads it when choosing the
# ack handler class.
#
# @param version [Object] the protocol version value to store
# @return [Object] the stored version
def version(version)
  @version = version
end