Class: LogStash::Inputs::BeatsSupport::ConnectionHandler
- Inherits: Object
  - Object
  - LogStash::Inputs::BeatsSupport::ConnectionHandler
- Defined in: lib/logstash/inputs/beats_support/connection_handler.rb
Overview
Handles the data coming from a connection and decides which process should be used to decode the data coming from the beat library:
- Should we use a codec on a specific field?
- Should we just take the raw content of the parsed JSON frame?
Instance Method Summary
- #accept ⇒ Object
- #flush(&block) ⇒ Object
  OOB call to flush the codec buffer.
- #initialize(connection, input, queue) ⇒ ConnectionHandler (constructor)
  A new instance of ConnectionHandler.
- #process(hash, identity_stream) ⇒ Object
Constructor Details
#initialize(connection, input, queue) ⇒ ConnectionHandler
Returns a new instance of ConnectionHandler.
# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 15
def initialize(connection, input, queue)
  @connection = connection

  @input = input
  @queue = queue
  @logger = input.logger

  # We need to clone the codec per connection, so we can flush a specific
  # codec when a connection is closed.
  @codec = input.codec.dup

  @nocodec_transformer = RawEventTransform.new(@input)
  @codec_transformer = DecodedEventTransform.new(@input)
end
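The constructor's comment explains that each connection gets its own copy of the codec so that one connection's buffer can be flushed without touching another's. The following is a minimal sketch of that idea only. `BufferingCodec` is a hypothetical stand-in, not the Logstash codec API, and its `initialize_copy` hook is an assumption made so that `dup` yields an independent buffer.

```ruby
# Hypothetical stateful codec used only to illustrate per-connection copies.
class BufferingCodec
  def initialize
    @buffer = []
  end

  # Object#dup is a shallow copy; this hook (an assumption of the sketch)
  # gives every copy its own empty buffer.
  def initialize_copy(source)
    super
    @buffer = []
  end

  # Buffer a piece of data until the next flush.
  def accept(data)
    @buffer << data
  end

  # Hand back everything buffered so far and reset the buffer.
  def flush
    pending = @buffer
    @buffer = []
    pending.each { |data| yield data } if block_given?
    pending
  end
end

shared  = BufferingCodec.new
codec_a = shared.dup # copy for connection A
codec_b = shared.dup # copy for connection B

codec_a.accept("partial line from connection A")
codec_b.accept("partial line from connection B")

# Flushing A's copy leaves B's buffered data untouched.
p codec_a.flush # => ["partial line from connection A"]
p codec_b.flush # => ["partial line from connection B"]
```

With a single shared codec instance, flushing on behalf of one closed connection would also emit whatever the other connections had buffered.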
Instance Method Details
#accept ⇒ Object
# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 30
def accept
  @logger.debug("Beats input: waiting from new events from remote host",
                :peer => @connection.peer)
  @connection.run { |hash, identity_stream| process(hash, identity_stream) }
end
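The shape of `#accept` is a block handed to `@connection.run`, which is invoked once per decoded frame. The sketch below mimics only that shape. `FakeConnection` and `RecordingHandler` are hypothetical names, and the real connection class does far more (reading from the socket, decoding frames) than this stand-in.

```ruby
# Hypothetical connection that replays pre-decoded frames instead of
# reading them from a socket.
class FakeConnection
  attr_reader :peer

  def initialize(peer, frames)
    @peer = peer
    @frames = frames # array of [hash, identity_stream] pairs
  end

  # Yield every (hash, identity_stream) pair to the caller's block.
  def run
    @frames.each { |hash, identity_stream| yield hash, identity_stream }
  end
end

# Hypothetical handler that records what it receives instead of queueing it.
class RecordingHandler
  attr_reader :seen

  def initialize(connection)
    @connection = connection
    @seen = []
  end

  # Same block-passing shape as the #accept method shown above.
  def accept
    @connection.run { |hash, identity_stream| process(hash, identity_stream) }
  end

  def process(hash, identity_stream)
    @seen << [identity_stream, hash]
  end
end

frames = [
  [{ "message" => "first line" },  "host-1-/var/log/app.log"],
  [{ "message" => "second line" }, "host-1-/var/log/app.log"]
]
handler = RecordingHandler.new(FakeConnection.new("127.0.0.1:5044", frames))
handler.accept
puts handler.seen.length
```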
#flush(&block) ⇒ Object
OOB (out-of-band) call to flush the codec buffer.
Deciding when to call this method is a bit tricky. Currently it is called on any exception raised, whether the exception comes from the circuit breaker or from the remote host closing the connection. It is better to make sure the buffered data is flushed, at the risk of creating duplicates, than to lose the data.
# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 77
def flush(&block)
  @logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection",
                                  :peer => @connection.peer)

  @codec.flush(&block)
end
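The description above says the flush happens whenever an exception ends the connection, trading possible duplicates for not losing data. The calling code is not shown on this page, so the following is only a sketch under stated assumptions: `LineCodec` and `handle_connection` are hypothetical, and `IOError` stands in for whatever exception the real input rescues.

```ruby
# Hypothetical codec that holds data until it is flushed.
class LineCodec
  def initialize
    @pending = []
  end

  def accept(data)
    @pending << data
  end

  # Drain the buffer, yielding each buffered item to the block.
  def flush
    yield @pending.shift until @pending.empty?
  end
end

# Feed chunks to the codec; if the connection dies, flush what was buffered
# so it reaches the sink instead of being dropped.
def handle_connection(codec, chunks, sink)
  chunks.each do |chunk|
    raise IOError, "remote host closed the connection" if chunk == :closed
    codec.accept(chunk)
  end
rescue IOError
  # Out-of-band flush: a sender that retries may produce duplicates,
  # which is preferred here over losing the buffered data.
  codec.flush { |data| sink << data }
end

sink = []
handle_connection(LineCodec.new, ["line 1", "line 2", :closed, "line 3"], sink)
p sink # => ["line 1", "line 2"]
```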
#process(hash, identity_stream) ⇒ Object
# File 'lib/logstash/inputs/beats_support/connection_handler.rb', line 37
def process(hash, identity_stream)
  @logger.debug? && @logger.debug("Beats input: new event received",
                                  :event_hash => hash,
                                  :identity_stream => identity_stream,
                                  :peer => @connection.peer)

  # Filebeats uses the `message` key and LSF `line`
  target_field = if from_filebeat?(hash)
    hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD)
  elsif from_logstash_forwarder?(hash)
    hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD)
  end

  if target_field.nil?
    @logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field",
                                    :target_field_for_codec => @input.target_field_for_codec,
                                    :event_hash => hash)

    event = LogStash::Event.new(hash)
    @nocodec_transformer.transform(event)

    raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong if !@queue.offer(event)
  else
    @logger.debug? && @logger.debug("Beats input: decoding this event with the codec",
                                    :target_field_value => target_field)

    @codec.accept(CodecCallbackListener.new(target_field,
                                            hash,
                                            identity_stream,
                                            @codec_transformer,
                                            @queue))
  end
end
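The routing in `#process` can be read as: pull the log line out of the hash if there is one and send it through the codec, otherwise queue the hash as a raw event and fail loudly if the queue does not accept it. Below is a simplified, self-contained sketch of that decision. `LINE_FIELDS`, `QueueFull`, `BoundedQueue`, and `route` are hypothetical names; the field names `message` and `line` are taken from the source comment; and the real method selects the field with `from_filebeat?` / `from_logstash_forwarder?` rather than by key presence.

```ruby
# Field names per the source comment: Filebeat uses `message`, LSF uses `line`.
LINE_FIELDS = ["message", "line"].freeze

# Hypothetical error raised when the queue refuses an event.
class QueueFull < StandardError; end

# Hypothetical bounded queue whose #offer returns false instead of blocking.
class BoundedQueue
  def initialize(capacity)
    @capacity = capacity
    @items = []
  end

  def offer(item)
    return false if @items.size >= @capacity
    @items << item
    true
  end

  def size
    @items.size
  end
end

# Decide between the codec path and the raw path for one decoded frame.
def route(hash, queue)
  key = LINE_FIELDS.find { |field| hash.key?(field) }
  line = key && hash.delete(key) # remove the line so it is not duplicated

  if line.nil?
    # No line field: queue the remaining hash as-is.
    raise QueueFull, "inserting to queue took too long" unless queue.offer(hash)
    :raw
  else
    # Line field found: it would be handed to the codec for decoding.
    [:codec, line]
  end
end

queue = BoundedQueue.new(1)
p route({ "message" => "GET /index.html", "host" => "web-1" }, queue)
p route({ "cpu" => 0.42, "host" => "web-1" }, queue)
```

In this sketch a second raw event offered to the full queue raises `QueueFull`, which corresponds to the `InsertingToQueueTakeTooLong` case in the method above.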