Class: LogStash::Instrument::MetricStore

Inherits:
Object
Defined in:
lib/logstash/instrument/metric_store.rb

Overview

The MetricStore is the data structure that makes sure the data is saved in a retrievable way. It is a wrapper around multiple ConcurrentHashMap instances acting as a tree-like structure.

Defined Under Namespace

Classes: MetricNotFound, NamespacesExpectedError

Constant Summary

KEY_PATH_SEPARATOR =
"/".freeze
FILTER_KEYS_SEPARATOR =

Allows some flexibility in comma usage in the path definition

/\s?*,\s*/.freeze

Instance Method Summary

Constructor Details

#initialize ⇒ MetricStore

Returns a new instance of MetricStore.



# File 'lib/logstash/instrument/metric_store.rb', line 20

def initialize
  # We keep the structured cache to allow
  # the API to search the content of the different nodes
  @store = Concurrent::Map.new

  # This hash has only one dimension
  # and allows fast retrieval of the metrics
  @fast_lookup = Concurrent::Map.new

  # This Mutex guards the critical section for the
  # structured hash: it locks when we first insert a metric
  # in the structured hash, or when we query it for search or to make
  # the result available in the API.
  @structured_lookup_mutex = Mutex.new
end

Instance Method Details

#each(path = nil, &block) ⇒ Object Also known as: all

Returns all the individual metrics. This call mimics an Enumerable's `each` if a block is provided; without a block, the metrics are returned as an array.

Parameters:

  • path (String) (defaults to: nil)

    The search path for metrics

Returns:

  • (Array)

    The metrics for the specific path



# File 'lib/logstash/instrument/metric_store.rb', line 174

def each(path = nil, &block)
  metrics = if path.nil?
    get_all
  else
    transform_to_array(get_with_path(path))
  end

  block_given? ? metrics.each(&block) : metrics
end
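
The block/no-block dispatch above can be sketched in isolation. This is a minimal illustration, assuming a hypothetical `TinyStore` class that holds its metrics in a plain array; it does not model `get_all`, `get_with_path`, or the real store.

```ruby
# Hypothetical stand-in for MetricStore; only the block/no-block
# dispatch of `each` is modelled here.
class TinyStore
  def initialize(metrics)
    @metrics = metrics
  end

  # With a block, iterate over the metrics; without one, return the array.
  def each(&block)
    block_given? ? @metrics.each(&block) : @metrics
  end
  alias_method :all, :each
end

store = TinyStore.new([:events_in, :events_out])

# Block form: behaves like an ordinary each.
collected = []
store.each { |metric| collected << metric }

# No block: the caller gets the array back (here through the `all` alias).
as_array = store.all
```

Returning the array when no block is given lets callers chain ordinary `Array` methods on the result.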

#extract_metrics(path, *keys) ⇒ Object

Returns a hash containing the values of the given keys at the given path.

Example Usage:

    extract_metrics(
      [:jvm, :process],
      :open_file_descriptors,
      [:cpu, [:total_in_millis, :percent]],
      [:pipelines, [:one, :two], :size]
    )

Returns:

    # From the jvm.process metrics namespace
    {
      :open_file_descriptors => 123,
      :cpu => { :total_in_millis => 456, :percent => 789 },
      :pipelines => {
                      :one => {:size => 90210},
                      :two => {:size => 8675309}
                    }
    }



# File 'lib/logstash/instrument/metric_store.rb', line 132

def extract_metrics(path, *keys)
  keys.reduce({}) do |acc,k|
    # Simplify 1-length keys
    k = k.first if k.is_a?(Array) && k.size == 1

    # If we have array values here we need to recurse.
    # There are two levels of looping here: one for the paths we might pass in,
    # one for the upcoming keys we might pass in.
    if k.is_a?(Array)
      # We need to build up future executions of extract_metrics,
      # which means building up the path and keys arguments.
      # We need a nested loop here to execute all permutations of these in case we hit
      # something like [[:a,:b],[:c,:d]] which produces 4 different metrics
      next_paths = Array(k.first)
      next_keys = Array(k[1])
      rest = k[2..-1]
      next_paths.each do |next_path|
        # If there already is a hash at this location use that so we don't overwrite it
        np_hash = acc[next_path] || {}
        
        acc[next_path] = next_keys.reduce(np_hash) do |a,next_key|
          a.merge! extract_metrics(path + [next_path], [next_key, *rest])
        end
      end
    else # Scalar value
      res = get_shallow(*path)[k]
      acc[k] = res ? res.value : nil
    end
    
    acc
  end
end
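
To see the recursion at work, the following sketch runs the same algorithm against a plain nested `Hash`. The `Metric` struct, the `STORE` constant, and the simplified `get_shallow` are assumptions made for illustration; the real store uses `Concurrent::Map` and its own metric objects.

```ruby
# Hypothetical value wrapper; only the #value reader matters here.
Metric = Struct.new(:value)

# Stand-in for the nested store, as a plain Hash.
STORE = {
  jvm: {
    process: {
      open_file_descriptors: Metric.new(123),
      cpu: { total_in_millis: Metric.new(456), percent: Metric.new(789) },
      pipelines: {
        one: { size: Metric.new(90210) },
        two: { size: Metric.new(8675309) }
      }
    }
  }
}

# Simplified get_shallow: walk the path and return the hash found there.
def get_shallow(*path)
  path.reduce(STORE) { |acc, p| acc.fetch(p) }
end

def extract_metrics(path, *keys)
  keys.reduce({}) do |acc, k|
    # Simplify 1-length keys
    k = k.first if k.is_a?(Array) && k.size == 1
    if k.is_a?(Array)
      next_paths = Array(k.first)
      next_keys = Array(k[1])
      rest = k[2..-1]
      next_paths.each do |next_path|
        # Reuse an existing hash at this location so it is not overwritten
        np_hash = acc[next_path] || {}
        acc[next_path] = next_keys.reduce(np_hash) do |a, next_key|
          a.merge!(extract_metrics(path + [next_path], [next_key, *rest]))
        end
      end
    else
      # Scalar key: read the metric at the current path
      res = get_shallow(*path)[k]
      acc[k] = res ? res.value : nil
    end
    acc
  end
end

result = extract_metrics(
  [:jvm, :process],
  :open_file_descriptors,
  [:cpu, [:total_in_millis, :percent]],
  [:pipelines, [:one, :two], :size]
)
```

With this sample data, `result` has the same shape as the documented return value: scalar keys map to values, and array keys expand into nested hashes.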

#fetch_or_store(namespaces, key, default_value = nil) ⇒ Object

This method uses the namespaces and key to look up the corresponding value in the hash. If it doesn't exist, it creates the appropriate namespace path in the hash, stores the new value (the result of the block if one is given, otherwise `default_value`), and returns it.

Parameters:

  • namespaces (Array)

    The path where the values should be located

  • key (Symbol)

    The metric key

Returns:

  • (Object)

    The existing value if one is found in the tree, otherwise the newly stored value



# File 'lib/logstash/instrument/metric_store.rb', line 43

def fetch_or_store(namespaces, key, default_value = nil)

  # We first check the `@fast_lookup` store to see if we have already seen that metric before.
  # This gives us `O(1)` access, which is faster than searching through the structured
  # data store (which is an `O(n)` operation, where `n` is the number of elements in the namespace and
  # the value of the key). If the metric is already present in `@fast_lookup`, then that value is sent
  # back directly to the caller.
  #
  # But if the value is not present in `@fast_lookup`, it will be inserted there, and we assume that we don't
  # have it in `@store` for structured search, so we add it there too.

  value = @fast_lookup.get(namespaces.dup << key)
  if value.nil?
    value = block_given? ? yield(key) : default_value
    @fast_lookup.put(namespaces.dup << key, value)
    @structured_lookup_mutex.synchronize do
      # If we cannot find the value, this means we need to save it in the store.
      fetch_or_store_namespaces(namespaces).fetch_or_store(key, value)
    end
  end
  value
end
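
The two-store pattern described in the comments can be sketched with plain Ruby objects. This is a simplified illustration, not the real implementation: `TinyMetricStore` is a hypothetical class, plain `Hash` instances replace `Concurrent::Map`, and the namespace creation is inlined instead of going through `fetch_or_store_namespaces`.

```ruby
# Simplified stand-in for MetricStore. Plain Hashes replace Concurrent::Map,
# so the flat lookup in this sketch is NOT safe for concurrent writers.
class TinyMetricStore
  attr_reader :store

  def initialize
    @store = {}        # nested, tree-like view
    @fast_lookup = {}  # flat view keyed by the full path array
    @mutex = Mutex.new
  end

  def fetch_or_store(namespaces, key, default_value = nil)
    full_path = namespaces + [key]
    value = @fast_lookup[full_path]
    if value.nil?
      value = block_given? ? yield(key) : default_value
      @fast_lookup[full_path] = value
      @mutex.synchronize do
        # Create intermediate namespaces on demand, then store the leaf once.
        node = namespaces.reduce(@store) { |h, ns| h[ns] ||= {} }
        node[key] = value unless node.key?(key)
      end
    end
    value
  end

  def size
    @fast_lookup.size
  end
end

store = TinyMetricStore.new
first  = store.fetch_or_store([:stats, :events], :in) { |_key| 1 }
second = store.fetch_or_store([:stats, :events], :in) { |_key| 2 }
```

The second call finds the metric in the flat lookup, so its block never runs and the originally stored value is returned.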

#get(*key_paths) ⇒ Hash

Similar to `get_with_path` but uses symbols instead of a string.

Parameters:

  • key_paths (Array<Symbol>)

Returns:

  • (Hash)


# File 'lib/logstash/instrument/metric_store.rb', line 89

def get(*key_paths)
  # Normalize the keys to symbols (a bare `map` would discard its result)
  key_paths = key_paths.map(&:to_sym)
  new_hash = Hash.new

  @structured_lookup_mutex.synchronize do
    get_recursively(key_paths, @store, new_hash)
  end

  new_hash
end

#get_shallow(*key_paths) ⇒ Hash

Retrieves values like `get`, but doesn't return them fully nested. This means that if you call `get_shallow(:foo, :bar)`, the result will not be nested inside of `{:foo => {:bar => values}}`.

Parameters:

  • key_paths (Array<Symbol>)

Returns:

  • (Hash)


# File 'lib/logstash/instrument/metric_store.rb', line 107

def get_shallow(*key_paths)
  key_paths.reduce(get(*key_paths)) {|acc, p| acc[p]}
end
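
The difference between the nested and shallow forms is easiest to see on a small hash. In this sketch, `nested` is a hypothetical value shaped like what `get(:foo, :bar)` is described as returning; the reduction is the same one used by `get_shallow`.

```ruby
# Hypothetical nested result, shaped like the output described for get(:foo, :bar).
nested = { foo: { bar: { count: 1, rate: 2 } } }

# Same reduction as in get_shallow: descend one key at a time.
key_paths = [:foo, :bar]
shallow = key_paths.reduce(nested) { |acc, p| acc[p] }

# Hash#dig performs the same descent on plain hashes.
dug = nested.dig(*key_paths)
```

Here `shallow` is just the innermost hash, without the `:foo` and `:bar` wrappers.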

#get_with_path(path) ⇒ Hash

This method retrieves the values for a specific path. It supports the following queries:

stats/pipelines/pipeline_X
stats/pipelines/pipeline_X,pipeline_2
stats/os,jvm

If you use `,` in a key, the metric store will return both values at that level.

The returned hash keeps the same structure as it had in the `Concurrent::Map` but is a normal Ruby hash. This allows the API to easily serialize the content of the map.

Parameters:

  • path (String)

    The path where values should be located

Returns:

  • (Hash)


# File 'lib/logstash/instrument/metric_store.rb', line 81

def get_with_path(path)
  get(*key_paths(path))
end
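
The private helpers that parse the path are not shown on this page, so the following is only a sketch of how such a query could be resolved. `parse_path` and `lookup` are hypothetical names, the filter regex is a simplified form of `FILTER_KEYS_SEPARATOR`, and a plain `Hash` stands in for the store.

```ruby
KEY_PATH_SEPARATOR = "/".freeze
# Simplified separator for this sketch: a comma with optional surrounding spaces.
FILTER_KEYS_SEPARATOR = /\s*,\s*/.freeze

# Hypothetical helper: split a query into levels, and each level into candidate keys.
def parse_path(path)
  path.split(KEY_PATH_SEPARATOR).reject(&:empty?).map do |level|
    level.strip.split(FILTER_KEYS_SEPARATOR).map(&:to_sym)
  end
end

# Hypothetical helper: copy only the requested branches, keeping the tree shape.
def lookup(tree, levels)
  return tree if levels.empty?

  keys, *rest = levels
  keys.each_with_object({}) do |key, out|
    out[key] = lookup(tree.fetch(key), rest)
  end
end

tree = { stats: { os: { cpu: 1 }, jvm: { heap: 2 }, events: { in: 3 } } }

levels = parse_path("stats/os,jvm")
result = lookup(tree, levels)
```

For this sample tree, the `os` and `jvm` branches are returned under `stats`, and the `events` branch is left out.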

#has_metric?(*path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/instrument/metric_store.rb', line 165

def has_metric?(*path)
  @fast_lookup[path]
end

#prune(path) ⇒ Object



# File 'lib/logstash/instrument/metric_store.rb', line 185

def prune(path)
  key_paths = key_paths(path).map(&:to_sym)
  @structured_lookup_mutex.synchronize do
    keys_to_delete = @fast_lookup.keys.select {|namespace| (key_paths - namespace[0..-2]).empty? }
    keys_to_delete.each {|k| @fast_lookup.delete(k) }
    delete_from_map(@store, key_paths)
  end
end
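
The selection predicate used on `@fast_lookup` keys can be tried on sample data. The keys and the prune path below are made up for illustration; only the predicate itself comes from the method above.

```ruby
# Hypothetical flat lookup keys: each one is namespaces + [metric key].
fast_lookup_keys = [
  [:stats, :pipelines, :main, :events_in],
  [:stats, :pipelines, :old, :events_in],
  [:stats, :jvm, :uptime]
]

prune_path = [:stats, :pipelines, :old]

# Same predicate as in prune: every segment of the prune path must appear
# in the key's namespace part (the key minus its last element).
keys_to_delete = fast_lookup_keys.select do |namespace|
  (prune_path - namespace[0..-2]).empty?
end
```

Since `Array#-` ignores element order, this predicate checks that the path segments are contained in the namespace; it is not a strict prefix match.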

#size ⇒ Object



# File 'lib/logstash/instrument/metric_store.rb', line 194

def size
  @fast_lookup.size
end