Class: Librato::Metrics::Queue

Inherits:
Object
  • Object
show all
Includes:
Processor
Defined in:
lib/librato/metrics/queue.rb

Constant Summary

Constants included from Processor

Processor::MEASUREMENTS_PER_REQUEST

Instance Attribute Summary collapse

Attributes included from Processor

#last_submit_time, #per_request, #prefix, #tags

Instance Method Summary collapse

Methods included from Processor

#client, #has_tags?, #persister, #submit, #time

Constructor Details

#initialize(opts = {}) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :autosubmit_count (Integer)

    If set the queue will auto-submit any time it hits this number of measurements.

  • :autosubmit_interval (Integer)

    If set the queue will auto-submit if the given number of seconds has passed when a new metric is added.

  • :clear_failures (Boolean)

    Should the queue remove any queued measurements from its queue if it runs into problems with a request? (default: false)

  • :client (Client)

    The client object to use to connect to Metrics. (default: Librato::Metrics.client)

  • :measure_time (Time|Integer)

    A default measure_time to use for measurements added.

  • :prefix (String)

    If set will apply the given prefix to all metric names of measurements added.

  • :skip_measurement_times (Boolean)

    If true will not assign measurement_time to each measure as they are added.

  • :source (String)

    The default source to use for measurements added.



18
19
20
21
22
23
# File 'lib/librato/metrics/queue.rb', line 18

def initialize(opts={})
  @queued = {}
  @autosubmit_count = opts[:autosubmit_count]
  @skip_measurement_times = opts[:skip_measurement_times]
  setup_common_options(opts)
end

Instance Attribute Details

#skip_measurement_timesObject

Returns the value of attribute skip_measurement_times.



8
9
10
# File 'lib/librato/metrics/queue.rb', line 8

def skip_measurement_times
  @skip_measurement_times
end

Instance Method Details

#add(measurements) ⇒ Queue

Add a metric entry to the metric set:

Parameters:

  • measurements (Hash)

    measurements to add

Returns:

  • (Queue)

    returns self



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/librato/metrics/queue.rb', line 29

def add(measurements)
  measurements.each do |key, value|
    multidimensional = has_tags?
    if value.respond_to?(:each)
      validate_parameters(value)
      metric = value
      metric[:name] = key.to_s
      type = metric.delete(:type) || metric.delete('type') || 'gauge'
    else
      metric = {name: key.to_s, value: value}
      type = :gauge
    end
    if @prefix
      metric[:name] = "#{@prefix}.#{metric[:name]}"
    end
    multidimensional = true if metric[:tags] || metric[:time]
    type = ("#{type}s").to_sym
    time_key = multidimensional ? :time : :measure_time
    metric[:time] = metric.delete(:measure_time) if multidimensional && metric[:measure_time]

    if metric[time_key]
      metric[time_key] = metric[time_key].to_i
      check_measure_time(metric)
    elsif !skip_measurement_times
      metric[time_key] = epoch_time
    end
    if multidimensional
      @queued[:measurements] ||= []
      @queued[:measurements] << metric
    else
      @queued[type] ||= []
      @queued[type] << metric
    end
  end
  submit_check
  self
end

#clearObject Also known as: flush

Remove all queued metrics



83
84
85
# File 'lib/librato/metrics/queue.rb', line 83

def clear
  @queued = {}
end

#countersArray

Currently queued counters

Returns:

  • (Array)


70
71
72
# File 'lib/librato/metrics/queue.rb', line 70

def counters
  @queued[:counters] || []
end

#empty?Boolean

Are any metrics currently queued?

Returns:

  • (Boolean)

    Boolean



77
78
79
# File 'lib/librato/metrics/queue.rb', line 77

def empty?
  gauges.empty? && counters.empty? && measurements.empty?
end

#gaugesObject

Currently queued gauges

Returns:

  • Array



91
92
93
# File 'lib/librato/metrics/queue.rb', line 91

def gauges
  @queued[:gauges] || []
end

#measurementsObject



95
96
97
# File 'lib/librato/metrics/queue.rb', line 95

def measurements
  @queued[:measurements] || []
end

#merge!(mergeable) ⇒ Object

Combines queueable measures from the given object into this queue.

Examples:

Merging queues for more performant submission

queue1.merge!(queue2)
queue1.submit  # submits combined contents

Returns:

  • self



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/librato/metrics/queue.rb', line 107

def merge!(mergeable)
  if mergeable.respond_to?(:queued)
    to_merge = mergeable.queued
  elsif mergeable.respond_to?(:has_key?)
    to_merge = mergeable
  else
    raise NotMergeable
  end
  Metrics::PLURAL_TYPES.each do |type|
    if to_merge[type]
      payload = reconcile(to_merge[type], to_merge[:source])
      if @queued[type]
        @queued[type] += payload
      else
        @queued[type] = payload
      end
    end
  end

  if to_merge[:measurements]
    payload = reconcile(to_merge[:measurements], to_merge[:tags])
    if @queued[:measurements]
      @queued[:measurements] += payload
    else
      @queued[:measurements] = payload
    end
  end

  submit_check
  self
end

#queuedObject

All currently queued metrics

Returns:

  • Hash



142
143
144
145
146
147
148
149
150
# File 'lib/librato/metrics/queue.rb', line 142

def queued
  return {} if @queued.empty?
  globals = {}
  time = has_tags? ? :time : :measure_time
  globals[time] = @time if @time
  globals[:source] = @source if @source
  globals[:tags] = @tags if has_tags?
  @queued.merge(globals)
end

#sizeObject Also known as: length

Count of metrics currently queued

Returns:

  • Integer



155
156
157
# File 'lib/librato/metrics/queue.rb', line 155

def size
  self.queued.inject(0) { |result, data| result + data.last.size }
end