Class: Librato::Collector::Aggregator
- Inherits:
-
Object
- Object
- Librato::Collector::Aggregator
- Extended by:
- Forwardable
- Defined in:
- lib/librato/collector/aggregator.rb
Overview
maintains storage of timing and measurement type measurements
Constant Summary collapse
- SEPARATOR =
"$$"
Instance Attribute Summary collapse
-
#default_tags ⇒ Object
readonly
Returns the value of attribute default_tags.
Instance Method Summary collapse
- #[](key) ⇒ Object
-
#delete_all ⇒ Object
clear all stored values.
-
#fetch(key, options = {}) ⇒ Object
retrieve current value of a metric/source/percentage.
-
#flush_to(queue, opts = {}) ⇒ Object
transfer all measurements to queue and reset internal status.
-
#initialize(options = {}) ⇒ Aggregator
constructor
A new instance of Aggregator.
- #measure(*args, &block) ⇒ Object (also: #timing)
Constructor Details
#initialize(options = {}) ⇒ Aggregator
Returns a new instance of Aggregator.
16 17 18 19 20 21 |
# File 'lib/librato/collector/aggregator.rb', line 16 def initialize(={}) @cache = Librato::Metrics::Aggregator.new(prefix: [:prefix]) @percentiles = {} @lock = Mutex.new @default_tags = .fetch(:default_tags, {}) end |
Instance Attribute Details
#default_tags ⇒ Object (readonly)
Returns the value of attribute default_tags.
14 15 16 |
# File 'lib/librato/collector/aggregator.rb', line 14 def @default_tags end |
Instance Method Details
#[](key) ⇒ Object
23 24 25 |
# File 'lib/librato/collector/aggregator.rb', line 23 def [](key) fetch(key) end |
#delete_all ⇒ Object
clear all stored values
45 46 47 |
# File 'lib/librato/collector/aggregator.rb', line 45 def delete_all @lock.synchronize { clear_storage } end |
#fetch(key, options = {}) ⇒ Object
retrieve current value of a metric/source/percentage. this exists primarily for debugging/testing and isn’t called routinely.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/librato/collector/aggregator.rb', line 29 def fetch(key, ={}) return nil if @cache.empty? return fetch_percentile(key, ) if [:percentile] measurements = nil = [:tags] || @default_tags @lock.synchronize { measurements = @cache.queued[:measurements] } measurements.each do |metric| if metric[:name] == key.to_s return metric if ! && !metric[:tags] return metric if == metric[:tags] end end nil end |
#flush_to(queue, opts = {}) ⇒ Object
transfer all measurements to queue and reset internal status
50 51 52 53 54 55 56 57 58 59 |
# File 'lib/librato/collector/aggregator.rb', line 50 def flush_to(queue, opts={}) queued = nil @lock.synchronize do return if @cache.empty? queued = @cache.queued flush_percentiles(queue, opts) unless @percentiles.empty? clear_storage unless opts[:preserve] end queue.merge!(queued) if queued end |
#measure(*args, &block) ⇒ Object Also known as: timing
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/librato/collector/aggregator.rb', line 75 def measure(*args, &block) = {} event = args[0].to_s returned = nil # handle block or specified argument if block_given? start = Time.now returned = yield value = ((Time.now - start) * 1000.0).to_i elsif args[1] value = args[1] else raise "no value provided" end # detect options hash if present if args.length > 1 and args[-1].respond_to?(:each) = args[-1] end percentiles = Array([:percentile]) source = [:source] = [:tags] = { source: source } if source && ! = if && [:inherit_tags] @default_tags.merge() elsif else @default_tags end @lock.synchronize do payload = { value: value } payload.merge!({ tags: }) if @cache.add event => payload percentiles.each do |perc| store = fetch_percentile_store(event, payload) store[:reservoir] << value track_percentile(store, perc) end end returned end |