Class: Fluent::Plugin::ElasticsearchStatsInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_elasticsearch_stats.rb

Constant Summary collapse

NAME =
'elasticsearch_stats'
DEFAULT_TAG =
NAME
DEFAULT_URLS =
['http://localhost:9200'].freeze
DEFAULT_TIMEOUT =
10
DEFAULT_USER_AGENT =
NAME
DEFAULT_INTERVAL =
300
DEFAULT_TIMESTAMP_FORMAT =
:iso
DEFAULT_CLUSTER_HEALTH =
true
DEFAULT_CLUSTER_HEALTH_LEVEL =
:cluster
DEFAULT_CLUSTER_HEALTH_LOCAL =
false
DEFAULT_CLUSTER_STATS =
true
DEFAULT_NODES_STATS =
true
DEFAULT_NODES_STATS_LEVEL =
:node
DEFAULT_NODES_STATS_METRICS =
nil
DEFAULT_INDICES_STATS =
true
DEFAULT_INDICES_STATS_LEVEL =
:indices
DEFAULT_INDICES =
[:_all]
DEFAULT_INDICES_STATS_METRICS =
nil
DEFAULT_SHARDS_STATS =
true
DEFAULT_DANGLING =
false
DEFAULT_INDEX_BASE_PATTERN =

'/(.*)/'

nil
DEFAULT_INDEX_BASE_REPLACEMENT =
'\1'
DEFAULT_EVENT_NAME_SEPARATOR =
'/'
DEFAULT_SKIP_SYSTEM_INDICES =
true
DEFAULT_AGGREGATED_INDEX_METRICS_ONLY =
false
DEFAULT_AGGREGATED_INDEX_METRICS =
['sum']
ALLOWED_CLUSTER_HEALTH_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_CLUSTER_HEALTH_LEVELS
ALLOWED_NODES_STATS_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_NODES_STATS_LEVELS
ALLOWED_INDICES_STATS_LEVELS =
Fluent::Plugin::ElasticsearchStats::Client::ALLOWED_INDICES_LEVELS
ALLOWED_AGGREGATED_INDEX_METRICS =
Fluent::Plugin::ElasticsearchStats::Metric::ALLOWED_AGGREGATED_INDEX_METRICS

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

desc 'skip system indices' config_param :skip_system_indices, :bool, default: DEFAULT_SKIP_SYSTEM_INDICES

Raises:

  • (Fluent::ConfigError)


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 136

# Validates the plugin configuration, publishes the configured values to the
# ElasticsearchStats helper classes and then calls configure_elasticsearchs.
#
# @param conf configuration object; it is forwarded to the parent class by the
#   bare `super` call
# @raise [Fluent::ConfigError] when `tag` or `urls` is empty, or when
#   `aggregated_index_metrics` holds a value that is not listed in
#   ALLOWED_AGGREGATED_INDEX_METRICS
def configure(conf)
  super

  raise Fluent::ConfigError, 'tag should not be empty' if tag.empty?
  raise Fluent::ConfigError, 'urls should not be empty' if urls.empty?

  # Guards router.emit_stream in emit_events, which execute_collect invokes
  # from one thread per collector.
  @mutex_emit = Mutex.new

  # Collect every configured aggregation name that is not in the allow-list so
  # the error message can report all offending values at once.
  wrong_fields = aggregated_index_metrics.select { |item| ! ALLOWED_AGGREGATED_INDEX_METRICS.include?(item) }
  raise Fluent::ConfigError, "aggregated_index_metrics contains unexpected values: #{wrong_fields}" if wrong_fields.size > 0

  # NOTE(review): the next line is incomplete as shown here (the attribute name
  # and the right-hand side are missing) and is not valid Ruby. Restore it from
  # lib/fluent/plugin/in_elasticsearch_stats.rb before relying on this listing.
  ElasticsearchStats::Metadata. = 
  # These setters are called on the ElasticsearchStats::Metric constant itself,
  # not on an instance - presumably the values are shared by everything that
  # uses that class; confirm against its definition.
  ElasticsearchStats::Metric.metric_prefix = metric_prefix
  ElasticsearchStats::Metric.timestamp_format = timestamp_format
  ElasticsearchStats::Metric.index_base_pattern = index_base_pattern
  ElasticsearchStats::Metric.index_base_replacement = index_base_replacement
  ElasticsearchStats::Metric.aggregated_index_metrics_only = aggregated_index_metrics_only
  ElasticsearchStats::Metric.aggregated_index_metrics = aggregated_index_metrics
  ElasticsearchStats::Metric.name_separator = event_name_separator

  configure_elasticsearchs
end

#configure_elasticsearchs ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 159

# Rebuilds @elasticsearchs with one ElasticsearchStats::Collector per entry in
# `urls`. Each collector gets its own ElasticsearchStats::Client built from the
# connection settings of this plugin, and receives the plugin itself as
# `stats_config`.
#
# @return the value of `urls` (the result of iterating it with `each`)
def configure_elasticsearchs
  @elasticsearchs = []
  urls.each do |endpoint|
    connection_options = {
      url: endpoint,
      timeout: timeout,
      username: username,
      password: password,
      user_agent: user_agent,
      ca_file: ca_file,
      verify_ssl: verify_ssl,
      log: log
    }
    es_client = ElasticsearchStats::Client.new(**connection_options)
    collector = ElasticsearchStats::Collector.new(client: es_client, stats_config: self, log: log)
    @elasticsearchs.push(collector)
  end
end

#emit_events(events) ⇒ Object



198
199
200
201
202
203
204
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 198

# Emits an event stream under the configured tag while holding @mutex_emit.
#
# @param events the event stream to emit; nothing happens when it is
#   nil/false or reports itself as empty
# @return nil when there is nothing to emit, otherwise whatever
#   router.emit_stream returns
def emit_events(events)
  nothing_to_emit = !events || events.empty?
  return if nothing_to_emit

  # The mutex keeps concurrent callers from emitting at the same time.
  @mutex_emit.synchronize { router.emit_stream(tag, events) }
end

#execute_collect ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 183

# Runs one collection round: starts a thread per entry in @elasticsearchs,
# has each thread collect metrics and emit them as a MultiEventStream, then
# waits for all threads to finish.
#
# Every record of one collector is stamped with the same Fluent::EventTime,
# taken after that collector's metrics have been collected.
#
# @return [Array<Thread>] the joined worker threads
def execute_collect
  workers = @elasticsearchs.map do |collector|
    Thread.new do
      collected = collector.collect_stats_metrics
      stamps = [Fluent::EventTime.now] * collected.size
      emit_events(MultiEventStream.new(stamps, collected))
    end
  end
  # All threads are already running at this point; join only waits for them.
  workers.each(&:join)
end

#start ⇒ Object



175
176
177
178
179
180
181
# File 'lib/fluent/plugin/in_elasticsearch_stats.rb', line 175

# Starts the plugin and schedules metric collection.
#
# A repeating timer runs execute_collect every `interval` seconds. When
# `interval` is greater than 60, an additional one-shot timer fires after one
# second so the first collection does not wait for a full interval.
def start
  super

  collect = method(:execute_collect)

  if interval > 60
    timer_execute(:execute_collect_first, 1, repeat: false, &collect)
  end

  timer_execute(:execute_collect, interval, repeat: true, &collect)
end