Class: LogStash::Inputs::BeatsSupport::ConnectionHandler

Inherits:
Object
Defined in:
lib/logstash/inputs/beats_support/connection_handler.rb

Overview

Handles the data coming from a connection and decides which process should be used to decode the data coming from the Beats library:

  • Should we use a codec on a specific field?

  • Should we just take the raw content of the parsed JSON frame?
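The two questions above amount to a small routing decision. The following is a minimal sketch of that decision, not the handler's actual code: `choose_decoding` is a hypothetical helper, and the field names `message` and `line` are assumed from the comment in `#process`. The real predicates (`from_filebeat?`, `from_logstash_forwarder?`) are not shown on this page and may check more than key presence.

```ruby
# Hypothetical helper: decides between the codec path and the raw path.
# Returns [:codec, line] when a known log line field is present (the field
# is removed from the hash, as #process does), otherwise [:raw, nil].
def choose_decoding(hash, line_fields = %w[message line])
  field = line_fields.find { |name| hash.key?(name) }
  return [:raw, nil] if field.nil?

  [:codec, hash.delete(field)]
end

frame = { "message" => "GET /index.html", "host" => "web-1" }
path, line = choose_decoding(frame)
# path is :codec and line holds the text to hand to the codec;
# frame keeps only the remaining metadata fields.
```

A frame without either field, for example a metrics payload, would take the `:raw` path and be turned into an event as-is.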

Constructor Details

#initialize(connection, input, queue) ⇒ ConnectionHandler

Returns a new instance of ConnectionHandler.



# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 15

def initialize(connection, input, queue)
  @connection = connection

  @input = input
  @queue = queue
  @logger = input.logger

  # We need to clone the codec per connection, so we can flush a specific 
  # codec when a connection is closed.
  @codec = input.codec.dup

  @nocodec_transformer = RawEventTransform.new(@input)
  @codec_transformer = DecodedEventTransform.new(@input)
end
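The reason for one codec copy per connection can be illustrated with a toy buffering codec. This is a sketch under stated assumptions: `ToyLineCodec` is hypothetical and is not a Logstash class. Note that `Object#dup` is a shallow copy, so the toy class defines `initialize_copy` to give each copy its own buffer; whether the real codec classes do the same is not shown on this page.

```ruby
# Toy line-buffering codec (hypothetical, for illustration only).
class ToyLineCodec
  def initialize
    @buffer = String.new
  end

  # Object#dup is shallow, so each copy gets a fresh buffer here.
  def initialize_copy(source)
    super
    @buffer = String.new
  end

  # Yields every complete line and keeps the trailing partial line buffered.
  def decode(data)
    @buffer << data
    while (index = @buffer.index("\n"))
      yield @buffer.slice!(0..index).chomp
    end
  end

  # Yields whatever is still buffered, then clears the buffer.
  def flush
    return if @buffer.empty?

    yield @buffer.dup
    @buffer.clear
  end
end

shared = ToyLineCodec.new
conn_a = shared.dup
conn_b = shared.dup

# Connection A receives a partial line; nothing is yielded yet.
conn_a.decode("partial from A") { |line| puts line }

# Flushing connection B does not touch A's buffered data.
flushed_b = []
conn_b.flush { |line| flushed_b << line }
```

With a single shared codec, closing one connection would flush partial data that belongs to the others.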

Instance Method Details

#accept ⇒ Object



# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 30

def accept
  @logger.debug("Beats input: waiting from new events from remote host",
                :peer => @connection.peer)

  @connection.run { |hash, identity_stream| process(hash, identity_stream) }
end

#flush(&block) ⇒ Object

Out-of-band (OOB) call to flush the codec buffer.

Deciding when to call this method is a bit tricky. Currently it is called on any exception raised, whether a circuit breaker tripped or the remote host closed the connection. It is better to make sure we flush the buffered data and risk creating duplicates than to lose the data.



# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 77

def flush(&block)
  @logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection",
                                  :peer => @connection.peer)

  @codec.flush(&block)
end
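The flush-on-error policy described for `#flush` might look like the sketch below from the caller's side. Everything here is an assumption for illustration: `FakeHandler` and `run_with_flush` are hypothetical names, and the real caller (not shown on this page) may log or re-raise the error instead of returning it.

```ruby
# Hypothetical stand-in for a connection handler; not the real class.
class FakeHandler
  def initialize(buffered)
    @buffered = buffered
  end

  # Simulates the remote host dropping the connection mid-stream.
  def accept
    raise IOError, "remote host closed the connection"
  end

  # Yields each buffered item, then empties the buffer.
  def flush
    @buffered.each { |event| yield event }
    @buffered = []
  end
end

# Runs the handler; on any StandardError, flushes buffered events into
# the sink before giving up, and returns the error for inspection.
def run_with_flush(handler, sink)
  handler.accept
  nil
rescue StandardError => error
  handler.flush { |event| sink << event }
  error
end

sink = []
error = run_with_flush(FakeHandler.new(["half-read line"]), sink)
# sink now holds the buffered data that would otherwise have been lost.
```

The trade-off is the one stated above: if the sender retries after the failure, the flushed data can show up twice, which is preferred over dropping it.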

#process(hash, identity_stream) ⇒ Object



# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 37

def process(hash, identity_stream)
  @logger.debug? && @logger.debug("Beats input: new event received",
                                  :event_hash => hash,
                                  :identity_stream => identity_stream,
                                  :peer => @connection.peer)

  # Filebeat uses the `message` key and LSF uses `line`
  target_field = if from_filebeat?(hash)
                   hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD)
                 elsif from_logstash_forwarder?(hash)
                   hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD)
                 end

  if target_field.nil?
    @logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field",
                                    :target_field_for_codec => @input.target_field_for_codec,
                                    :event_hash => hash)

    event = LogStash::Event.new(hash)
    @nocodec_transformer.transform(event)

    raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong unless @queue.offer(event)
  else
    @logger.debug? && @logger.debug("Beats input: decoding this event with the codec",
                                    :target_field_value => target_field)

    @codec.accept(CodecCallbackListener.new(target_field,
                                            hash,
                                            identity_stream,
                                            @codec_transformer,
                                            @queue))
  end
end
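On the raw path, `#process` raises when `@queue.offer(event)` returns a falsy value. The sketch below shows that offer-or-raise pattern with Ruby's standard `SizedQueue`. It is an approximation under stated assumptions: `offer`, `enqueue!` and `InsertTimeout` are hypothetical names, and the real queue presumably waits for some timeout before giving up, whereas this version fails immediately when the queue is full.

```ruby
require "thread"

# Hypothetical error standing in for InsertingToQueueTakeTooLong.
class InsertTimeout < StandardError; end

# Non-blocking offer: true if the event was queued, false if the queue is full.
def offer(queue, event)
  queue.push(event, true) # non_block = true raises ThreadError when full
  true
rescue ThreadError
  false
end

# Mirrors the raw path in #process: raise when the event cannot be queued.
def enqueue!(queue, event)
  raise InsertTimeout, "queue is full" unless offer(queue, event)
end

queue = SizedQueue.new(1)
enqueue!(queue, { "cpu" => 0.42 })

begin
  enqueue!(queue, { "cpu" => 0.57 })
rescue InsertTimeout => error
  warn "back-pressure: #{error.message}"
end
```

Raising here propagates back-pressure to the connection instead of blocking the handler indefinitely on a stalled pipeline.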