Class: LogStash::Inputs::Beats::MessageListener

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/inputs/beats/message_listener.rb

Defined Under Namespace

Classes: ConnectionState

Constant Summary collapse

# Frozen field-name constants. Neither is referenced in the methods shown on
# this page; presumably they name the event key that carries the raw log line
# for Filebeat and for the older logstash-forwarder (LSF) respectively --
# TODO confirm against the helper that picks the codec target field.
FILEBEAT_LOG_LINE_FIELD = "message".freeze
LSF_LOG_LINE_FIELD = "line".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, input) ⇒ MessageListener

Returns a new instance of MessageListener.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/logstash/inputs/beats/message_listener.rb', line 17

# Builds a listener bound to one output queue and one input plugin instance.
#
# @param queue [Object] destination that receives events via `<<`
# @param input [Object] owning input; must respond to `logger` and `metric`
def initialize(queue, input)
  # Collaborators handed in by the caller, plus the handles read off the input.
  @queue = queue
  @input = input
  @logger = input.logger
  @metric = input.metric

  # Per-listener connection bookkeeping.
  @connections_list = ThreadSafe::Hash.new
  @peak_connection_count = Concurrent::AtomicFixnum.new(0)

  # One transformer for events queued directly, one for events that go
  # through a codec first (see onNewMessage).
  @nocodec_transformer = RawEventTransform.new(input)
  @codec_transformer = DecodedEventTransform.new(input)
end

Instance Attribute Details

#connections_list ⇒ Object (readonly)

Returns the value of attribute connections_list.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Read-only accessor for the connection registry.
#
# @return [ThreadSafe::Hash] the hash created in the constructor; onException
#   looks entries up in it by channel context
def connections_list
  @connections_list
end

#input ⇒ Object (readonly)

Returns the value of attribute input.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Read-only accessor for the input plugin instance.
#
# @return [Object] the `input` argument that was passed to the constructor
def input
  @input
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Read-only accessor for the logger.
#
# @return [Object] the value of `input.logger` captured by the constructor
def logger
  @logger
end

Instance Method Details

#onChannelInitializeException(ctx, cause) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/logstash/inputs/beats/message_listener.rb', line 59

# Logs a failure that happened while a channel was being initialized.
#
# An IllegalArgumentException is reported as an error with a hint about
# certificates and keys; the exception itself is attached only when the
# logger is in debug mode. Anything else is logged as a warning carrying the
# stringified cause.
#
# @param ctx [Object] channel context (not used here)
# @param cause [Exception] the error raised during initialization
# @return [Object] whatever the logger call returns
def onChannelInitializeException(ctx, cause)
  plugin_logger = input.logger

  unless cause.is_a?(Java::JavaLang::IllegalArgumentException)
    return plugin_logger.warn("Error when creating a connection", :exception => cause.to_s)
  end

  # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information
  certificate_hint = "Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format."
  return plugin_logger.error(certificate_hint) unless plugin_logger.debug?

  plugin_logger.error(certificate_hint, :exception => cause)
end

#onConnectionClose(ctx) ⇒ Object



54
55
56
57
# File 'lib/logstash/inputs/beats/message_listener.rb', line 54

# Callback for a closed connection: hands the context to
# `unregister_connection`, then calls `decrement_connection_count` (both are
# helpers defined elsewhere in the class).
#
# @param ctx [Object] channel context of the connection that closed
# @return [Object] the result of `decrement_connection_count`
def onConnectionClose(ctx)
  unregister_connection ctx
  decrement_connection_count
end

#onException(ctx, cause) ⇒ Object



72
73
74
# File 'lib/logstash/inputs/beats/message_listener.rb', line 72

# Callback for an error on an established connection. Unregisters the
# connection, but only when `connections_list` holds a non-nil entry for it.
# The connection count is not touched here.
#
# @param ctx [Object] channel context the error occurred on
# @param cause [Exception] the error (not inspected)
# @return [Object, nil] result of `unregister_connection`, or nil when the
#   context has no entry
def onException(ctx, cause)
  return if connections_list[ctx].nil?

  unregister_connection(ctx)
end

#onNewConnection(ctx) ⇒ Object



49
50
51
52
# File 'lib/logstash/inputs/beats/message_listener.rb', line 49

# Callback for a newly opened connection: hands the context to
# `register_connection`, then calls `increment_connection_count` (both are
# helpers defined elsewhere in the class).
#
# @param ctx [Object] channel context of the new connection
# @return [Object] the result of `increment_connection_count`
def onNewConnection(ctx)
  register_connection ctx
  increment_connection_count
end

#onNewMessage(ctx, message) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/logstash/inputs/beats/message_listener.rb', line 29

# Callback for one received message.
#
# Stamps the sender's address into the payload's '@metadata' (when both the
# address and the '@metadata' entry exist), then routes the payload:
# * `extract_target_field` returns a field -> the payload is handed to the
#   connection's codec through a CodecCallbackListener;
# * otherwise -> the payload is wrapped in a LogStash::Event, run through the
#   no-codec transformer and pushed onto the queue.
#
# @param ctx [Object] channel context the message arrived on
# @param message [Object] must respond to `getData` and `getIdentityStream`
# @return [Object] result of the codec's `accept`, or of `@queue << event`
def onNewMessage(ctx, message)
  payload = message.getData
  remote_ip = ip_address(ctx)

  unless remote_ip.nil?
    metadata = payload['@metadata']
    metadata['ip_address'] = remote_ip unless metadata.nil?
  end

  codec_field = extract_target_field(payload)

  unless codec_field.nil?
    # Resolve the codec before building the listener, as the original
    # expression did.
    connection_codec = codec(ctx)
    listener = CodecCallbackListener.new(codec_field,
                                         payload,
                                         message.getIdentityStream(),
                                         @codec_transformer,
                                         @queue)
    return connection_codec.accept(listener)
  end

  event = LogStash::Event.new(payload)
  @nocodec_transformer.transform(event)
  @queue << event
end