Class: LogStash::Inputs::Beats

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::NormalizeConfigSupport, PluginMixins::PluginFactorySupport
Defined in:
lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/tls.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb

Overview

This input plugin enables Logstash to receive events from the www.elastic.co/products/beats[Elastic Beats] framework.

The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:

source,ruby

input {

beats {
  port => 5044
}

}

output

elasticsearch {
  hosts => "localhost:9200"
  manage_template => false
  index => "%{[@metadata][beat]-%+YYYY+YYYY.MM+YYYY.MM.dd"
  document_type => "%[@metadata][type]"
}

}


NOTE: The Beats shipper automatically sets the ‘type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.

IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events.

Defined Under Namespace

Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform, TLS

Constant Summary collapse

ENRICH_DEFAULTS =
{
  'source_metadata'   => true,
  'codec_metadata'    => true,
  'ssl_peer_metadata' => false,
}.freeze
ENRICH_ALL =
ENRICH_DEFAULTS.keys.freeze
ENRICH_DEFAULT =
ENRICH_DEFAULTS.select { |_,v| v }.keys.freeze
ENRICH_NONE =
['none'].freeze
ENRICH_ALIASES =
%w(none all)

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#field_hostipObject (readonly)

Returns the value of attribute field_hostip.



171
172
173
# File 'lib/logstash/inputs/beats.rb', line 171

def field_hostip
  @field_hostip
end

#field_hostnameObject (readonly)

Returns the value of attribute field_hostname.



171
172
173
# File 'lib/logstash/inputs/beats.rb', line 171

def field_hostname
  @field_hostname
end

#field_tls_cipherObject (readonly)

Returns the value of attribute field_tls_cipher.



172
173
174
# File 'lib/logstash/inputs/beats.rb', line 172

def field_tls_cipher
  @field_tls_cipher
end

#field_tls_peer_subjectObject (readonly)

Returns the value of attribute field_tls_peer_subject.



172
173
174
# File 'lib/logstash/inputs/beats.rb', line 172

def field_tls_peer_subject
  @field_tls_peer_subject
end

#field_tls_protocol_versionObject (readonly)

Returns the value of attribute field_tls_protocol_version.



172
173
174
# File 'lib/logstash/inputs/beats.rb', line 172

def field_tls_protocol_version
  @field_tls_protocol_version
end

#include_source_metadataObject (readonly)

Returns the value of attribute include_source_metadata.



173
174
175
# File 'lib/logstash/inputs/beats.rb', line 173

def 
  @include_source_metadata
end

Instance Method Details

#certificate_authorities_configured?Boolean

Returns:

  • (Boolean)


280
281
282
# File 'lib/logstash/inputs/beats.rb', line 280

def certificate_authorities_configured?
  @ssl_certificate_authorities && @ssl_certificate_authorities.size > 0
end

#client_authentication_enabled?Boolean

Returns:

  • (Boolean)


269
270
271
272
273
274
275
276
277
278
# File 'lib/logstash/inputs/beats.rb', line 269

def client_authentication_enabled?
  if original_params.include?('ssl_client_authentication')
    return client_authentication_optional? || client_authentication_required?
  end

  # Keep backward compatibility with the deprecated `ssl_verify_mode` until it's not removed.
  # When it's explicitly set (or both settings are absent), it should use the ssl_certificate_authorities
  # to enable/disable the client authentication. (even if ssl_verify_mode => none)
  certificate_authorities_configured?
end

#client_authentication_metadata?Boolean

Returns:

  • (Boolean)


284
285
286
# File 'lib/logstash/inputs/beats.rb', line 284

def client_authentication_metadata?
  @ssl_enabled && @ssl_peer_metadata && ssl_configured? && client_authentication_enabled?
end

#client_authentication_none?Boolean

Returns:

  • (Boolean)


296
297
298
# File 'lib/logstash/inputs/beats.rb', line 296

def client_authentication_none?
  @ssl_client_authentication && @ssl_client_authentication.downcase == SSL_CLIENT_AUTH_NONE
end

#client_authentication_optional?Boolean

Returns:

  • (Boolean)


292
293
294
# File 'lib/logstash/inputs/beats.rb', line 292

def client_authentication_optional?
  @ssl_client_authentication && @ssl_client_authentication.downcase == SSL_CLIENT_AUTH_OPTIONAL
end

#client_authentication_required?Boolean

Returns:

  • (Boolean)


288
289
290
# File 'lib/logstash/inputs/beats.rb', line 288

def client_authentication_required?
  @ssl_client_authentication && @ssl_client_authentication.downcase == SSL_CLIENT_AUTH_REQUIRED
end

#create_serverObject

def register



245
246
247
248
249
# File 'lib/logstash/inputs/beats.rb', line 245

def create_server
  server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
  server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
  server
end

#include_source_metadata?Boolean

Returns:

  • (Boolean)


304
305
306
# File 'lib/logstash/inputs/beats.rb', line 304

def include_source_metadata?
  return @include_source_metadata
end

#registerObject



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/logstash/inputs/beats.rb', line 197

def register
  # For Logstash 2.4 we need to make sure that the logger is correctly set for the
  # java classes before actually loading them.
  #
  # if we don't do this we will get this error:
  # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory)
  if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
    LogStash::Logger.setup_log4j(@logger)
  end

  setup_ssl_params!

  validate_ssl_config!

  active_enrichments = resolve_enriches

  @include_source_metadata = active_enrichments.include?('source_metadata')
  @include_codec_tag = original_params.include?('include_codec_tag') ? params['include_codec_tag'] : active_enrichments.include?('codec_metadata')
  @ssl_peer_metadata = original_params.include?('ssl_peer_metadata') ? params['ssl_peer_metadata'] : active_enrichments.include?('ssl_peer_metadata')

  # intentionally ask users to provide codec when they want to use the codec metadata
  # second layer enrich is also a controller, provide enrich => ['codec_metadata' or/with 'source_metadata'] with codec if you override event original
  unless active_enrichments.include?('codec_metadata')
    if original_params.include?('codec')
      @logger.warn("An explicit `codec` is specified but `enrich` does not include `codec_metadata`. ECS compatibility will remain aligned on the pipeline or codec's `ecs_compatibility` (enabled by default).")
    else
      @codec = plugin_factory.codec('plain').new('ecs_compatibility' => 'disabled')
      @logger.debug('Disabling `ecs_compatibility` for the default codec since `enrich` configuration does not include `codec_metadata` and no explicit codec is set.')
    end
  end

  # Logstash 6.x breaking change (introduced with 4.0.0 of this gem)
  if @codec.kind_of? LogStash::Codecs::Multiline
    configuration_error "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html"
  end

  # define ecs name mapping
  @field_hostname = ecs_select[disabled: "host", v1: "[@metadata][input][beats][host][name]"]
  @field_hostip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][beats][host][ip]"]
  @field_tls_protocol_version = ecs_select[disabled: "[@metadata][tls_peer][protocol]", v1: "[@metadata][input][beats][tls][version_protocol]"]
  @field_tls_peer_subject = ecs_select[disabled: "[@metadata][tls_peer][subject]", v1: "[@metadata][input][beats][tls][client][subject]"]
  @field_tls_cipher = ecs_select[disabled: "[@metadata][tls_peer][cipher_suite]", v1: "[@metadata][input][beats][tls][cipher]"]

  @logger.info("Starting input listener", :address => "#{@host}:#{@port}")

  @server = create_server
end

#require_certificate_authorities?Boolean

Returns:

  • (Boolean)


300
301
302
# File 'lib/logstash/inputs/beats.rb', line 300

def require_certificate_authorities?
  client_authentication_required? || client_authentication_optional?
end

#run(output_queue) ⇒ Object



251
252
253
254
255
# File 'lib/logstash/inputs/beats.rb', line 251

def run(output_queue)
  message_listener = MessageListener.new(output_queue, self)
  @server.setMessageListener(message_listener)
  @server.listen
end

#ssl_configured?Boolean

Returns:

  • (Boolean)


261
262
263
# File 'lib/logstash/inputs/beats.rb', line 261

def ssl_configured?
  !(@ssl_certificate.nil? || @ssl_key.nil?)
end

#stopObject

def run



257
258
259
# File 'lib/logstash/inputs/beats.rb', line 257

def stop
  @server.stop unless @server.nil?
end

#target_codec_on_field?Boolean

Returns:

  • (Boolean)


265
266
267
# File 'lib/logstash/inputs/beats.rb', line 265

def target_codec_on_field?
  !@target_codec_on_field.empty?
end