Class: Fluent::Plugin::ElasticsearchStatsInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::ElasticsearchStatsInput
- Defined in:
- lib/fluent/plugin/in_elasticsearch_stats.rb
Constant Summary collapse
- NAME =
'elasticsearch_stats'- DEFAULT_TAG =
NAME- DEFAULT_URLS =
['http://localhost:9200'].freeze
- DEFAULT_TIMEOUT =
10- DEFAULT_USER_AGENT =
NAME- DEFAULT_INTERVAL =
300- DEFAULT_TIMESTAMP_FORMAT =
:iso- DEFAULT_CLUSTER_HEALTH =
true- DEFAULT_CLUSTER_HEALTH_LEVEL =
:cluster- DEFAULT_CLUSTER_HEALTH_LOCAL =
false- DEFAULT_CLUSTER_STATS =
true- DEFAULT_NODES_STATS =
true- DEFAULT_NODES_STATS_LEVEL =
:node- DEFAULT_NODES_STATS_METRICS =
nil- DEFAULT_INDICES_STATS =
true- DEFAULT_INDICES_STATS_LEVEL =
:indices- DEFAULT_INDICES =
[:_all]
- DEFAULT_INDICES_STATS_METRICS =
nil- DEFAULT_SHARDS_STATS =
true- DEFAULT_DANGLING =
false- DEFAULT_INDEX_BASE_PATTERN =
‘/(.*)/’
nil- DEFAULT_INDEX_BASE_REPLACEMENT =
'\1'- DEFAULT_EVENT_NAME_SEPARATOR =
'/'- DEFAULT_SKIP_SYSTEM_INDICES =
true- DEFAULT_AGGREGATED_INDEX_METRICS_ONLY =
false- DEFAULT_AGGREGATED_INDEX_METRICS =
['sum']
- ALLOWED_CLUSTER_HEALTH_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_CLUSTER_HEALTH_LEVELS
- ALLOWED_NODES_STATS_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_NODES_STATS_LEVELS
- ALLOWED_INDICES_STATS_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_INDICES_LEVELS
- ALLOWED_AGGREGATED_INDEX_METRICS =
Fluent::Plugin::ElasticsearchStats::Metric::ALLOWED_AGGREGATED_INDEX_METRICS
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
desc ‘skip system indices’ config_param :skip_system_indices, :bool, default: DEFAULT_SKIP_SYSTEM_INDICES.
- #configure_elasticsearchs ⇒ Object
- #emit_events(events) ⇒ Object
- #execute_collect ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
desc ‘skip system indices’ config_param :skip_system_indices, :bool, default: DEFAULT_SKIP_SYSTEM_INDICES
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 136 def configure(conf) super raise Fluent::ConfigError, 'tag should not be empty' if tag.empty? raise Fluent::ConfigError, 'urls should not be empty' if urls.empty? @mutex_emit = Mutex.new wrong_fields = aggregated_index_metrics.select { |item| ! ALLOWED_AGGREGATED_INDEX_METRICS.include?(item) } raise Fluent::ConfigError, "aggregated_index_metrics contains unexpected values: #{wrong_fields}" if wrong_fields.size > 0 ElasticsearchStats::Metadata. = ElasticsearchStats::Metric.metric_prefix = metric_prefix ElasticsearchStats::Metric. = ElasticsearchStats::Metric.index_base_pattern = index_base_pattern ElasticsearchStats::Metric.index_base_replacement = index_base_replacement ElasticsearchStats::Metric.aggregated_index_metrics_only = aggregated_index_metrics_only ElasticsearchStats::Metric.aggregated_index_metrics = aggregated_index_metrics ElasticsearchStats::Metric.name_separator = event_name_separator configure_elasticsearchs end |
#configure_elasticsearchs ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 159 def configure_elasticsearchs @elasticsearchs = [] urls.each do |url| client = ElasticsearchStats::Client.new( url: url, timeout: timeout, username: username, password: password, user_agent: user_agent, ca_file: ca_file, verify_ssl: verify_ssl, log: log ) @elasticsearchs << ElasticsearchStats::Collector.new( client: client, stats_config: self, log: log ) end end |
#emit_events(events) ⇒ Object
198 199 200 201 202 203 204 |
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 198 def emit_events(events) return if !events || events.empty? @mutex_emit.synchronize do router.emit_stream(tag, events) end end |
#execute_collect ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 183 def execute_collect threads = [] @elasticsearchs.each do |elasticsearch| threads << Thread.new do metrics = elasticsearch.collect_stats_metrics events = MultiEventStream.new( [Fluent::EventTime.now] * metrics.size, metrics ) emit_events(events) end end threads.each(&:join) end |
#start ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 175 def start super timer_execute(:execute_collect_first, 1, repeat: false, &method(:execute_collect)) if interval > 60 timer_execute(:execute_collect, interval, repeat: true, &method(:execute_collect)) end |