Class: LogStash::Inputs::Beats

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/tls.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb

Overview

This input plugin enables Logstash to receive events from the www.elastic.co/products/beats[Elastic Beats] framework.

The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:

source,ruby

input {

beats {
  port => 5044
}

}

output

elasticsearch {
  hosts => "localhost:9200"
  manage_template => false
  index => "%{[@metadata][beat]-%+YYYY+YYYY.MM+YYYY.MM.dd"
  document_type => "%[@metadata][type]"
}

}


NOTE: The Beats shipper automatically sets the ‘type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.

IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events.

Defined Under Namespace

Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform, TLS

Instance Method Summary collapse

Instance Method Details

#client_authentification?Boolean

Returns:

  • (Boolean)


205
206
207
# File 'lib/logstash/inputs/beats.rb', line 205

def client_authentification?
  @ssl_certificate_authorities && @ssl_certificate_authorities.size > 0
end

#convert_protocolsObject



217
218
219
# File 'lib/logstash/inputs/beats.rb', line 217

def convert_protocols
  TLS.get_supported(@tls_min_version..@tls_max_version).map(&:name)
end

#create_serverObject

def register



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/logstash/inputs/beats.rb', line 161

def create_server
  server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
  if @ssl

    begin
    ssl_builder = org.logstash.netty.SslSimpleBuilder.new(@ssl_certificate, @ssl_key, @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value)
      .setProtocols(convert_protocols)
      .setCipherSuites(normalized_ciphers)
    rescue java.lang.IllegalArgumentException => e
      raise LogStash::ConfigurationError, e
    end

    ssl_builder.setHandshakeTimeoutMilliseconds(@ssl_handshake_timeout)

    if client_authentification?
      if @ssl_verify_mode.upcase == "FORCE_PEER"
          ssl_builder.setVerifyMode(org.logstash.netty.SslSimpleBuilder::SslClientVerifyMode::FORCE_PEER)
      end
      ssl_builder.setCertificateAuthorities(@ssl_certificate_authorities)
    end

    server.enableSSL(ssl_builder)
  end
  server
end

#normalized_ciphersObject



213
214
215
# File 'lib/logstash/inputs/beats.rb', line 213

def normalized_ciphers
  @cipher_suites.map(&:upcase)
end

#registerObject



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/logstash/inputs/beats.rb', line 125

def register
  # For Logstash 2.4 we need to make sure that the logger is correctly set for the
  # java classes before actually loading them.
  #
  # if we don't do this we will get this error:
  # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory)
  if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
    LogStash::Logger.setup_log4j(@logger)
  end

  java_import "org.logstash.beats.Server"
  java_import "org.logstash.netty.SslSimpleBuilder"
  java_import "java.io.FileInputStream"
  java_import "io.netty.handler.ssl.OpenSsl"

  if !@ssl
    @logger.warn("Beats input: SSL Certificate will not be used") unless @ssl_certificate.nil?
    @logger.warn("Beats input: SSL Key will not be used") unless @ssl_key.nil?
  elsif !ssl_configured?
    raise LogStash::ConfigurationError, "Certificate or Certificate Key not configured"
  end

  if @ssl && require_certificate_authorities? && !client_authentification?
    raise LogStash::ConfigurationError, "Using `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `certificate_authorities`"
  end

  # Logstash 6.x breaking change (introduced with 4.0.0 of this gem)
  if @codec.kind_of? LogStash::Codecs::Multiline
    raise LogStash::ConfigurationError, "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html"
  end

  @logger.info("Beats inputs: Starting input listener", :address => "#{@host}:#{@port}")

  @server = create_server
end

#require_certificate_authorities?Boolean

Returns:

  • (Boolean)


209
210
211
# File 'lib/logstash/inputs/beats.rb', line 209

def require_certificate_authorities?
  @ssl_verify_mode == "force_peer" || @ssl_verify_mode == "peer"
end

#run(output_queue) ⇒ Object



195
196
197
198
199
# File 'lib/logstash/inputs/beats.rb', line 195

def run(output_queue)
  message_listener = MessageListener.new(output_queue, self)
  @server.setMessageListener(message_listener)
  @server.listen
end

#ssl_configured?Boolean

Returns:

  • (Boolean)


187
188
189
# File 'lib/logstash/inputs/beats.rb', line 187

def ssl_configured?
  !(@ssl_certificate.nil? || @ssl_key.nil?)
end

#stopObject

def run



201
202
203
# File 'lib/logstash/inputs/beats.rb', line 201

def stop
  @server.stop unless @server.nil?
end

#target_codec_on_field?Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/logstash/inputs/beats.rb', line 191

def target_codec_on_field?
  !@target_codec_on_field.empty?
end