Class: LogStash::Inputs::Beats::MessageListener
- Inherits:
-
Object
- Object
- LogStash::Inputs::Beats::MessageListener
- Defined in:
- lib/logstash/inputs/beats/message_listener.rb
Defined Under Namespace
Classes: ConnectionState
Constant Summary collapse
- FILEBEAT_LOG_LINE_FIELD =
"message".freeze
- LSF_LOG_LINE_FIELD =
"line".freeze
Instance Attribute Summary collapse
-
#connections_list ⇒ Object
readonly
Returns the value of attribute connections_list.
-
#input ⇒ Object
readonly
Returns the value of attribute input.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(queue, input) ⇒ MessageListener
constructor
A new instance of MessageListener.
- #onChannelInitializeException(ctx, cause) ⇒ Object
- #onConnectionClose(ctx) ⇒ Object
- #onException(ctx, cause) ⇒ Object
- #onNewConnection(ctx) ⇒ Object
- #onNewMessage(ctx, message) ⇒ Object
Constructor Details
#initialize(queue, input) ⇒ MessageListener
Returns a new instance of MessageListener.
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 17 def initialize(queue, input) @connections_list = ThreadSafe::Hash.new @queue = queue @logger = input.logger @input = input @metric = @input.metric @peak_connection_count = Concurrent::AtomicFixnum.new(0) @nocodec_transformer = RawEventTransform.new(@input) @codec_transformer = DecodedEventTransform.new(@input) end |
Instance Attribute Details
#connections_list ⇒ Object (readonly)
Returns the value of attribute connections_list.
15 16 17 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15 def connections_list @connections_list end |
#input ⇒ Object (readonly)
Returns the value of attribute input.
15 16 17 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15 def input @input end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
15 16 17 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15 def logger @logger end |
Instance Method Details
#onChannelInitializeException(ctx, cause) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 59 def onChannelInitializeException(ctx, cause) # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information if cause.is_a?(Java::JavaLang::IllegalArgumentException) if input.logger.debug? input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.", :exception => cause) else input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.") end else input.logger.warn("Error when creating a connection", :exception => cause.to_s) end end |
#onConnectionClose(ctx) ⇒ Object
54 55 56 57 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 54 def onConnectionClose(ctx) unregister_connection(ctx) decrement_connection_count() end |
#onException(ctx, cause) ⇒ Object
72 73 74 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 72 def onException(ctx, cause) unregister_connection(ctx) unless connections_list[ctx].nil? end |
#onNewConnection(ctx) ⇒ Object
49 50 51 52 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 49 def onNewConnection(ctx) register_connection(ctx) increment_connection_count() end |
#onNewMessage(ctx, message) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/logstash/inputs/beats/message_listener.rb', line 29 def onNewMessage(ctx, ) hash = .getData ip_address = ip_address(ctx) hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil? target_field = extract_target_field(hash) if target_field.nil? event = LogStash::Event.new(hash) @nocodec_transformer.transform(event) @queue << event else codec(ctx).accept(CodecCallbackListener.new(target_field, hash, .getIdentityStream(), @codec_transformer, @queue)) end end |