Class: LogStash::Inputs::Beats

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::EventSupport::EventFactoryAdapter, PluginMixins::PluginFactorySupport
Defined in:
lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb

Overview

This input plugin enables Logstash to receive events from the https://www.elastic.co/products/beats[Elastic Beats] framework.

The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:

source,ruby

input {

beats {
  port => 5044
}

}

output {

elasticsearch {
  hosts => "localhost:9200"
  manage_template => false
  index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  document_type => "%{[@metadata][type]}"
}

}


NOTE: The Beats shipper automatically sets the `type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.

IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events.

Defined Under Namespace

Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform

Constant Summary collapse

# Enrichment categories mapped to whether each one is enabled by default.
ENRICH_DEFAULTS =
{
  'source_metadata'   => true,
  'codec_metadata'    => true,
  'ssl_peer_metadata' => false, # adds client certificate information in event's metadata
}.freeze
# Names of every enrichment category.
ENRICH_ALL =
ENRICH_DEFAULTS.keys.freeze
# Names of the categories whose default is truthy.
ENRICH_DEFAULT =
ENRICH_DEFAULTS.select { |_,v| v }.keys.freeze
ENRICH_NONE =
['none'].freeze
# Frozen like the other constants so the shared array cannot be mutated by accident.
ENRICH_ALIASES =
%w(none all).freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#field_hostip ⇒ Object (readonly)

Returns the value of attribute field_hostip.



148
149
150
# File 'lib/logstash/inputs/beats.rb', line 148

# Reader for @field_hostip, the field reference assigned in #register via
# ecs_select: "[@metadata][ip_address]" when ECS is disabled,
# "[@metadata][input][beats][host][ip]" for ECS v1.
# NOTE(review): presumably the field the peer's IP address is written to — confirm
# against the event transform classes.
def field_hostip
  @field_hostip
end

#field_hostname ⇒ Object (readonly)

Returns the value of attribute field_hostname.



148
149
150
# File 'lib/logstash/inputs/beats.rb', line 148

# Reader for @field_hostname, the field reference assigned in #register via
# ecs_select: "host" when ECS is disabled,
# "[@metadata][input][beats][host][name]" for ECS v1.
def field_hostname
  @field_hostname
end

#field_tls_cipher ⇒ Object (readonly)

Returns the value of attribute field_tls_cipher.



149
150
151
# File 'lib/logstash/inputs/beats.rb', line 149

# Reader for @field_tls_cipher, the field reference assigned in #register via
# ecs_select: "[@metadata][tls_peer][cipher_suite]" when ECS is disabled,
# "[@metadata][input][beats][tls][cipher]" for ECS v1.
def field_tls_cipher
  @field_tls_cipher
end

#field_tls_peer_subject ⇒ Object (readonly)

Returns the value of attribute field_tls_peer_subject.



149
150
151
# File 'lib/logstash/inputs/beats.rb', line 149

# Reader for @field_tls_peer_subject, the field reference assigned in #register
# via ecs_select: "[@metadata][tls_peer][subject]" when ECS is disabled,
# "[@metadata][input][beats][tls][client][subject]" for ECS v1.
def field_tls_peer_subject
  @field_tls_peer_subject
end

#field_tls_protocol_version ⇒ Object (readonly)

Returns the value of attribute field_tls_protocol_version.



149
150
151
# File 'lib/logstash/inputs/beats.rb', line 149

# Reader for @field_tls_protocol_version, the field reference assigned in
# #register via ecs_select: "[@metadata][tls_peer][protocol]" when ECS is
# disabled, "[@metadata][input][beats][tls][version_protocol]" for ECS v1.
def field_tls_protocol_version
  @field_tls_protocol_version
end

#include_source_metadata ⇒ Object (readonly)

Returns the value of attribute include_source_metadata.



151
152
153
# File 'lib/logstash/inputs/beats.rb', line 151

# Reader for @include_source_metadata, which #register sets to whether the
# resolved enrichments include 'source_metadata'.
#
# @return [Boolean, nil] the stored flag
def include_source_metadata
  @include_source_metadata
end

#include_ssl_peer_metadata ⇒ Object (readonly)

Returns the value of attribute include_ssl_peer_metadata.



150
151
152
# File 'lib/logstash/inputs/beats.rb', line 150

# Reader for @include_ssl_peer_metadata, which #register sets to whether the
# resolved enrichments include 'ssl_peer_metadata'.
#
# @return [Boolean, nil] the stored flag
def include_ssl_peer_metadata
  @include_ssl_peer_metadata
end

Instance Method Details

#certificate_authorities_configured? ⇒ Boolean

Returns:

  • (Boolean)


240
241
242
# File 'lib/logstash/inputs/beats.rb', line 240

# Tells whether at least one certificate authority has been configured.
#
# @return [Boolean, nil] nil/false when @ssl_certificate_authorities is unset,
#   otherwise whether the collection has any entries
def certificate_authorities_configured?
  authorities = @ssl_certificate_authorities
  authorities && !authorities.empty?
end

#client_authentication_enabled? ⇒ Boolean

Returns:

  • (Boolean)


231
232
233
234
235
236
237
238
# File 'lib/logstash/inputs/beats.rb', line 231

# Decides whether client authentication is in effect.
#
# When `ssl_client_authentication` was given explicitly, the answer is whether
# it is "optional" or "required". Otherwise the presence of configured
# certificate authorities enables client authentication.
#
# @return [Boolean, nil]
def client_authentication_enabled?
  # No explicit setting: ssl_certificate_authorities alone toggles client authentication.
  return certificate_authorities_configured? unless original_params.include?('ssl_client_authentication')

  client_authentication_optional? || client_authentication_required?
end

#client_authentication_metadata? ⇒ Boolean

Returns:

  • (Boolean)


244
245
246
# File 'lib/logstash/inputs/beats.rb', line 244

# Tells whether SSL peer metadata should be gathered: SSL must be enabled, the
# `ssl_peer_metadata` enrichment active, SSL configured, and client
# authentication enabled. Checks short-circuit in that order.
#
# @return [Boolean, nil] the first falsy check result, or the result of
#   #client_authentication_enabled?
def client_authentication_metadata?
  peer_metadata_requested = @ssl_enabled && @include_ssl_peer_metadata
  peer_metadata_requested && ssl_configured? && client_authentication_enabled?
end

#client_authentication_none? ⇒ Boolean

Returns:

  • (Boolean)


256
257
258
# File 'lib/logstash/inputs/beats.rb', line 256

# Tells whether the configured client authentication mode equals
# SSL_CLIENT_AUTH_NONE, comparing case-insensitively via downcase.
#
# @return [Boolean, nil] nil when @ssl_client_authentication is unset
def client_authentication_none?
  auth_mode = @ssl_client_authentication
  auth_mode && auth_mode.downcase == SSL_CLIENT_AUTH_NONE
end

#client_authentication_optional? ⇒ Boolean

Returns:

  • (Boolean)


252
253
254
# File 'lib/logstash/inputs/beats.rb', line 252

# Tells whether the configured client authentication mode equals
# SSL_CLIENT_AUTH_OPTIONAL, comparing case-insensitively via downcase.
#
# @return [Boolean, nil] nil when @ssl_client_authentication is unset
def client_authentication_optional?
  auth_mode = @ssl_client_authentication
  auth_mode && auth_mode.downcase == SSL_CLIENT_AUTH_OPTIONAL
end

#client_authentication_required? ⇒ Boolean

Returns:

  • (Boolean)


248
249
250
# File 'lib/logstash/inputs/beats.rb', line 248

# Tells whether the configured client authentication mode equals
# SSL_CLIENT_AUTH_REQUIRED, comparing case-insensitively via downcase.
#
# @return [Boolean, nil] nil when @ssl_client_authentication is unset
def client_authentication_required?
  auth_mode = @ssl_client_authentication
  auth_mode && auth_mode.downcase == SSL_CLIENT_AUTH_REQUIRED
end

#create_server ⇒ Object

def register



207
208
209
210
211
# File 'lib/logstash/inputs/beats.rb', line 207

# Builds the Beats server from the plugin's id, host, port, inactivity timeout
# and thread settings. When SSL is enabled, an SSL handler provider is attached
# before the server is returned.
#
# @return [org.logstash.beats.Server] the configured server
def create_server
  org.logstash.beats.Server.new(@id, @host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads).tap do |beats_server|
    beats_server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
  end
end

#include_source_metadata? ⇒ Boolean

Returns:

  • (Boolean)


264
265
266
# File 'lib/logstash/inputs/beats.rb', line 264

# Predicate form of the `source_metadata` enrichment flag set in #register.
#
# @return [Boolean, nil] the stored value of @include_source_metadata
def include_source_metadata?
  @include_source_metadata
end

#register ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/logstash/inputs/beats.rb', line 161

# Prepares the input before it starts: validates the SSL configuration,
# resolves the active enrichments into the @include_* flags, adjusts the default
# codec when `codec_metadata` is not enriched, rejects the multiline codec,
# selects the ECS-dependent metadata field names, and finally builds the server
# with #create_server (stored in @server).
def register
  # For Logstash 2.4 we need to make sure that the logger is correctly set for the
  # Java classes before actually loading them.
  #
  # If we don't do this we will get this error:
  # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory)
  if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
    LogStash::Logger.setup_log4j(@logger)
  end

  validate_ssl_config!

  active_enrichments = resolve_enriches

  @include_source_metadata = active_enrichments.include?('source_metadata')
  @include_ssl_peer_metadata = active_enrichments.include?('ssl_peer_metadata')
  # An explicitly configured `include_codec_tag` wins; otherwise it follows the `codec_metadata` enrichment.
  @include_codec_tag = original_params.include?('include_codec_tag') ? params['include_codec_tag'] : active_enrichments.include?('codec_metadata')

  # When `codec_metadata` is not among the active enrichments:
  # - an explicitly configured codec is left untouched and only a warning is logged;
  # - otherwise the default codec is replaced by a `plain` codec with `ecs_compatibility` disabled.
  unless active_enrichments.include?('codec_metadata')
    if original_params.include?('codec')
      @logger.warn("An explicit `codec` is specified but `enrich` does not include `codec_metadata`. ECS compatibility will remain aligned on the pipeline or codec's `ecs_compatibility` (enabled by default).")
    else
      @codec = plugin_factory.codec('plain').new('ecs_compatibility' => 'disabled')
      @logger.debug('Disabling `ecs_compatibility` for the default codec since `enrich` configuration does not include `codec_metadata` and no explicit codec is set.')
    end
  end

  # Logstash 6.x breaking change (introduced with 4.0.0 of this gem):
  # the multiline codec is rejected with a configuration error.
  if @codec.kind_of? LogStash::Codecs::Multiline
    configuration_error "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html"
  end

  # Choose the field reference for each piece of connection metadata according
  # to the ECS compatibility mode (`disabled` vs `v1`).
  @field_hostname = ecs_select[disabled: "host", v1: "[@metadata][input][beats][host][name]"]
  @field_hostip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][beats][host][ip]"]
  @field_tls_protocol_version = ecs_select[disabled: "[@metadata][tls_peer][protocol]", v1: "[@metadata][input][beats][tls][version_protocol]"]
  @field_tls_peer_subject = ecs_select[disabled: "[@metadata][tls_peer][subject]", v1: "[@metadata][input][beats][tls][client][subject]"]
  @field_tls_cipher = ecs_select[disabled: "[@metadata][tls_peer][cipher_suite]", v1: "[@metadata][input][beats][tls][cipher]"]

  @logger.info("Starting input listener", :address => "#{@host}:#{@port}")

  # The server is only constructed here; #run is what calls listen on it.
  @server = create_server
end

#require_certificate_authorities? ⇒ Boolean

Returns:

  • (Boolean)


260
261
262
# File 'lib/logstash/inputs/beats.rb', line 260

# Truthy when the client authentication mode is "required" or "optional",
# as reported by the two predicates below.
# NOTE(review): presumably used by SSL validation to insist on
# `ssl_certificate_authorities` — confirm against validate_ssl_config!.
def require_certificate_authorities?
  client_authentication_required? || client_authentication_optional?
end

#run(output_queue) ⇒ Object



213
214
215
216
217
# File 'lib/logstash/inputs/beats.rb', line 213

# Attaches a MessageListener (built from the output queue and this input) to
# the server and then starts listening.
#
# @param output_queue [Object] queue handed to the MessageListener
# @return [Object] whatever the server's listen call returns
def run(output_queue)
  @server.setMessageListener(MessageListener.new(output_queue, self))
  @server.listen
end

#ssl_configured? ⇒ Boolean

Returns:

  • (Boolean)


223
224
225
# File 'lib/logstash/inputs/beats.rb', line 223

# Tells whether both an SSL certificate and an SSL key have been set.
#
# @return [Boolean] true only when neither @ssl_certificate nor @ssl_key is nil
def ssl_configured?
  !@ssl_certificate.nil? && !@ssl_key.nil?
end

#stop ⇒ Object

def run



219
220
221
# File 'lib/logstash/inputs/beats.rb', line 219

# Stops the server if one has been created; does nothing when @server is nil.
#
# @return [Object, nil] the result of the server's stop call, or nil
def stop
  @server&.stop
end

#target_codec_on_field? ⇒ Boolean

Returns:

  • (Boolean)


227
228
229
# File 'lib/logstash/inputs/beats.rb', line 227

# True when @target_codec_on_field is non-empty.
# NOTE(review): calls empty? unconditionally, so a nil value would raise
# NoMethodError — presumably the config option defaults to an empty string;
# TODO confirm against the option's declaration.
def target_codec_on_field?
  !@target_codec_on_field.empty?
end