Class: LogStash::Inputs::Beats
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Beats
- Defined in:
- lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/tls.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb
Overview
This input plugin enables Logstash to receive events from the https://www.elastic.co/products/beats[Elastic Beats] framework.
The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:
- source,ruby
input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
NOTE: The Beats shipper automatically sets the `type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.
IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events. Doing so may result in the mixing of streams and corrupted event data.
Defined Under Namespace
Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform, TLS
Instance Method Summary collapse
- #client_authentification? ⇒ Boolean
- #convert_protocols ⇒ Object
-
#create_server ⇒ Object
def register.
- #need_identity_map? ⇒ Boolean
- #normalized_ciphers ⇒ Object
- #register ⇒ Object
- #require_certificate_authorities? ⇒ Boolean
- #run(output_queue) ⇒ Object
- #ssl_configured? ⇒ Boolean
-
#stop ⇒ Object
def run.
- #target_codec_on_field? ⇒ Boolean
Instance Method Details
#client_authentification? ⇒ Boolean
217 218 219 |
# File 'lib/logstash/inputs/beats.rb', line 217 def client_authentification? @ssl_certificate_authorities && @ssl_certificate_authorities.size > 0 end |
#convert_protocols ⇒ Object
229 230 231 |
# File 'lib/logstash/inputs/beats.rb', line 229 def convert_protocols TLS.get_supported(@tls_min_version..@tls_max_version).map(&:name) end |
#create_server ⇒ Object
def register
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/logstash/inputs/beats.rb', line 168 def create_server server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout) if @ssl begin ssl_builder = org.logstash.netty.SslSimpleBuilder.new(@ssl_certificate, @ssl_key, @ssl_key_passphrase.nil? ? nil : @ssl_key_passphrase.value) .setProtocols(convert_protocols) .setCipherSuites(normalized_ciphers) rescue java.lang.IllegalArgumentException => e raise LogStash::ConfigurationError, e end ssl_builder.setHandshakeTimeoutMilliseconds(@ssl_handshake_timeout) if client_authentification? if @ssl_verify_mode.upcase == "FORCE_PEER" ssl_builder.setVerifyMode(org.logstash.netty.SslSimpleBuilder::SslClientVerifyMode::FORCE_PEER) end ssl_builder.setCertificateAuthorities(@ssl_certificate_authorities) end server.enableSSL(ssl_builder) end server end |
#need_identity_map? ⇒ Boolean
213 214 215 |
# File 'lib/logstash/inputs/beats.rb', line 213 def need_identity_map? @codec.kind_of?(LogStash::Codecs::Multiline) end |
#normalized_ciphers ⇒ Object
225 226 227 |
# File 'lib/logstash/inputs/beats.rb', line 225 def normalized_ciphers @cipher_suites.map(&:upcase) end |
#register ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/logstash/inputs/beats.rb', line 124 def register # For Logstash 2.4 we need to make sure that the logger is correctly set for the # java classes before actually loading them. # # if we don't do this we will get this error: # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory) if defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j) LogStash::Logger.setup_log4j(@logger) end java_import "org.logstash.beats.Server" java_import "org.logstash.netty.SslSimpleBuilder" java_import "java.io.FileInputStream" java_import "io.netty.handler.ssl.OpenSsl" if !@ssl @logger.warn("Beats input: SSL Certificate will not be used") unless @ssl_certificate.nil? @logger.warn("Beats input: SSL Key will not be used") unless @ssl_key.nil? elsif !ssl_configured? raise LogStash::ConfigurationError, "Certificate or Certificate Key not configured" end if @ssl && require_certificate_authorities? && !client_authentification? raise LogStash::ConfigurationError, "Using `verify_mode` set to PEER or FORCE_PEER, requires the configuration of `certificate_authorities`" end if @codec.kind_of? LogStash::Codecs::Multiline @logger.warn("WARNING! - Multiline codec with beats input has been deprecated. Support for this configuration will be removed in a future version. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html") end @logger.info("Beats inputs: Starting input listener", :address => "#{@host}:#{@port}") # wrap the configured codec to support identity stream # from the producers if running with the multiline codec. # # If they dont need an identity map, codec are stateless and can be reused # accross multiples connections. if need_identity_map? @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) end @server = create_server end |
#require_certificate_authorities? ⇒ Boolean
221 222 223 |
# File 'lib/logstash/inputs/beats.rb', line 221 def require_certificate_authorities? @ssl_verify_mode == "force_peer" || @ssl_verify_mode == "peer" end |
#run(output_queue) ⇒ Object
203 204 205 206 207 |
# File 'lib/logstash/inputs/beats.rb', line 203 def run(output_queue) message_listener = MessageListener.new(output_queue, self) @server.setMessageListener(message_listener) @server.listen end |
#ssl_configured? ⇒ Boolean
195 196 197 |
# File 'lib/logstash/inputs/beats.rb', line 195 def ssl_configured? !(@ssl_certificate.nil? || @ssl_key.nil?) end |
#stop ⇒ Object
def run
209 210 211 |
# File 'lib/logstash/inputs/beats.rb', line 209 def stop @server.stop unless @server.nil? end |
#target_codec_on_field? ⇒ Boolean
199 200 201 |
# File 'lib/logstash/inputs/beats.rb', line 199 def target_codec_on_field? !@target_codec_on_field.empty? end |