Class: LogStash::Inputs::Beats::MessageListener

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/inputs/beats/message_listener.rb

Defined Under Namespace

Classes: ConnectionState

Constant Summary collapse

FILEBEAT_LOG_LINE_FIELD =
"message".freeze
LSF_LOG_LINE_FIELD =
"line".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, input) ⇒ MessageListener

Returns a new instance of MessageListener.



17
18
19
20
21
22
23
24
25
# File 'lib/logstash/inputs/beats/message_listener.rb', line 17

# Sets up the listener state.
#
# @param queue [Object] destination that events are appended to with `<<`
# @param input [Object] owning input; must respond to `logger`, and is handed
#   to both event transformers
def initialize(queue, input)
  @input = input
  @logger = input.logger
  @queue = queue
  @connections_list = ThreadSafe::Hash.new

  # One transformer per path taken in #onNewMessage: events built directly
  # from the message data, and events produced through a codec.
  @nocodec_transformer = RawEventTransform.new(input)
  @codec_transformer = DecodedEventTransform.new(input)
end

Instance Attribute Details

#connections_list ⇒ Object (readonly)

Returns the value of attribute connections_list.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Reader for @connections_list, which #initialize sets to a new
# ThreadSafe::Hash. #onException looks entries up in it by ctx.
def connections_list
  @connections_list
end

#input ⇒ Object (readonly)

Returns the value of attribute input.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Reader for @input, the input object passed to #initialize.
def input
  @input
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/logstash/inputs/beats/message_listener.rb', line 15

# Reader for @logger, which #initialize takes from `input.logger`.
def logger
  @logger
end

Instance Method Details

#onChannelInitializeException(ctx, cause) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/logstash/inputs/beats/message_listener.rb', line 53

# Logs a failure raised while a channel was being initialized.
#
# A Java IllegalArgumentException is reported at error level as a probable
# key problem (invalid key, or private key not in PKCS8 format). Any other
# cause is reported at warn level with its string form.
#
# @param ctx [Object] channel context; not used by this method
# @param cause [Exception] the failure that was raised
# @return [Object] whatever the logger call returns
def onChannelInitializeException(ctx, cause)
  # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information
  if cause.is_a?(Java::JavaLang::IllegalArgumentException)
    key_hint = "Looks like you either have an invalid key or your private key was not in PKCS8 format."

    # Attach the underlying exception only in debug mode, so that debug
    # logging is the more verbose variant (the branches used to be swapped).
    if input.logger.debug?
      input.logger.error(key_hint, :exception => cause)
    else
      input.logger.error(key_hint)
    end
  else
    input.logger.warn("Error when creating a connection", :exception => cause.to_s)
  end
end

#onConnectionClose(ctx) ⇒ Object



49
50
51
# File 'lib/logstash/inputs/beats/message_listener.rb', line 49

# Connection-closed callback (per its name): hands ctx to
# unregister_connection, which is defined outside this excerpt.
def onConnectionClose(ctx)
  unregister_connection(ctx)
end

#onException(ctx, cause) ⇒ Object



66
67
68
# File 'lib/logstash/inputs/beats/message_listener.rb', line 66

# Drops the connection associated with ctx after an error, but only when
# connections_list holds a non-nil entry for it.
#
# @param ctx [Object] key used to look up the connection
# @param cause [Object] the error; not inspected or logged here
# @return [Object, nil] result of unregister_connection, or nil when there
#   was nothing to unregister
def onException(ctx, cause)
  return if connections_list[ctx].nil?

  unregister_connection(ctx)
end

#onNewConnection(ctx) ⇒ Object



45
46
47
# File 'lib/logstash/inputs/beats/message_listener.rb', line 45

# New-connection callback (per its name): hands ctx to register_connection,
# which is defined outside this excerpt.
def onNewConnection(ctx)
  register_connection(ctx)
end

#onNewMessage(ctx, message) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logstash/inputs/beats/message_listener.rb', line 27

# Turns an incoming message into event(s) on the queue.
#
# When extract_target_field finds no target field, the message data becomes
# a single event that is transformed and pushed directly. Otherwise the data
# is handed to the connection's codec through a CodecCallbackListener.
#
# @param ctx [Object] connection context, used to look up the codec
# @param message [Object] must respond to getData and getIdentityStream
# @return [Object] the queue (direct path) or the codec's accept result
def onNewMessage(ctx, message)
  payload = message.getData
  field_name = extract_target_field(payload)

  if field_name.nil?
    raw_event = LogStash::Event.new(payload)
    @nocodec_transformer.transform(raw_event)
    return @queue << raw_event
  end

  # Resolve the codec before building the listener, matching the original
  # receiver-then-arguments evaluation order.
  decoder = codec(ctx)
  listener = CodecCallbackListener.new(field_name,
                                       payload,
                                       message.getIdentityStream,
                                       @codec_transformer,
                                       @queue)
  decoder.accept(listener)
end