Class: Lumberjack::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/lumberjack/server.rb

Overview

class Parser

Constant Summary collapse

READ_SIZE =
16384

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fd, server) ⇒ Connection

Returns a new instance of Connection.



305
306
307
308
309
310
311
# File 'lib/lumberjack/server.rb', line 305

def initialize(fd, server)
  @parser = Parser.new
  @fd = fd

  @server = server
  @ack_handler = nil
end

Instance Attribute Details

#serverObject

Returns the value of attribute server.



303
304
305
# File 'lib/lumberjack/server.rb', line 303

def server
  @server
end

Instance Method Details

#ack_if_needed(sequence, &block) ⇒ Object



360
361
362
363
# File 'lib/lumberjack/server.rb', line 360

def ack_if_needed(sequence, &block)
  block.call
  send_ack(sequence) if @ack_handler.ack?(sequence)
end

#closeObject



365
366
367
# File 'lib/lumberjack/server.rb', line 365

def close
  @fd.close unless @fd.closed?
end

#data(map, &block) ⇒ Object



369
370
371
# File 'lib/lumberjack/server.rb', line 369

def data(map, &block)
  block.call(map) if block_given?
end

#read_socket(&block) ⇒ Object

def run



324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/lumberjack/server.rb', line 324

def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any unacked, IO.select
  # X:   - on timeout, ack all.
  # X: Doing so will prevent slow streams from retransmitting
  # X: too many events after errors.
  @parser.feed(@fd.sysread(READ_SIZE)) do |event, *args|
    case event
    when :version
      version(*args)
    when :window_size
      reset_next_ack(*args)
    when :data
      sequence, map = args
      ack_if_needed(sequence) { data(map, &block) }
    when :json
      # If the payload is an array of items we will emit multiple events
      # this behavior was moved from the plugin to the library.
      # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      sequence, map = args

      ack_if_needed(sequence) do
        if map.is_a?(Array)
          map.each { |e| data(e, &block) }
        else
          data(map, &block)
        end
      end
    end
  end
end

#reset_next_ack(window_size) ⇒ Object



373
374
375
376
# File 'lib/lumberjack/server.rb', line 373

def reset_next_ack(window_size)
  klass = (@version == Parser::PROTOCOL_VERSION_1) ? AckingProtocolV1 : AckingProtocolV2
  @ack_handler = klass.new(window_size)
end

#run(&block) ⇒ Object



313
314
315
316
317
318
319
320
321
322
# File 'lib/lumberjack/server.rb', line 313

def run(&block)
  while !server.closed?
    read_socket(&block)
  end
rescue EOFError, OpenSSL::SSL::SSLError, IOError, Errno::ECONNRESET
  # EOF or other read errors, only action is to shutdown which we'll do in
  # 'ensure'
ensure
  close rescue 'Already closed stream'
end

#send_ack(sequence) ⇒ Object



378
379
380
# File 'lib/lumberjack/server.rb', line 378

def send_ack(sequence)
  @fd.syswrite(@ack_handler.ack_frame(sequence))
end

#version(version) ⇒ Object



356
357
358
# File 'lib/lumberjack/server.rb', line 356

def version(version)
  @version = version
end