Class: LogStash::Instrument::Collector
- Inherits:
- Object
  - LogStash::Instrument::Collector
- Includes:
- Util::Loggable, Observable
- Defined in:
- lib/logstash/instrument/collector.rb
Overview
The Collector is the single point of reference for all metrics collection inside Logstash; the metrics library makes direct calls to this class.
This class is an observable responsible for periodically emitting a view of the system to other components, such as the internal metrics pipelines.
Constant Summary
- SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
- SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds
Instance Attribute Summary
-
#agent ⇒ Object
Returns the value of attribute agent.
Instance Method Summary
- #clear(keypath) ⇒ Object
-
#initialize ⇒ Collector
constructor
A new instance of Collector.
-
#publish_snapshot ⇒ Object
Creates a snapshot of the MetricStore and sends it to the registered observers. Observers receive it through their update method.
-
#push(namespaces_path, key, type, *metric_type_params) ⇒ Object
The metric library calls this single interface; it is the collector's job to update the store with a new metric or to update an existing one.
-
#snapshot_metric ⇒ LogStash::Instrument::MetricStore
Snapshots the current metric store and returns it immediately. This is useful if you want access to the current metric store without waiting for a periodic call.
-
#start_periodic_snapshotting ⇒ Object
Configures and starts the periodic task for snapshotting the `MetricStore`.
- #stop ⇒ Object
-
#update(time_of_execution, result, exception) ⇒ Object
Monitors the `Concurrent::TimerTask`. This update is triggered on every run of the task, successful or not. TimerTask implements Observable; the collector acts as the observer and keeps track of anything that went wrong during execution.
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize ⇒ Collector
Returns a new instance of Collector.
# File 'lib/logstash/instrument/collector.rb', line 26
def initialize
  @metric_store = MetricStore.new
  @agent = nil
  start_periodic_snapshotting
end
Instance Attribute Details
#agent ⇒ Object
Returns the value of attribute agent.
# File 'lib/logstash/instrument/collector.rb', line 24
def agent
  @agent
end
Instance Method Details
#clear(keypath) ⇒ Object
# File 'lib/logstash/instrument/collector.rb', line 108
def clear(keypath)
  @metric_store.prune(keypath)
end
#publish_snapshot ⇒ Object
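Creates a snapshot of the MetricStore and sends it to the registered observers. The documented signature of the observer's update method is:
`#update(created_at, metric_store)`
Note that the listing below calls notify_observers with a single argument, the object returned by #snapshot_metric.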
# File 'lib/logstash/instrument/collector.rb', line 102
def publish_snapshot
  created_at = Time.now
  logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug?
  notify_observers(snapshot_metric)
end
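The publish step relies on Ruby's standard Observable module: observers are only notified when the changed flag has been set since the last notification. The following is a minimal sketch of that behaviour, not the Logstash implementation. ToyCollector, SnapshotRecorder, and the hash-based store are hypothetical names introduced for illustration, and the observer here takes a single snapshot argument.

```ruby
require "observer"

# Hypothetical stand-in for the collector: only the notify path is modelled.
class ToyCollector
  include Observable

  def initialize
    @store = {}
  end

  # Record a value and set the changed flag, as #push does after a metric update.
  def push(key, value)
    @store[key] = value
    changed
  end

  # Send a frozen copy of the store to every registered observer.
  # Observable#notify_observers is a no-op unless the changed flag is set.
  def publish_snapshot
    notify_observers(@store.dup.freeze)
  end
end

# Hypothetical observer: keeps every snapshot it is handed.
class SnapshotRecorder
  attr_reader :snapshots

  def initialize
    @snapshots = []
  end

  # Called by notify_observers with whatever arguments were passed to it.
  def update(snapshot)
    @snapshots << snapshot
  end
end

collector = ToyCollector.new
recorder = SnapshotRecorder.new
collector.add_observer(recorder)

collector.publish_snapshot     # nothing pushed yet, so nobody is notified
collector.push(:events_in, 3)
collector.publish_snapshot     # changed flag is set, so the recorder is notified
```

After this runs, the recorder holds exactly one snapshot, because the first publish happened before any change was flagged.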
#push(namespaces_path, key, type, *metric_type_params) ⇒ Object
The metric library calls this single interface; it is the collector's job to update the store with a new metric or to update an existing one.
If there is a problem with the key or the type of metric, an error is recorded but event processing does not stop; these errors are not considered fatal.
# File 'lib/logstash/instrument/collector.rb', line 39
def push(namespaces_path, key, type, *metric_type_params)
  begin
    metric = @metric_store.fetch_or_store(namespaces_path, key) do
      LogStash::Instrument::MetricType.create(type, namespaces_path, key)
    end

    metric.execute(*metric_type_params)

    changed # we had changes coming in so we can notify the observers
  rescue MetricStore::NamespacesExpectedError => e
    logger.error("Collector: Cannot record metric", :exception => e)
  rescue NameError => e
    logger.error("Collector: Cannot create concrete class for this metric type",
                 :type => type,
                 :namespaces_path => namespaces_path,
                 :key => key,
                 :metrics_params => metric_type_params,
                 :exception => e,
                 :stacktrace => e.backtrace)
  end
end
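The fetch-or-create-then-execute flow, together with the non-fatal error handling, can be sketched in isolation. This is an illustration under stated assumptions rather than the Logstash code: SketchCollector, ToyCounter, the TYPES lookup table, and the value_of helper are all hypothetical, and a plain Hash stands in for the MetricStore.

```ruby
# Hypothetical counter metric; the real metric types are not shown on this page.
class ToyCounter
  attr_reader :value

  def initialize
    @value = 0
  end

  # Dispatch an action name to the matching method, e.g. execute(:increment, 2).
  def execute(action, amount = 1)
    public_send(action, amount)
  end

  def increment(amount)
    @value += amount
  end
end

# Hypothetical collector reduced to the fetch-or-create-then-execute path.
class SketchCollector
  TYPES = { counter: ToyCounter }.freeze

  attr_reader :errors

  def initialize
    @store = {}
    @errors = []
  end

  def push(namespaces_path, key, type, *params)
    # Reuse the stored metric if present, otherwise create one of the given type.
    metric = (@store[[namespaces_path, key]] ||= TYPES.fetch(type).new)
    metric.execute(*params)
  rescue KeyError, NameError => e
    # Record the problem and carry on; a bad metric must not stop processing.
    @errors << e
    nil
  end

  def value_of(namespaces_path, key)
    @store.fetch([namespaces_path, key]).value
  end
end

collector = SketchCollector.new
collector.push([:pipeline, :events], :in, :counter, :increment, 2)
collector.push([:pipeline, :events], :in, :counter, :increment)
collector.push([:pipeline, :events], :out, :gauge, :set, 5) # unknown type: recorded, not raised
```

The two counter pushes hit the same stored metric, while the push with the unknown type only adds an entry to the error list and leaves the caller running.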
#snapshot_metric ⇒ LogStash::Instrument::MetricStore
Snapshots the current metric store and returns it immediately. This is useful if you want access to the current metric store without waiting for a periodic call.
# File 'lib/logstash/instrument/collector.rb', line 81
def snapshot_metric
  Snapshot.new(@metric_store)
end
#start_periodic_snapshotting ⇒ Object
Configures and starts the periodic task for snapshotting the `MetricStore`.
# File 'lib/logstash/instrument/collector.rb', line 86
def start_periodic_snapshotting
  @snapshot_task = Concurrent::TimerTask.new { publish_snapshot }
  @snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS
  @snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS
  @snapshot_task.add_observer(self)
  @snapshot_task.execute
end
#stop ⇒ Object
# File 'lib/logstash/instrument/collector.rb', line 94
def stop
  @snapshot_task.shutdown
end
#update(time_of_execution, result, exception) ⇒ Object
Monitors the `Concurrent::TimerTask`. This update is triggered on every run of the task, successful or not. TimerTask implements Observable; the collector acts as the observer and keeps track of anything that went wrong during execution.
# File 'lib/logstash/instrument/collector.rb', line 68
def update(time_of_execution, result, exception)
  return true if exception.nil?
  logger.error("Collector: Something went wrong went sending data to the observers",
               :execution_time => time_of_execution,
               :result => result,
               :exception => exception.class.name)
end
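The three-argument callback shape used above can be exercised without a timer by calling it directly. The sketch below is an assumption-laden illustration: TaskMonitor and its errors list are hypothetical, the failures are collected in an array instead of being logged, and the method returns false on failure, which is a choice made here and not something the original listing does.

```ruby
# Hypothetical observer with the same three-argument callback shape as #update.
class TaskMonitor
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Returns true for a clean run; otherwise records the failure and returns false.
  def update(time_of_execution, result, exception)
    return true if exception.nil?

    @errors << { at: time_of_execution, result: result, exception: exception.class.name }
    false
  end
end

monitor = TaskMonitor.new
clean = monitor.update(Time.now, :done, nil)                     # run without an exception
failed = monitor.update(Time.now, nil, RuntimeError.new("boom")) # run that raised
```

Keeping the callback free of side effects other than recording makes it easy to test separately from the periodic task that would normally invoke it.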