Class: LogStash::Inputs::Beats

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/tls.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb

Overview

This input plugin enables Logstash to receive events from the https://www.elastic.co/products/beats[Elastic Beats] framework.

The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:

source,ruby

input {

beats {
  port => 5044
}

}

output {

elasticsearch {
  hosts => "localhost:9200"
  manage_template => false
  index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  document_type => "%{[@metadata][type]}"
}

}


NOTE: The Beats shipper automatically sets the `type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.

IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events.

Defined Under Namespace

Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform, TLS

Instance Method Summary collapse

Instance Method Details

#client_authentication_metadata?Boolean

Returns:

  • (Boolean)


215
216
217
# File 'lib/logstash/inputs/beats.rb', line 215

# Tells whether peer metadata should be collected: @ssl_peer_metadata must be
# set, SSL must be configured and client authentication must be enabled.
#
# The checks run in that order and stop at the first falsy one, whose value is
# returned as-is (so the result may be nil rather than false).
#
# @return [Boolean, nil]
def client_authentication_metadata?
  return @ssl_peer_metadata unless @ssl_peer_metadata

  ssl_ready = ssl_configured?
  return ssl_ready unless ssl_ready

  client_authentification?
end

#client_authentication_required?Boolean

Returns:

  • (Boolean)


219
220
221
# File 'lib/logstash/inputs/beats.rb', line 219

# Tells whether @ssl_verify_mode is exactly "force_peer" (case-sensitive).
#
# @return [Boolean]
def client_authentication_required?
  case @ssl_verify_mode
  when "force_peer" then true
  else false
  end
end

#client_authentification?Boolean

Returns:

  • (Boolean)


211
212
213
# File 'lib/logstash/inputs/beats.rb', line 211

# Tells whether at least one certificate authority has been configured.
#
# When @ssl_certificate_authorities is nil or false, that value itself is
# returned; otherwise the result is whether its size is greater than zero.
#
# @return [Boolean, nil]
def client_authentification?
  authorities = @ssl_certificate_authorities
  return authorities unless authorities

  0 < authorities.size
end

#convert_protocolsObject



231
232
233
# File 'lib/logstash/inputs/beats.rb', line 231

# Asks TLS.get_supported for the entries covering the range from
# @tls_min_version to @tls_max_version (inclusive) and collects the `name`
# of each one.
#
# @return [Array] the names of the supported protocol entries
def convert_protocols
  version_range = (@tls_min_version..@tls_max_version)
  supported = TLS.get_supported(version_range)
  supported.map { |protocol| protocol.name }
end

#create_serverObject

def register



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/logstash/inputs/beats.rb', line 165

# Instantiates the Beats server from @host, @port, @client_inactivity_timeout
# and @executor_threads and, when @ssl is set, enables SSL on it.
#
# The SSL builder is given the certificate, key and optional key passphrase,
# then the protocols, cipher suites and handshake timeout. When client
# authentication is enabled, the verify mode and certificate authorities are
# set as well.
#
# @return the org.logstash.beats.Server instance
# @raise [LogStash::ConfigurationError] when building the SSL settings raises
#   java.lang.IllegalArgumentException
def create_server
  beats_server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
  return beats_server unless @ssl

  begin
    key_passphrase = @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value
    builder = org.logstash.netty.SslSimpleBuilder.new(@ssl_certificate, @ssl_key, key_passphrase)
    builder = builder.setProtocols(convert_protocols)
    builder = builder.setCipherSuites(normalized_ciphers)
  rescue java.lang.IllegalArgumentException => e
    # Re-raise invalid builder arguments as a configuration error.
    raise LogStash::ConfigurationError, e
  end

  builder.setHandshakeTimeoutMilliseconds(@ssl_handshake_timeout)

  if client_authentification?
    # The verify mode is matched case-insensitively; any other value leaves
    # the builder's verify mode untouched.
    case @ssl_verify_mode.upcase
    when "FORCE_PEER"
      builder.setVerifyMode(org.logstash.netty.SslSimpleBuilder::SslClientVerifyMode::FORCE_PEER)
    when "PEER"
      builder.setVerifyMode(org.logstash.netty.SslSimpleBuilder::SslClientVerifyMode::VERIFY_PEER)
    end
    builder.setCertificateAuthorities(@ssl_certificate_authorities)
  end

  beats_server.enableSSL(builder)
  beats_server
end

#normalized_ciphersObject



227
228
229
# File 'lib/logstash/inputs/beats.rb', line 227

# Returns the entries of @cipher_suites converted to upper case, as a new
# collection.
#
# @return [Array] upper-cased cipher suite names
def normalized_ciphers
  @cipher_suites.map { |suite| suite.upcase }
end

#registerObject



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/logstash/inputs/beats.rb', line 125

# Prepares the input: sets up logging for the Java classes, imports them,
# validates the SSL settings and the codec, then creates the server and stores
# it in @server.
#
# @raise [LogStash::ConfigurationError] when SSL is enabled without both a
#   certificate and a key, when the verify mode needs certificate authorities
#   that are not configured, when peer metadata is enabled without a verify
#   mode of peer/force_peer, or when the multiline codec is used
def register
  # Logstash 2.4: the logger has to be set for the Java classes before they
  # are actually loaded, otherwise we get this error:
  # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory)
  LogStash::Logger.setup_log4j(@logger) if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)

  java_import("org.logstash.beats.Server")
  java_import("org.logstash.netty.SslSimpleBuilder")
  java_import("java.io.FileInputStream")
  java_import("io.netty.handler.ssl.OpenSsl")

  if @ssl
    raise LogStash::ConfigurationError, "Certificate or Certificate Key not configured" unless ssl_configured?
  else
    # SSL is off: a configured certificate or key is only warned about.
    @logger.warn("Beats input: SSL Certificate will not be used") unless @ssl_certificate.nil?
    @logger.warn("Beats input: SSL Key will not be used") unless @ssl_key.nil?
  end

  if @ssl && require_certificate_authorities?
    unless client_authentification?
      raise LogStash::ConfigurationError, "Using `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `certificate_authorities`"
    end
  end

  if client_authentication_metadata?
    unless require_certificate_authorities?
      raise LogStash::ConfigurationError, "Enabling `peer_metadata` requires using `verify_mode` set to PEER or FORCE_PEER"
    end
  end

  # Logstash 6.x breaking change (introduced with 4.0.0 of this gem)
  if @codec.kind_of?(LogStash::Codecs::Multiline)
    raise LogStash::ConfigurationError, "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html"
  end

  listen_address = "#{@host}:#{@port}"
  @logger.info("Beats inputs: Starting input listener", :address => listen_address)

  @server = create_server
end

#require_certificate_authorities?Boolean

Returns:

  • (Boolean)


223
224
225
# File 'lib/logstash/inputs/beats.rb', line 223

# Tells whether @ssl_verify_mode is exactly "force_peer" or "peer"
# (case-sensitive).
#
# @return [Boolean]
def require_certificate_authorities?
  %w[force_peer peer].include?(@ssl_verify_mode)
end

#run(output_queue) ⇒ Object



201
202
203
204
205
# File 'lib/logstash/inputs/beats.rb', line 201

# Attaches a new MessageListener (built from the output queue and this input)
# to @server, then calls `listen` on the server.
#
# @param output_queue the queue handed to the MessageListener
# @return whatever @server.listen returns
def run(output_queue)
  @server.setMessageListener(MessageListener.new(output_queue, self))
  @server.listen
end

#ssl_configured?Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/logstash/inputs/beats.rb', line 193

# Tells whether both @ssl_certificate and @ssl_key are set (neither is nil).
#
# @return [Boolean]
def ssl_configured?
  !@ssl_certificate.nil? && !@ssl_key.nil?
end

#stopObject

def run



207
208
209
# File 'lib/logstash/inputs/beats.rb', line 207

# Stops the server if one is stored in @server; does nothing when @server is
# nil.
#
# @return whatever @server.stop returns, or nil when there is no server
def stop
  return if @server.nil?

  @server.stop
end

#target_codec_on_field?Boolean

Returns:

  • (Boolean)


197
198
199
# File 'lib/logstash/inputs/beats.rb', line 197

# Tells whether @target_codec_on_field is non-empty.
#
# @return [Boolean]
def target_codec_on_field?
  field_is_blank = @target_codec_on_field.empty?
  !field_is_blank
end