Class: LogStash::Instrument::Collector

Inherits:
Object
Includes:
Util::Loggable, Observable
Defined in:
lib/logstash/instrument/collector.rb

Overview

The Collector is the single point of reference for all metrics collection inside Logstash; the metrics library makes direct calls to this class.

This class is an observable responsible for periodically emitting a view of the system to other components, such as the internal metrics pipelines.

Constant Summary

SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds

Instance Attribute Summary

Instance Method Summary

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize ⇒ Collector

Returns a new instance of Collector.



# File 'lib/logstash/instrument/collector.rb', line 26

def initialize
  @metric_store = MetricStore.new
  @agent = nil
  start_periodic_snapshotting
end

Instance Attribute Details

#agent ⇒ Object

Returns the value of attribute agent.



# File 'lib/logstash/instrument/collector.rb', line 24

def agent
  @agent
end

Instance Method Details

#clear(keypath) ⇒ Object



# File 'lib/logstash/instrument/collector.rb', line 108

def clear(keypath)
  @metric_store.prune(keypath)
end

#publish_snapshot ⇒ Object

Creates a snapshot of the MetricStore and sends it to the registered observers. Each observer receives the following call on its update method:

`#update(created_at, metric_store)`



# File 'lib/logstash/instrument/collector.rb', line 102

def publish_snapshot
  created_at = Time.now
  logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug?
  notify_observers(snapshot_metric)
end
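
The publishing flow can be sketched with Ruby's standard `observer` library alone. This is a minimal illustration, not the Logstash implementation: `SketchCollector`, `SnapshotRecorder`, and `record` are hypothetical names, a plain Hash stands in for the MetricStore, and the observer takes a single argument to mirror the `notify_observers(snapshot_metric)` call in the method body above. It also shows that `Observable#notify_observers` only delivers when `changed` has been called since the last notification.

```ruby
require 'observer'

# Hypothetical stand-in for the Collector; only the observer wiring is shown.
class SketchCollector
  include Observable

  def initialize
    @store = {}
  end

  # Record a value and flag the collector as changed.
  def record(key, value)
    @store[key] = value
    changed
  end

  # Send a frozen copy of the store to every registered observer.
  def publish_snapshot
    notify_observers(@store.dup.freeze)
  end
end

# Hypothetical observer; Observable calls #update on it by default.
class SnapshotRecorder
  attr_reader :received

  def initialize
    @received = []
  end

  def update(snapshot)
    @received << snapshot
  end
end

collector = SketchCollector.new
recorder = SnapshotRecorder.new
collector.add_observer(recorder)

collector.publish_snapshot        # nothing recorded yet, so nothing is delivered
collector.record(:events_in, 42)
collector.publish_snapshot        # delivered: #record set the changed flag
collector.publish_snapshot        # not delivered: the flag was reset by the last notify

puts recorder.received.length
```

Sending a frozen copy keeps observers from mutating the live store, which is one plausible reason to publish a snapshot object rather than the store itself.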

#push(namespaces_path, key, type, *metric_type_params) ⇒ Object

The metric library calls this single interface. It is the collector's job to update the store, either by creating a new metric or by updating an existing one.

If there is a problem with the key or the type of metric, an error is logged but event processing continues; these errors are not considered fatal.



# File 'lib/logstash/instrument/collector.rb', line 39

def push(namespaces_path, key, type, *metric_type_params)
  begin
    metric = @metric_store.fetch_or_store(namespaces_path, key) do
      LogStash::Instrument::MetricType.create(type, namespaces_path, key)
    end

    metric.execute(*metric_type_params)

    changed # we had changes coming in so we can notify the observers
  rescue MetricStore::NamespacesExpectedError => e
    logger.error("Collector: Cannot record metric", :exception => e)
  rescue NameError => e
    logger.error("Collector: Cannot create concrete class for this metric type",
                 :type => type,
                 :namespaces_path => namespaces_path,
                 :key => key,
                 :metrics_params => metric_type_params,
                 :exception => e,
                 :stacktrace => e.backtrace)
  end
end
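
The fetch-or-create pattern and the non-fatal error handling described above can be sketched without any Logstash classes. Everything here is a hypothetical stand-in: `SketchStore` replaces MetricStore with a mutex-guarded Hash, `SketchCounter` replaces a concrete metric type, and errors are collected in an array instead of being logged.

```ruby
# Hypothetical counter metric standing in for a concrete metric type.
class SketchCounter
  attr_reader :value

  def initialize
    @value = 0
  end

  # Apply the update and return the new value.
  def execute(amount = 1)
    @value += amount
  end
end

# Hypothetical store: one metric per (namespaces_path, key) pair.
class SketchStore
  def initialize
    @metrics = {}
    @lock = Mutex.new
  end

  # Return the stored metric, creating it with the block if it is absent.
  def fetch_or_store(namespaces_path, key)
    @lock.synchronize { @metrics[[namespaces_path, key]] ||= yield }
  end
end

class SketchPusher
  attr_reader :errors

  def initialize
    @store = SketchStore.new
    @errors = []
  end

  # Returns the metric's new value, or nil if the metric type is unknown.
  def push(namespaces_path, key, type, *params)
    metric = @store.fetch_or_store(namespaces_path, key) { create(type) }
    metric.execute(*params)
  rescue NameError => e
    @errors << e.message # recorded, not re-raised: processing continues
    nil
  end

  private

  # Resolve the class from the type name; an unknown type raises NameError.
  def create(type)
    Object.const_get("Sketch#{type.to_s.capitalize}").new
  end
end

pusher = SketchPusher.new
pusher.push([:stats, :events], :in, :counter, 5)
pusher.push([:stats, :events], :in, :counter, 2) # reuses the same counter
pusher.push([:stats, :events], :out, :gauge, 1)  # no SketchGauge class: error recorded
puts pusher.errors.length
```

Rescuing at the method level keeps a bad metric type from interrupting the caller, which matches the non-fatal behavior the documentation describes.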

#snapshot_metric ⇒ LogStash::Instrument::MetricStore

Snapshots the current metric store and returns it immediately. This is useful when you need the current metric store without waiting for the next periodic call.



# File 'lib/logstash/instrument/collector.rb', line 81

def snapshot_metric
  Snapshot.new(@metric_store)
end

#start_periodic_snapshotting ⇒ Object

Configures and starts the periodic task that snapshots the `MetricStore`.



# File 'lib/logstash/instrument/collector.rb', line 86

def start_periodic_snapshotting
  @snapshot_task = Concurrent::TimerTask.new { publish_snapshot }
  @snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS
  @snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS
  @snapshot_task.add_observer(self)
  @snapshot_task.execute
end

#stop ⇒ Object



# File 'lib/logstash/instrument/collector.rb', line 94

def stop
  @snapshot_task.shutdown
end

#update(time_of_execution, result, exception) ⇒ Object

Monitors the `Concurrent::TimerTask`. This update is triggered on every run of the task, successful or not. TimerTask implements Observable; the collector acts as its observer and keeps track of whether something went wrong during execution.

Parameters:

  • time_of_execution (Time)

    Time of execution

  • result (Object)

    Result of the execution

  • exception (Exception)

    Exception raised during the execution, or nil if the run succeeded


# File 'lib/logstash/instrument/collector.rb', line 68

def update(time_of_execution, result, exception)
  return true if exception.nil?
  logger.error("Collector: Something went wrong when sending data to the observers",
               :execution_time => time_of_execution,
               :result => result,
               :exception => exception.class.name)
end
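
The three-argument observer contract can be illustrated with a small stand-in for the timer task. This is a sketch under stated assumptions: `SketchTask` and its `tick` method are hypothetical and run the block once on demand instead of on a schedule, and `SketchMonitor` collects failures in an array where the collector would log them.

```ruby
# Hypothetical one-shot runner that mimics how a timer task reports each run.
class SketchTask
  def initialize(&block)
    @block = block
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
  end

  # Run the block once and report (time, result, exception) to every observer.
  def tick
    result = nil
    exception = nil
    begin
      result = @block.call
    rescue StandardError => e
      exception = e
    end
    @observers.each { |o| o.update(Time.now, result, exception) }
  end
end

# Hypothetical observer following the same shape as Collector#update.
class SketchMonitor
  attr_reader :failures

  def initialize
    @failures = []
  end

  def update(time_of_execution, result, exception)
    return true if exception.nil? # successful run: nothing to report
    @failures << exception.class.name
  end
end

monitor = SketchMonitor.new

ok = SketchTask.new { :published }
ok.add_observer(monitor)
ok.tick

broken = SketchTask.new { raise IOError, "observer unreachable" }
broken.add_observer(monitor)
broken.tick

puts monitor.failures.inspect
```

Because the exception is passed to the observer rather than raised, a failing run does not stop later runs; the observer decides how to report it.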