Class: LogStash::Inputs::Beats
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Beats
- Defined in:
- lib/logstash/inputs/beats.rb
Defined Under Namespace
Classes: InsertingToQueueTakeTooLong
Constant Summary collapse
- RECONNECT_BACKOFF_SLEEP =
TODO(sissel): Add CA to authenticate clients with.
0.5
Instance Method Summary collapse
-
#handle_new_connection(connection) ⇒ Object
Handles a single accepted connection; this method is called inside a new thread.
- #need_identity_map? ⇒ Boolean
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#ssl_configured? ⇒ Boolean
Returns true when both the SSL certificate and the SSL key are set.
-
#start_buffer_broker ⇒ Object
The default Logstash SizedQueue doesn’t support timeouts.
-
#stop ⇒ Object
Closes the server and flushes the codec buffers of the active connections.
- #target_codec_on_field? ⇒ Boolean
Instance Method Details
#handle_new_connection(connection) ⇒ Object
Handles a single accepted connection. This method is called inside a new thread.
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/logstash/inputs/beats.rb', line 201
#
# Serve one accepted connection until it ends. This method is called inside a
# new thread.
#
# The connection is wrapped in a ConnectionHandler (built from the connection,
# this input and @buffered_queue), recorded in @connections_list, and accepted
# through @circuit_breaker. Known failures are logged rather than raised:
#
# * ConnectionClosed                -> debug log only
# * OpenBreaker / HalfOpenBreaker   -> warning
# * anything else                   -> error log with backtrace
#
# Whatever happened, the handler (when one was built) is flushed into
# @output_queue and the connection is removed from @connections_list.
#
# @param connection the accepted connection; it must respond to `peer`
def handle_new_connection(connection)
  # Declared up front: when something raises before the handler exists, the
  # `ensure` clause must see nil instead of failing with NoMethodError (which
  # would also skip the removal from @connections_list).
  connection_handler = nil

  @logger.debug? && @logger.debug("Beats inputs: accepting a new connection", :peer => connection.peer)

  LogStash::Util.set_thread_name("[beats-input]>connection-#{connection.peer}")

  connection_handler = LogStash::Inputs::BeatsSupport::ConnectionHandler.new(connection, self, @buffered_queue)
  @connections_list[connection] = connection_handler

  # All the errors handling is done here
  @circuit_breaker.execute { connection_handler.accept }
rescue Lumberjack::Beats::Connection::ConnectionClosed => e
  # Only show these errors if we run in debug mode,
  # this logger can get noisy if we use a proxy that checks if the host is alive.
  @logger.debug? && @logger.debug("Beats Input: Remote connection closed", :peer => connection.peer, :exception => e)
rescue LogStash::Inputs::BeatsSupport::CircuitBreaker::OpenBreaker,
       LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker => e
  @logger.warn("Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception => e.class)
rescue Exception => e
  # Deliberately broad catch-all kept from the original: if we have a malformed
  # packet we should handle that so the input doesn't crash completely.
  @logger.error("Beats input: unhandled exception", :exception => e, :backtrace => e.backtrace)
ensure
  unless connection_handler.nil?
    transformer = LogStash::Inputs::BeatsSupport::EventTransformCommon.new(self)

    connection_handler.flush do |event|
      # handle the basic event enrichment with tags
      # since at that time we lose all the context
      transformer.transform(event)

      event.tag("beats_input_flushed_by_end_of_connection")
      @output_queue << event
    end
  end

  @connections_list.delete(connection)
  @logger.debug? && @logger.debug("Beats input: clearing the connection from the known clients", :peer => connection.peer)
end
#need_identity_map? ⇒ Boolean
266 267 268 |
# File 'lib/logstash/inputs/beats.rb', line 266
#
# Tell whether the configured codec is a multiline codec.
#
# @return [Boolean] true when @codec is a LogStash::Codecs::Multiline
def need_identity_map?
  @codec.is_a?(LogStash::Codecs::Multiline)
end
#register ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/logstash/inputs/beats.rb', line 103
#
# Check the SSL settings and create the objects the input works with: the
# Lumberjack server, the buffered queue, the circuit breaker, the (possibly
# wrapped) codec and the list of active connections.
#
# @raise [LogStash::ConfigurationError] when SSL is enabled but the certificate
#   or the key is missing
def register
  if @ssl
    # With SSL enabled both the certificate and its key are mandatory.
    raise LogStash::ConfigurationError, "Certificate or Certificate Key not configured" unless ssl_configured?
  else
    # SSL is off: point out SSL material that was configured but is ignored.
    @logger.warn("Beats input: SSL Certificate will not be used") unless @ssl_certificate.nil?
    @logger.warn("Beats input: SSL Key will not be used") unless @ssl_key.nil?
  end

  @logger.info("Beats inputs: Starting input listener", :address => "#{@host}:#{@port}")

  key_passphrase = @ssl_key_passphrase ? @ssl_key_passphrase.value : nil
  @lumberjack = Lumberjack::Beats::Server.new(
    :address => @host,
    :port => @port,
    :ssl => @ssl,
    :ssl_certificate => @ssl_certificate,
    :ssl_key => @ssl_key,
    :ssl_key_passphrase => key_passphrase,
    :ssl_certificate_authorities => @ssl_certificate_authorities,
    :ssl_verify_mode => @ssl_verify_mode
  )

  # In 1.5 the main SizeQueue has no concept of timeout, so a small plugin
  # buffer is used to move events to the internal queue.
  @buffered_queue = LogStash::Inputs::BeatsSupport::SynchronousQueueWithOffer.new(@congestion_threshold)
  @circuit_breaker = LogStash::Inputs::BeatsSupport::CircuitBreaker.new(
    "Beats input",
    :exceptions => [InsertingToQueueTakeTooLong]
  )

  # The multiline codec needs one identity stream per producer, so it gets
  # wrapped. Codecs that don't need an identity map are stateless and can be
  # reused across multiple connections.
  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) if need_identity_map?

  # Keep a list of active connections so we can flush their codec on shutdown.
  # Use the threadsafe gem, since we have a strict dependency on
  # concurrent-ruby 0.9.2 in the core.
  @connections_list = ThreadSafe::Hash.new
end
#run(output_queue) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/logstash/inputs/beats.rb', line 154
#
# Main loop of the input: remember the output queue, start the buffer broker,
# then accept connections until `stop?` is true. Each accepted connection is
# served by `handle_new_connection` in its own thread.
#
# While the circuit breaker is not closed no connection is accepted; the loop
# logs a warning and sleeps RECONNECT_BACKOFF_SLEEP seconds before retrying.
#
# @param output_queue the queue events are eventually pushed to (`<<`)
def run(output_queue)
  @output_queue = output_queue

  start_buffer_broker

  until stop?
    # Wrapping the accept call into a CircuitBreaker
    if @circuit_breaker.closed?
      connection = @lumberjack.accept # call that creates a new connection

      # if the connection is nil the connection was closed upstream,
      # so we will try in another iteration to recover or stop.
      next if connection.nil?

      # Hand the connection over as a thread argument: `connection` is a single
      # local shared by every loop iteration, so a block that merely closes
      # over it could observe a later connection if the thread starts late.
      Thread.new(connection) { |accepted| handle_new_connection(accepted) }
    else
      @logger.warn("Beats input: the pipeline is blocked, temporary refusing new connection.",
                   :reconnect_backoff_sleep => RECONNECT_BACKOFF_SLEEP)
      sleep(RECONNECT_BACKOFF_SLEEP)
    end
  end
end
#ssl_configured? ⇒ Boolean
Returns true when both the SSL certificate and the SSL key are set (neither is nil).
146 147 148 |
# File 'lib/logstash/inputs/beats.rb', line 146
#
# Tell whether the SSL material is complete.
#
# @return [Boolean] true when neither @ssl_certificate nor @ssl_key is nil
def ssl_configured?
  !@ssl_certificate.nil? && !@ssl_key.nil?
end
#start_buffer_broker ⇒ Object
The default Logstash SizedQueue doesn’t support timeouts. This is problematic because clients will time out and reconnect, creating multiple threads and eventually OOMing the JVM.
We use a proxy queue that supports blocking with a timeout; this thread takes elements from that queue and pushes them into the output queue.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/logstash/inputs/beats.rb', line 248
#
# Start the broker thread that moves events from @buffered_queue into
# @output_queue until `stop?` is true.
#
# @return [Thread] the broker thread
def start_buffer_broker
  Thread.new do
    LogStash::Util.set_thread_name("[beats-input]-buffered-queue-broker")

    begin
      @output_queue << @buffered_queue.take until stop?
    rescue java.lang.InterruptedException => e
      # When shutting down without waiting for the queue to unblock we get a
      # `java.lang.InterruptedException`; in that context it is not logged.
      @logger.error("Beats input: bufferered queue exception", :exception => e) unless stop?
    rescue => e
      @logger.error("Beats input: unexpected exception", :exception => e)
    end
  end
end
#stop ⇒ Object
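Closes the Lumberjack server, then flushes the codec content of every active connection into the output queue, tagging each flushed event.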
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/logstash/inputs/beats.rb', line 177
#
# Stop the plugin: close the Lumberjack server (best effort), then flush the
# codec content of every active connection into @output_queue. Each flushed
# event goes through the transformer and is tagged before being queued.
def stop
  @logger.debug("Beats input: stopping the plugin")

  # Closing stays best-effort, but a failure is now reported at debug level
  # instead of being silently discarded by a `rescue nil` modifier.
  begin
    @lumberjack.close
  rescue => e
    @logger.debug("Beats input: error while closing the listener", :exception => e)
  end

  # we may have some stuff in the codec buffer
  transformer = LogStash::Inputs::BeatsSupport::EventTransformCommon.new(self)

  # Go through all the active connections and flush their codec content;
  # some context data could be lost in this case, but at least the events'
  # main data would be persisted.
  @connections_list.each do |_, connection_handler|
    connection_handler.flush do |event|
      # We might lose some of the event's context here
      transformer.transform(event)

      event.tag("beats_input_flushed_by_logtash_shutdown")
      @output_queue << event
    end
  end

  @logger.debug("Beats input: stopped")
end
#target_codec_on_field? ⇒ Boolean
150 151 152 |
# File 'lib/logstash/inputs/beats.rb', line 150
#
# Tell whether a target field for the codec is set.
#
# An unset (nil) @target_codec_on_field is treated like an empty one instead
# of raising NoMethodError on `nil.empty?`.
#
# @return [Boolean] true when @target_codec_on_field is neither nil nor empty
def target_codec_on_field?
  !(@target_codec_on_field.nil? || @target_codec_on_field.empty?)
end