Class: Librato::Collector::Aggregator

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/librato/collector/aggregator.rb

Overview

maintains storage of timing and measurement type measurements

Constant Summary collapse

SEPARATOR =
"$$"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Aggregator

Returns a new instance of Aggregator.



16
17
18
19
20
21
# File 'lib/librato/collector/aggregator.rb', line 16

def initialize(options={})
  @cache = Librato::Metrics::Aggregator.new(prefix: options[:prefix])
  @percentiles = {}
  @lock = Mutex.new
  @default_tags = options.fetch(:default_tags, {})
end

Instance Attribute Details

#default_tagsObject (readonly)

Returns the value of attribute default_tags.



14
15
16
# File 'lib/librato/collector/aggregator.rb', line 14

def default_tags
  @default_tags
end

Instance Method Details

#[](key) ⇒ Object



23
24
25
# File 'lib/librato/collector/aggregator.rb', line 23

def [](key)
  fetch(key)
end

#delete_allObject

clear all stored values



45
46
47
# File 'lib/librato/collector/aggregator.rb', line 45

def delete_all
  @lock.synchronize { clear_storage }
end

#fetch(key, options = {}) ⇒ Object

retrieve current value of a metric/source/percentage. this exists primarily for debugging/testing and isn’t called routinely.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/librato/collector/aggregator.rb', line 29

def fetch(key, options={})
  return nil if @cache.empty?
  return fetch_percentile(key, options) if options[:percentile]
  measurements = nil
  tags = options[:tags] || @default_tags
  @lock.synchronize { measurements = @cache.queued[:measurements] }
  measurements.each do |metric|
    if metric[:name] == key.to_s
      return metric if !tags && !metric[:tags]
      return metric if tags == metric[:tags]
    end
  end
  nil
end

#flush_to(queue, opts = {}) ⇒ Object

transfer all measurements to queue and reset internal status



50
51
52
53
54
55
56
57
58
59
# File 'lib/librato/collector/aggregator.rb', line 50

def flush_to(queue, opts={})
  queued = nil
  @lock.synchronize do
    return if @cache.empty?
    queued = @cache.queued
    flush_percentiles(queue, opts) unless @percentiles.empty?
    clear_storage unless opts[:preserve]
  end
  queue.merge!(queued) if queued
end

#measure(*args, &block) ⇒ Object Also known as: timing

Examples:

Simple measurement

measure 'sources_returned', sources.length

Simple timing in milliseconds

timing 'twitter.lookup', 2.31

Block-based timing

timing 'db.query' do
  do_my_query
end

Custom source

measure 'user.all_orders', user.order_count, :source => user.id


75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/librato/collector/aggregator.rb', line 75

def measure(*args, &block)
  options = {}
  event = args[0].to_s
  returned = nil

  # handle block or specified argument
  if block_given?
    start = Time.now
    returned = yield
    value = ((Time.now - start) * 1000.0).to_i
  elsif args[1]
    value = args[1]
  else
    raise "no value provided"
  end

  # detect options hash if present
  if args.length > 1 and args[-1].respond_to?(:each)
    options = args[-1]
  end

  percentiles = Array(options[:percentile])
  source = options[:source]
  tags_option = options[:tags]
  tags_option = { source: source } if source && !tags_option
  tags =
    if tags_option && options[:inherit_tags]
      @default_tags.merge(tags_option)
    elsif tags_option
      tags_option
    else
      @default_tags
    end

  @lock.synchronize do
    payload = { value: value }
    payload.merge!({ tags: tags }) if tags
    @cache.add event => payload

    percentiles.each do |perc|
      store = fetch_percentile_store(event, payload)
      store[:reservoir] << value
      track_percentile(store, perc)
    end
  end
  returned
end