Class: LogStash::Inputs::Beats::MessageListener

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/inputs/beats/message_listener.rb

Defined Under Namespace

Classes: ConnectionState

Constant Summary collapse

FILEBEAT_LOG_LINE_FIELD =
"message".freeze
LSF_LOG_LINE_FIELD =
"line".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, input) ⇒ MessageListener

Returns a new instance of MessageListener.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/logstash/inputs/beats/message_listener.rb', line 18

def initialize(queue, input)
  @connections_list = ThreadSafe::Hash.new
  @queue = queue
  @logger = input.logger
  @input = input
  @metric = @input.metric
  @peak_connection_count = Concurrent::AtomicFixnum.new(0)

  @nocodec_transformer = RawEventTransform.new(@input)
  @codec_transformer = DecodedEventTransform.new(@input)
end

Instance Attribute Details

#connections_listObject (readonly)

Returns the value of attribute connections_list.



16
17
18
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16

def connections_list
  @connections_list
end

#inputObject (readonly)

Returns the value of attribute input.



16
17
18
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16

def input
  @input
end

#loggerObject (readonly)

Returns the value of attribute logger.



16
17
18
# File 'lib/logstash/inputs/beats/message_listener.rb', line 16

def logger
  @logger
end

Instance Method Details

#onChannelInitializeException(ctx, cause) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/logstash/inputs/beats/message_listener.rb', line 64

def onChannelInitializeException(ctx, cause)
  # This is mostly due to a bad certificate or keys, running Logstash in debug mode will show more information
  if cause.is_a?(Java::JavaLang::IllegalArgumentException)
    if input.logger.debug?
      input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.", :exception => cause)
    else
      input.logger.error("Looks like you either have a bad certificate, an invalid key or your private key was not in PKCS8 format.")
    end
  else
    input.logger.warn("Error when creating a connection", :exception => cause.to_s)
  end
end

#onConnectionClose(ctx) ⇒ Object



59
60
61
62
# File 'lib/logstash/inputs/beats/message_listener.rb', line 59

def onConnectionClose(ctx)
  unregister_connection(ctx)
  decrement_connection_count()
end

#onException(ctx, cause) ⇒ Object



77
78
79
# File 'lib/logstash/inputs/beats/message_listener.rb', line 77

def onException(ctx, cause)
  unregister_connection(ctx) unless connections_list[ctx].nil?
end

#onNewConnection(ctx) ⇒ Object



54
55
56
57
# File 'lib/logstash/inputs/beats/message_listener.rb', line 54

def onNewConnection(ctx)
  register_connection(ctx)
  increment_connection_count()
end

#onNewMessage(ctx, message) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/logstash/inputs/beats/message_listener.rb', line 30

def onNewMessage(ctx, message)
  hash = message.getData
  ip_address = ip_address(ctx)

  unless ip_address.nil? || hash['@metadata'].nil?
    set_nested(hash, @input.field_hostip, ip_address)
  end
  target_field = extract_target_field(hash)

  extract_tls_peer(hash, ctx)

  if target_field.nil?
    event = LogStash::Event.new(hash)
    @nocodec_transformer.transform(event)
    @queue << event
  else
    codec(ctx).accept(CodecCallbackListener.new(target_field,
                                                hash,
                                                message.getIdentityStream(),
                                                @codec_transformer,
                                                @queue))
  end
end

#set_nested(hash, field_name, value) ⇒ Object

only to make it testable



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/logstash/inputs/beats/message_listener.rb', line 162

def set_nested(hash, field_name, value)
  field_ref = Java::OrgLogstash::FieldReference.from(field_name)
  # create @metadata sub-hash if needed
  if field_ref.type == Java::OrgLogstash::FieldReference::META_CHILD
    unless hash.key?("@metadata")
      hash["@metadata"] = {}
    end
    nesting_hash = hash["@metadata"]
  else
    nesting_hash = hash
  end

  field_ref.path.each do |token|
    nesting_hash[token] = {} unless nesting_hash.key?(token)
    nesting_hash = nesting_hash[token]
  end
  nesting_hash[field_ref.key] = value
end