Class: LogStash::Outputs::ElasticSearchJava
- Inherits: Base
  - Object
  - Base
  - LogStash::Outputs::ElasticSearchJava
- Includes: Stud::Buffer
- Defined in: lib/logstash/outputs/elasticsearch_java.rb
Overview
This output lets you store logs in Elasticsearch using the native ‘node’ and ‘transport’ protocols. We strongly recommend the regular ‘logstash-output-elasticsearch’ output, which uses HTTP, instead. This output is never faster than the HTTP output and is sometimes slower. In addition, upgrading your Elasticsearch cluster may require you to update this plugin at the same time to pick up protocol-level changes. The HTTP client may also be easier to work with because HTTP is more widely understood.
*VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later.
If you want to set other Elasticsearch options that are not exposed directly as configuration options, there are two methods:
- Create an `elasticsearch.yml` file in the $PWD of the Logstash process.
- Pass in es.* Java properties (`java -Des.node.foo=` or `ruby -J-Des.node.foo=`).
With the default `protocol` setting (“node”), this plugin will join your Elasticsearch cluster as a client node, so it will show up in Elasticsearch’s cluster status.
You can learn more about Elasticsearch at <www.elastic.co/products/elasticsearch>
Operational Notes
If using the default `protocol` setting (“node”), your firewalls might need to permit port 9300 in both directions (from Logstash to Elasticsearch, and from Elasticsearch to Logstash).
Retry Policy
By default, all bulk requests to ES are synchronous. Not every event in a bulk request is guaranteed to succeed. For example, an event may not be formatted correctly for the index it targets (a type mismatch in the mapping). To minimize event loss, the plugin follows a specific retry policy. All events that fail to reach Elasticsearch because of network-related issues are retried. Events that reach Elasticsearch but come back with one of the retryable error codes listed below are retried under a separate policy, also described below.
*Retryable Errors:*
- 409, Conflict
- 429, Too Many Requests (RFC6585)
- 503, The server is currently unable to handle the request due to a temporary overloading or maintenance of the server.
Here are the rules for what is retried and when:
- Block and retry all events in a bulk request that hits a transient network exception, until Elasticsearch receives a successful submission.
- Retry the subset of sent events that resulted in ES errors of a retryable nature, as listed in RETRYABLE_CODES.
- Events that returned retryable error codes are pushed onto a separate retry queue. Events in this queue are retried a maximum of 5 times by default (configurable through :max_retries). The size of this queue is capped by the value set in :retry_max_items.
- Events from the retry queue are submitted again either when the queue reaches its maximum size or when the maximum interval, set in :retry_max_interval, is reached.
- Events that are not retryable or have reached their maximum retry count are logged to stderr.
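The retry-count rules above can be sketched in plain Ruby. This is a minimal illustration, not the plugin's code: `RetryTracker`, `push`, and `drain` are hypothetical names, the events are plain hashes, and incrementing the counter at push time is an assumption. The keyword arguments stand in for the :max_retries and :retry_max_items settings.

```ruby
# Hypothetical sketch of the retry bookkeeping; none of these names
# come from the plugin itself.
class RetryTracker
  attr_reader :dropped

  def initialize(max_retries: 5, max_items: 3)
    @max_retries = max_retries
    @max_items = max_items
    @queue = []
    @dropped = []
  end

  # Queue an event for another attempt, or drop it once it has used
  # up its retries. Returns true when the queue has reached its cap
  # and should be flushed.
  def push(event)
    if event[:retry_count] >= @max_retries
      @dropped << event
    else
      event[:retry_count] += 1
      @queue << event
    end
    @queue.size >= @max_items
  end

  # Hand back everything queued so far and empty the queue.
  def drain
    batch = @queue
    @queue = []
    batch
  end
end

tracker = RetryTracker.new(max_retries: 2, max_items: 2)
event = { :id => 1, :retry_count => 0 }
needs_flush = tracker.push(event)
first_batch = tracker.drain
```

A real implementation would also flush on a timer, as the :retry_max_interval rule describes; the sketch covers only the counting and the size cap.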
Constant Summary
- RETRYABLE_CODES = [409, 429, 503]
- SUCCESS_CODES = [200, 201]
Instance Attribute Summary
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
Instance Method Summary
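- #client_options ⇒ Object
  Builds the options hash for the client: protocol, client settings, hosts, port and, for the update action, the upsert options.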
- #close ⇒ Object
- #flush(actions, teardown = false) ⇒ Object
- #get_template ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
- #submit(actions) ⇒ Object
  The submit method can be called from both the Stud::Buffer flush thread and from our own retry thread.
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 68
    def client
      @client
    end
Instance Method Details
#client_options ⇒ Object
Builds the options hash for the client: protocol, client settings, hosts, port and, for the update action, the upsert options.

    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 361
    def client_options
      client_settings = {}
      client_settings["cluster.name"] = @cluster if @cluster
      client_settings["network.host"] = @network_host if @network_host
      client_settings["transport.tcp.port"] = @transport_tcp_port if @transport_tcp_port
      client_settings["client.transport.sniff"] = @sniffing

      if @node_name
        client_settings["node.name"] = @node_name
      else
        client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
      end

      @@plugins.each do |plugin|
        name = plugin.name.split('_')[-1]
        client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self))
      end

      common_options = {
        :protocol => @protocol,
        :client_settings => client_settings,
        :hosts => @hosts,
        :port => @port
      }

      # Update API setup
      update_options = {
        :upsert => @upsert,
        :doc_as_upsert => @doc_as_upsert
      }
      common_options.merge! update_options if @action == 'update'

      common_options
    end
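The last step of the method above, where the update-specific keys are added only when the action is 'update', can be tried in isolation. The following is a rough stand-alone sketch: `build_options` and its keyword arguments are hypothetical and replace the plugin's instance variables, and the host and port values are placeholders.

```ruby
# Hypothetical stand-alone sketch; build_options is not part of the plugin.
def build_options(protocol:, hosts:, port:, action:, upsert: "", doc_as_upsert: false)
  options = { :protocol => protocol, :hosts => hosts, :port => port }
  # The update-specific keys are merged in only for the 'update' action.
  options.merge!(:upsert => upsert, :doc_as_upsert => doc_as_upsert) if action == "update"
  options
end

index_options = build_options(protocol: "transport", hosts: ["localhost"],
                              port: 9300, action: "index")
update_options = build_options(protocol: "transport", hosts: ["localhost"],
                               port: 9300, action: "update", doc_as_upsert: true)
```

With these inputs, only `update_options` carries the `:upsert` and `:doc_as_upsert` keys.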
#close ⇒ Object
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 484
    def close
      @retry_teardown_requested.make_true
      # First, make sure retry_timer_thread is stopped
      # to ensure we do not signal a retry based on
      # the retry interval.
      Thread.kill(@retry_timer_thread)
      @retry_timer_thread.join
      # Signal flushing in the case that #retry_flush is in
      # the process of waiting for a signal.
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
      # Now, #retry_flush is ensured to not be in a state of
      # waiting and can be safely joined into the main thread
      # for further final execution of an in-process remaining call.
      @retry_thread.join

      # execute any final actions along with a proceeding retry for any
      # final actions that did not succeed.
      buffer_flush(:final => true)
      retry_flush
    end
#flush(actions, teardown = false) ⇒ Object
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 474
    def flush(actions, teardown = false)
      begin
        submit(actions)
      rescue => e
        @logger.error "Got error to send bulk of actions: #{e.message}"
        raise e
      end
    end
#get_template ⇒ Object
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 398
    def get_template
      if @template.nil?
        @template = ::File.expand_path('elasticsearch_java/elasticsearch-template.json', ::File.dirname(__FILE__))
        if !File.exist?(@template)
          raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
        end
      end
      template_json = IO.read(@template).gsub(/\n/,'')
      template = LogStash::Json.load(template_json)
      @logger.info("Using mapping template", :template => template)
      return template
    end
#receive(event) ⇒ Object
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 412
    def receive(event)
      # block until we have not maxed out our
      # retry queue. This is applying back-pressure
      # to slow down the receive-rate
      @retry_flush_mutex.synchronize {
        @retry_queue_not_full.wait(@retry_flush_mutex) while @retry_queue.size > @retry_max_items
      }

      event['@metadata']['retry_count'] = 0

      # Set the 'type' value for the index.
      type = if @document_type
               event.sprintf(@document_type)
             elsif @index_type # deprecated
               event.sprintf(@index_type)
             else
               event["type"] || "logs"
             end

      params = {
        :_id => @document_id ? event.sprintf(@document_id) : nil,
        :_index => event.sprintf(@index),
        :_type => type,
        :_routing => @routing ? event.sprintf(@routing) : nil
      }

      params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""

      buffer_receive([event.sprintf(@action), params, event])
    end
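The back-pressure step at the top of the method above relies on a Mutex paired with a ConditionVariable. The sketch below shows that pattern on its own, using only the Ruby standard library. `BoundedBuffer` and its methods are hypothetical names, and unlike the plugin (which waits while the queue size is greater than :retry_max_items) this sketch blocks as soon as the buffer holds `max_items` entries.

```ruby
# Hypothetical sketch of back-pressure with a Mutex and a
# ConditionVariable; not taken from the plugin.
class BoundedBuffer
  def initialize(max_items)
    @max_items = max_items
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  # Blocks the calling thread while the buffer is at capacity.
  def push(item)
    @mutex.synchronize do
      @not_full.wait(@mutex) while @items.size >= @max_items
      @items << item
    end
  end

  # Removes everything and wakes any producers blocked in #push.
  def drain
    @mutex.synchronize do
      batch = @items
      @items = []
      @not_full.broadcast
      batch
    end
  end

  def size
    @mutex.synchronize { @items.size }
  end
end

buffer = BoundedBuffer.new(2)
buffer.push(:a)
buffer.push(:b)
producer = Thread.new { buffer.push(:c) } # blocks until the buffer is drained
```

The wait sits inside a `while` loop so that a woken producer rechecks the condition before it appends, which guards against spurious wakeups and against another producer filling the buffer first.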
#register ⇒ Object
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 296
    def register
      @submit_mutex = Mutex.new
      # retry-specific variables
      @retry_flush_mutex = Mutex.new
      @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
      # needs flushing when interval
      @retry_queue_needs_flushing = ConditionVariable.new
      @retry_queue_not_full = ConditionVariable.new
      @retry_queue = Queue.new

      if @protocol == 'node' && !@network_host
        raise LogStash::ConfigurationError, "network_host MUST be set if the 'node' protocol is in use! If this is set incorrectly Logstash will hang attempting to connect!"
      end

      if (@hosts.nil? || @hosts.empty?) && @protocol != "node" # node can use zen discovery
        @logger.info("No 'hosts' set in elasticsearch output. Defaulting to localhost")
        @hosts = ["localhost"]
      end

      client_class = case @protocol
                     when "transport"
                       LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient
                     when "node"
                       LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient
                     end

      @client = client_class.new(client_options)

      if @manage_template
        begin
          @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
          client.template_install(@template_name, get_template, @template_overwrite)
        rescue => e
          @logger.error("Failed to install template",
                        :message => e.message,
                        :error_class => e.class.name)
        end
      end

      @logger.info("New Elasticsearch output", :cluster => @cluster,
                   :hosts => @hosts, :port => @port, :protocol => @protocol)

      buffer_initialize(
        :max_items => @flush_size,
        :max_interval => @idle_flush_time,
        :logger => @logger
      )

      @retry_timer_thread = Thread.new do
        loop do
          sleep(@retry_max_interval)
          @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
        end
      end

      @retry_thread = Thread.new do
        while @retry_teardown_requested.false?
          @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
          retry_flush
        end
      end
    end
#submit(actions) ⇒ Object
The submit method can be called from both the Stud::Buffer flush thread and from our own retry thread.
    # File 'lib/logstash/outputs/elasticsearch_java.rb', line 448
    def submit(actions)
      es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
      @submit_mutex.lock
      begin
        bulk_response = @client.bulk(es_actions)
      ensure
        @submit_mutex.unlock
      end
      if bulk_response["errors"]
        actions_with_responses = actions.zip(bulk_response['statuses'])
        actions_to_retry = []
        actions_with_responses.each do |action, resp_code|
          if RETRYABLE_CODES.include?(resp_code)
            @logger.warn "retrying failed action with response code: #{resp_code}"
            actions_to_retry << action
          elsif not SUCCESS_CODES.include?(resp_code)
            @logger.warn "failed action with response of #{resp_code}, dropping action: #{action}"
          end
        end
        retry_push(actions_to_retry) unless actions_to_retry.empty?
      end
    end
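The way the method above pairs each action with its status code and sorts the failures can be reproduced without Logstash or Elasticsearch. In this minimal sketch, `partition_failures` is a hypothetical helper, the two constants are copied from the Constant Summary, and the actions and status codes are made-up sample data.

```ruby
RETRYABLE_CODES = [409, 429, 503]
SUCCESS_CODES = [200, 201]

# Hypothetical helper: pair each action with its status code and
# split the failures into a "retry" group and a "drop" group.
def partition_failures(actions, statuses)
  to_retry = []
  to_drop = []
  actions.zip(statuses).each do |action, code|
    next if SUCCESS_CODES.include?(code)
    if RETRYABLE_CODES.include?(code)
      to_retry << action
    else
      to_drop << action
    end
  end
  [to_retry, to_drop]
end

# Made-up sample data: three index actions and one status code each.
actions = [
  ["index", { :_id => "1" }],
  ["index", { :_id => "2" }],
  ["index", { :_id => "3" }]
]
to_retry, to_drop = partition_failures(actions, [201, 429, 400])
```

Here the second action (429) lands in `to_retry` and the third (400) in `to_drop`. The plugin only logs dropped actions instead of collecting them; the second array is there to make the outcome easy to inspect.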