Class: LogStash::Inputs::Beats

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/beats.rb,
lib/logstash/inputs/beats/tls.rb,
lib/logstash/inputs/beats/message_listener.rb,
lib/logstash/inputs/beats/raw_event_transform.rb,
lib/logstash/inputs/beats/event_transform_common.rb,
lib/logstash/inputs/beats/codec_callback_listener.rb,
lib/logstash/inputs/beats/decoded_event_transform.rb

Overview

This input plugin enables Logstash to receive events from the https://www.elastic.co/products/beats[Elastic Beats] framework.

The following example shows how to configure Logstash to listen on port 5044 for incoming Beats connections and to index into Elasticsearch:

source,ruby

input {

beats {
  port => 5044
}

}

output {

elasticsearch {
  hosts => "localhost:9200"
  manage_template => false
  index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  document_type => "%{[@metadata][type]}"
}

}


NOTE: The Beats shipper automatically sets the `type` field on the event. You cannot override this setting in the Logstash config. If you specify a setting for the <<plugins-inputs-beats-type,`type`>> config option in Logstash, it is ignored.

IMPORTANT: If you are shipping events that span multiple lines, you need to use the configuration options available in Filebeat to handle multiline events before sending the event data to Logstash. You cannot use the <<plugins-codecs-multiline>> codec to handle multiline events.

Defined Under Namespace

Classes: CodecCallbackListener, DecodedEventTransform, EventTransformCommon, MessageListener, RawEventTransform, TLS

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#field_hostipObject (readonly)

Returns the value of attribute field_hostip.



128
129
130
# File 'lib/logstash/inputs/beats.rb', line 128

# Read-only accessor for the event field that carries the remote host IP.
#
# The value is assigned in #register via `ecs_select`:
# "[@metadata][ip_address]" for the `disabled` key and
# "[@metadata][input][beats][host][ip]" for the `v1` key.
# No other assignment is visible here, so it is nil until #register has run.
#
# @return [Object, nil] whatever `ecs_select[...]` returned in #register
#   (presumably a String field reference -- confirm against ecs_select)
def field_hostip
  @field_hostip
end

#field_hostnameObject (readonly)

Returns the value of attribute field_hostname.



128
129
130
# File 'lib/logstash/inputs/beats.rb', line 128

# Read-only accessor for the event field that carries the remote host name.
#
# The value is assigned in #register via `ecs_select`:
# "host" for the `disabled` key and
# "[@metadata][input][beats][host][name]" for the `v1` key.
# No other assignment is visible here, so it is nil until #register has run.
#
# @return [Object, nil] whatever `ecs_select[...]` returned in #register
#   (presumably a String field reference -- confirm against ecs_select)
def field_hostname
  @field_hostname
end

Instance Method Details

#client_authentication_metadata?Boolean

Returns:

  • (Boolean)


216
217
218
# File 'lib/logstash/inputs/beats.rb', line 216

# Tells whether peer (client certificate) metadata should be collected.
#
# All three conditions must hold, checked in this order and stopping at the
# first falsy one: @ssl_peer_metadata is set, #ssl_configured? is truthy and
# #client_authentification? is truthy.
#
# @return [Object] the first falsy operand (nil or false), otherwise the
#   result of #client_authentification?
def client_authentication_metadata?
  peer_metadata = @ssl_peer_metadata
  return peer_metadata unless peer_metadata

  configured = ssl_configured?
  return configured unless configured

  client_authentification?
end

#client_authentication_required?Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/logstash/inputs/beats.rb', line 220

# Tells whether clients are forced to present a certificate, i.e. whether
# @ssl_verify_mode is exactly "force_peer".
#
# @return [Boolean]
def client_authentication_required?
  @ssl_verify_mode.eql?("force_peer")
end

#client_authentification?Boolean

Returns:

  • (Boolean)


212
213
214
# File 'lib/logstash/inputs/beats.rb', line 212

# Tells whether at least one certificate authority has been configured.
#
# @return [Boolean, nil] nil/false when @ssl_certificate_authorities itself
#   is nil/false, otherwise whether its size is greater than zero
def client_authentification?
  authorities = @ssl_certificate_authorities
  return authorities unless authorities

  authorities.size > 0
end

#create_serverObject

def register



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/logstash/inputs/beats.rb', line 177

# Builds the Beats server for the configured host/port, client inactivity
# timeout and executor thread count.
#
# When @ssl is truthy an SSL handler provider is attached as well. If
# certificate authorities are configured (#client_authentification?), the
# verify mode is set for "force_peer" / "peer" (any other mode leaves the
# builder's verify mode untouched) and the authorities are handed to the
# context builder.
#
# @return [Object] the org.logstash.beats.Server instance (not yet listening)
def create_server
  beats_server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
  return beats_server unless @ssl

  context_builder = new_ssl_context_builder
  if client_authentification?
    case @ssl_verify_mode
    when "force_peer"
      context_builder.setVerifyMode(org.logstash.netty.SslContextBuilder::SslClientVerifyMode::FORCE_PEER)
    when "peer"
      context_builder.setVerifyMode(org.logstash.netty.SslContextBuilder::SslClientVerifyMode::VERIFY_PEER)
    end
    context_builder.setCertificateAuthorities(@ssl_certificate_authorities)
  end

  beats_server.setSslHandlerProvider(new_ssl_handshake_provider(context_builder))
  beats_server
end

#registerObject



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/logstash/inputs/beats.rb', line 130

# Validates the plugin configuration, resolves the ECS-dependent field names
# and creates the server (without starting to listen).
#
# Steps, in order:
# 1. hand the plugin logger to log4j when LogStash::Logger offers the hook;
# 2. with @ssl enabled, report missing key/certificate and inconsistent
#    verify-mode / authorities / peer-metadata combinations through
#    `configuration_error`; with @ssl disabled, warn about SSL settings that
#    will be ignored;
# 3. reject the multiline codec;
# 4. pick the field names through `ecs_select`;
# 5. log the listen address and store the result of #create_server in @server.
#
# NOTE(review): `configuration_error` is defined outside this view; it
# presumably raises -- confirm.
#
# @return [Object] the server returned by #create_server
def register
  # For Logstash 2.4 the logger has to be wired into the java classes before
  # they are actually loaded, otherwise we get:
  # log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory)
  log4j_hook_available = defined?(LogStash::Logger) && LogStash::Logger.respond_to?(:setup_log4j)
  LogStash::Logger.setup_log4j(@logger) if log4j_hook_available

  if @ssl
    configuration_error "ssl_key => is a required setting when ssl => true is configured" if @ssl_key.nil? || @ssl_key.empty?
    configuration_error "ssl_certificate => is a required setting when ssl => true is configured" if @ssl_certificate.nil? || @ssl_certificate.empty?

    # A verifying mode is useless without authorities to verify against.
    if require_certificate_authorities? && !client_authentification?
      configuration_error "ssl_certificate_authorities => is a required setting when ssl_verify_mode => '#{@ssl_verify_mode}' is configured"
    end

    # Peer metadata can only be collected when the peer is actually verified.
    if client_authentication_metadata? && !require_certificate_authorities?
      configuration_error "Configuring ssl_peer_metadata => true requires ssl_verify_mode => to be configured with 'peer' or 'force_peer'"
    end
  else
    if @ssl_certificate
      @logger.warn("configured ssl_certificate => #{@ssl_certificate.inspect} will not be used")
    end
    if @ssl_key
      @logger.warn("configured ssl_key => #{@ssl_key.inspect} will not be used")
    end
  end

  # Logstash 6.x breaking change (introduced with 4.0.0 of this gem)
  if @codec.is_a?(LogStash::Codecs::Multiline)
    configuration_error "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html"
  end

  # ECS name mapping: legacy field names when ECS is disabled, namespaced
  # [@metadata][input][beats] fields for v1.
  @field_hostname = ecs_select[disabled: "host", v1: "[@metadata][input][beats][host][name]"]
  @field_hostip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][beats][host][ip]"]
  @field_tls_protocol_version = ecs_select[disabled: "[@metadata][tls_peer][protocol]", v1: "[@metadata][input][beats][tls][version_protocol]"]
  @field_tls_peer_subject = ecs_select[disabled: "[@metadata][tls_peer][subject]", v1: "[@metadata][input][beats][tls][client][subject]"]
  @field_tls_cipher = ecs_select[disabled: "[@metadata][tls_peer][cipher_suite]", v1: "[@metadata][input][beats][tls][cipher]"]

  @logger.info("Starting input listener", :address => "#{@host}:#{@port}")

  @server = create_server
end

#require_certificate_authorities?Boolean

Returns:

  • (Boolean)


224
225
226
# File 'lib/logstash/inputs/beats.rb', line 224

# Tells whether the configured verify mode needs certificate authorities,
# i.e. whether @ssl_verify_mode is "force_peer" or "peer".
#
# @return [Boolean]
def require_certificate_authorities?
  %w[force_peer peer].include?(@ssl_verify_mode)
end

#run(output_queue) ⇒ Object



194
195
196
197
198
# File 'lib/logstash/inputs/beats.rb', line 194

# Wires a MessageListener (built from the queue and this plugin instance)
# into the server created by #register, then starts listening.
#
# @param output_queue [Object] queue handed to the MessageListener
# @return [Object] whatever @server.listen returns
def run(output_queue)
  @server.setMessageListener(MessageListener.new(output_queue, self))
  @server.listen
end

#ssl_configured?Boolean

Returns:

  • (Boolean)


204
205
206
# File 'lib/logstash/inputs/beats.rb', line 204

# Tells whether both an SSL certificate and an SSL key have been set
# (neither @ssl_certificate nor @ssl_key is nil).
#
# @return [Boolean]
def ssl_configured?
  [@ssl_certificate, @ssl_key].none?(&:nil?)
end

#stopObject

def run



200
201
202
# File 'lib/logstash/inputs/beats.rb', line 200

# Stops the server if one has been created; does nothing when @server is nil
# (e.g. #register never ran).
#
# @return [Object, nil] the result of @server.stop, or nil without a server
def stop
  return if @server.nil?

  @server.stop
end

#target_codec_on_field?Boolean

Returns:

  • (Boolean)


208
209
210
# File 'lib/logstash/inputs/beats.rb', line 208

# Tells whether a target field for the codec has been set, i.e. whether
# @target_codec_on_field is non-empty.
#
# NOTE(review): a nil @target_codec_on_field raises NoMethodError here;
# presumably the config always supplies a default -- confirm.
#
# @return [Boolean]
def target_codec_on_field?
  return false if @target_codec_on_field.empty?

  true
end