Class: LogStash::Outputs::ElasticSearchJava
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::ElasticSearchJava
- Includes:
- LogStash::Outputs::ElasticSearch::Common, LogStash::Outputs::ElasticSearch::CommonConfigs
- Defined in:
- lib/logstash/outputs/elasticsearch_java.rb
Overview
This output lets you store logs in Elasticsearch using the native ‘node’ and ‘transport’ protocols. It is highly recommended to use the regular ‘logstash-output-elasticsearch’ output which uses HTTP instead. This output is, in-fact, sometimes slower, and never faster than that one. Additionally, upgrading your Elasticsearch cluster may require you to simultaneously update this plugin for any protocol level changes. The HTTP client may be easier to work with due to wider familiarity with HTTP.
*VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later.
If you want to set other Elasticsearch options that are not exposed directly as configuration options, there are two methods:
-
Create an ‘elasticsearch.yml` file in the $PWD of the Logstash process
-
Pass in es.* java properties (‘java -Des.node.foo=` or `ruby -J-Des.node.foo=`)
With the default ‘protocol` setting (“node”), this plugin will join your Elasticsearch cluster as a client node, so it will show up in Elasticsearch’s cluster status.
You can learn more about Elasticsearch at <www.elastic.co/products/elasticsearch>
Operational Notes
If using the default ‘protocol` setting (“node”), your firewalls might need to permit port 9300 in both directions (from Logstash to Elasticsearch, and Elasticsearch to Logstash)
Retry Policy
By default all bulk requests to ES are synchronous. Not all events in the bulk requests always make it successfully. For example, there could be events which are not formatted correctly for the index they are targeting (type mismatch in mapping). So that we minimize loss of events, we have a specific retry policy in place. We retry all events which fail to be reached by Elasticsearch for network related issues. We retry specific events which exhibit errors under a separate policy described below. Events of this nature are ones which experience ES error codes described as retryable errors.
*Retryable Errors:*
-
429, Too Many Requests (RFC6585)
-
503, The server is currently unable to handle the request due to a temporary overloading or maintenance of the server.
Here are the rules of what is retried when:
-
Block and retry all events in bulk response that experiences transient network exceptions until a successful submission is received by Elasticsearch.
-
Retry subset of sent events which resulted in ES errors of a retryable nature which can be found in RETRYABLE_CODES
-
For events which returned retryable error codes, they will be pushed onto a separate queue for retrying events. events in this queue will be retried a maximum of 5 times by default (configurable through :max_retries). The size of this queue is capped by the value set in :retry_max_items.
-
Events from the retry queue are submitted again either when the queue reaches its max size or when the max interval time is reached, which is set in :retry_max_interval.
-
Events which are not retryable or have reached their max retry count are logged to stderr.
Constant Summary collapse
- RETRYABLE_CODES =
[409, 429, 503]
- SUCCESS_CODES =
[200, 201]
- @@plugins =
Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch_java_/ }
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
Instance Method Summary collapse
- #build_client ⇒ Object
- #client_class ⇒ Object
- #client_options ⇒ Object
- #close ⇒ Object
- #get_plugin_options ⇒ Object
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
68 69 70 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 68 def client @client end |
Instance Method Details
#build_client ⇒ Object
168 169 170 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 168 def build_client @client = client_class.new() end |
#client_class ⇒ Object
184 185 186 187 188 189 190 191 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 184 def client_class case @protocol when "transport" LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient when "node" LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient end end |
#client_options ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 139 def client_settings = {} client_settings["cluster.name"] = @cluster if @cluster client_settings["network.host"] = @network_host if @network_host client_settings["transport.tcp.port"] = @transport_tcp_port if @transport_tcp_port client_settings["client.transport.sniff"] = @sniffing if @node_name client_settings["node.name"] = @node_name else client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" end = { :protocol => @protocol, :client_settings => client_settings, :hosts => @hosts } # Update API setup = { :upsert => @upsert, :doc_as_upsert => @doc_as_upsert } .merge! if @action == 'update' end |
#close ⇒ Object
172 173 174 175 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 172 def close @stopping.make_true @buffer.stop end |
#get_plugin_options ⇒ Object
177 178 179 180 181 182 |
# File 'lib/logstash/outputs/elasticsearch_java.rb', line 177 def @@plugins.each do |plugin| name = plugin.name.split('-')[-1] client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self)) end end |