Class: LogStash::Outputs::Kafka

- Inherits: Base
  - Object
  - Base
  - LogStash::Outputs::Kafka
- Defined in: lib/logstash/outputs/kafka.rb
Overview
Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on the broker.
Here’s a compatibility matrix that shows the Kafka client versions that are compatible with each combination of Logstash and the Kafka output plugin:
[options="header"]
|==========================================================
|Kafka Client Version |Logstash Version |Plugin Version |Why?
|0.8      |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
|0.9      |2.0.0 - 2.3.x |3.x.x  |Works with the old Ruby Event API (`event['price'] = 10`)
|0.9      |2.4.x - 5.x.x |4.x.x  |Works with the new getter/setter APIs (`event.set('[price]', 10)`)
|0.10.0.x |2.4.x - 5.x.x |5.x.x  |Not compatible with the <= 0.9 broker
|0.10.1.x |2.4.x - 5.x.x |6.x.x  |
|==========================================================
NOTE: We recommend that you use matching Kafka client and broker versions. During upgrades, you should upgrade brokers before clients because brokers target backwards compatibility. For example, the 0.9 broker is compatible with both the 0.8 consumer and 0.9 consumer APIs, but not the other way around.
This output supports connecting to Kafka over:
- SSL (requires plugin version 3.0.0 or later)
- Kerberos SASL (requires plugin version 5.1.0 or later)
By default security is disabled but can be turned on as needed.
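For example, a minimal sketch of an SSL-enabled configuration, assuming a plugin version that supports the `security_protocol` and `ssl_truststore_*` options (verify the exact option names against your plugin version's documentation; the paths and password below are placeholders):

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    # Encrypt traffic to the brokers with TLS/SSL.
    security_protocol => "SSL"
    # Truststore containing the CA certificate that signed the broker certificates.
    ssl_truststore_location => "/etc/logstash/kafka.client.truststore.jks"
    ssl_truststore_password => "changeit"
  }
}
----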
The only required configuration is the topic_id. The default codec is plain, so events will be persisted on the broker in plain format. Logstash will encode your events not only with the message but also with a timestamp and hostname. If you do not want anything but your message passing through, you should make the output configuration something like:
[source,ruby]
----
output {
  kafka {
    codec => plain {
      format => "%{message}"
    }
    topic_id => "mytopic"
  }
}
----
For more information see kafka.apache.org/documentation.html#theproducer
Kafka producer configuration: kafka.apache.org/documentation.html#newproducerconfigs
Instance Method Summary
- #close ⇒ Object
- #multi_receive(events) ⇒ Object
- #prepare(record) ⇒ Object
- #register ⇒ Object
- #retrying_send(batch) ⇒ Object
Instance Method Details
#close ⇒ Object
[source,ruby]
----
# File 'lib/logstash/outputs/kafka.rb', line 299

def close
  @producer.close
end
----
#multi_receive(events) ⇒ Object
[source,ruby]
----
# File 'lib/logstash/outputs/kafka.rb', line 218

def multi_receive(events)
  t = Thread.current
  if !@thread_batch_map.include?(t)
    @thread_batch_map[t] = java.util.ArrayList.new(events.size)
  end

  events.each do |event|
    break if event == LogStash::SHUTDOWN
    @codec.encode(event)
  end

  batch = @thread_batch_map[t]
  if batch.any?
    retrying_send(batch)
    batch.clear
  end
end
----
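Because this output is threadsafe, each worker thread accumulates events into its own batch via @thread_batch_map, so no locking is needed on the hot path. A simplified standalone sketch of that per-thread batching pattern (plain Ruby with the concurrent-ruby gem; the string events and the flush step are stand-ins for the plugin's encoded records and retrying_send):

[source,ruby]
----
require 'concurrent'

# One batch per thread, keyed by the Thread object itself.
thread_batch_map = Concurrent::Hash.new

threads = 4.times.map do |n|
  Thread.new do
    t = Thread.current
    thread_batch_map[t] ||= []

    # Each thread appends only to its own batch, so no lock is required.
    10.times { |i| thread_batch_map[t] << "event-#{n}-#{i}" }

    batch = thread_batch_map[t]
    unless batch.empty?
      # In the plugin, this is where retrying_send(batch) would run.
      puts "thread #{n}: flushing #{batch.size} events"
      batch.clear
    end
  end
end

threads.each(&:join)
----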
#prepare(record) ⇒ Object
[source,ruby]
----
# File 'lib/logstash/outputs/kafka.rb', line 213

def prepare(record)
  # This output is threadsafe, so we need to keep a batch per thread.
  @thread_batch_map[Thread.current].add(record)
end
----
#register ⇒ Object
[source,ruby]
----
# File 'lib/logstash/outputs/kafka.rb', line 183

def register
  @thread_batch_map = Concurrent::Hash.new

  if !@retries.nil?
    if @retries < 0
      raise ConfigurationError, "A negative retry count (#{@retries}) is not valid. Must be a value >= 0"
    end

    @logger.warn("Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do not want to lose data if Kafka is down, then you must remove the retry setting.", :retries => @retries)
  end

  @producer = create_producer
  @codec.on_event do |event, data|
    begin
      if @message_key.nil?
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), data)
      else
        record = org.apache.kafka.clients.producer.ProducerRecord.new(event.sprintf(@topic_id), event.sprintf(@message_key), data)
      end
      prepare(record)
    rescue LogStash::ShutdownSignal
      @logger.debug('Kafka producer got shutdown signal')
    rescue => e
      @logger.warn('kafka producer threw exception, restarting', :exception => e)
    end
  end
end
----
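When message_key is set, register builds the three-argument ProducerRecord (topic, key, value), and Kafka uses the key to choose a partition. A short configuration sketch (the user_id field is illustrative):

[source,ruby]
----
output {
  kafka {
    topic_id => "mytopic"
    # Events with the same key are routed to the same partition.
    message_key => "%{user_id}"
  }
}
----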
#retrying_send(batch) ⇒ Object
[source,ruby]
----
# File 'lib/logstash/outputs/kafka.rb', line 236

def retrying_send(batch)
  remaining = @retries

  while batch.any?
    if !remaining.nil?
      if remaining < 0
        # TODO(sissel): Offer to DLQ? Then again, if it's a transient fault,
        # DLQing would make things worse (you dlq data that would be successful
        # after the fault is repaired)
        logger.info("Exhausted user-configured retry count when sending to Kafka. Dropping these events.",
                    :max_retries => @retries, :drop_count => batch.count)
        break
      end

      remaining -= 1
    end

    failures = []

    futures = batch.collect do |record|
      begin
        # send() can throw an exception even before the future is created.
        @producer.send(record)
      rescue org.apache.kafka.common.errors.TimeoutException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.InterruptException => e
        failures << record
        nil
      rescue org.apache.kafka.common.errors.SerializationException => e
        # TODO(sissel): Retrying will fail because the data itself has a problem serializing.
        # TODO(sissel): Let's add DLQ here.
        failures << record
        nil
      end
    end.compact

    futures.each_with_index do |future, i|
      begin
        result = future.get()
      rescue => e
        # TODO(sissel): Add metric to count failures, possibly by exception type.
        logger.debug? && logger.debug("KafkaProducer.send() failed: #{e}", :exception => e)
        failures << batch[i]
      end
    end

    # No failures? Cool. Let's move on.
    break if failures.empty?

    # Otherwise, retry with any failed transmissions
    if remaining != nil && remaining < 0
      logger.info("Sending batch to Kafka failed.",
                  :batch_size => batch.size, :failures => failures.size)
    else
      delay = @retry_backoff_ms / 1000.0
      logger.info("Sending batch to Kafka failed. Will retry after a delay.",
                  :batch_size => batch.size,
                  :failures => failures.size,
                  :sleep => delay)
      batch = failures
      sleep(delay)
    end
  end
end
----
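Each pass of the loop above resends only the records that failed and sleeps for retry_backoff_ms between passes; a nil @retries means retry forever. A simplified standalone sketch of that bounded-retry pattern (plain Ruby; flaky_send is a hypothetical stand-in for @producer.send(record).get):

[source,ruby]
----
# Stand-in for the producer call: succeeds about half the time.
def flaky_send(record)
  ok = rand < 0.5
  puts "#{ok ? 'sent' : 'failed'}: #{record}"
  ok
end

# Bounded retry with backoff: each pass keeps only the failures.
def send_with_retries(batch, retries: 3, backoff_ms: 100)
  remaining = retries
  until batch.empty?
    if remaining < 0
      puts "Retries exhausted; dropping #{batch.size} events"
      break
    end
    remaining -= 1

    batch = batch.reject { |record| flaky_send(record) }
    sleep(backoff_ms / 1000.0) unless batch.empty?
  end
end

send_with_retries(%w[a b c d])
----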