Class: UState::Aggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/ustate/aggregator.rb

Constant Summary collapse

INTERVAL =

Combines states periodically.

1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(index, opts = {}) ⇒ Aggregator

Returns a new instance of Aggregator.



9
10
11
12
13
14
15
16
17
# File 'lib/ustate/aggregator.rb', line 9

def initialize(index, opts = {})
  @index = index

  @folds = {}
  @interval = opts[:interval] || INTERVAL
  @server = opts[:server]

  start
end

Instance Attribute Details

#foldsObject

Returns the value of attribute folds.



8
9
10
# File 'lib/ustate/aggregator.rb', line 8

def folds
  @folds
end

#intervalObject

Returns the value of attribute interval.



7
8
9
# File 'lib/ustate/aggregator.rb', line 7

def interval
  @interval
end

Instance Method Details

#average(query, *a) ⇒ Object

Combines states matching query with State.average



20
21
22
23
24
# File 'lib/ustate/aggregator.rb', line 20

def average(query, *a)
  fold query do |states|
    State.average states, *a
  end
end

#fold(query, &block) ⇒ Object

Combines states matching query with the given block. The block receives an array of states which presently match.

Example:

fold 'service = "api % reqs/sec"' do |states|
  states.inject(State.new(service: 'api reqs/sec')) do |combined, state|
    combined.metric_f += state.metric_f
    combined
  end
end


36
37
38
39
40
41
42
# File 'lib/ustate/aggregator.rb', line 36

def fold(query, &block)
  @folds[block] = if existing = @folds[block]
    "(#{existing}) or (#{q})"
  else
    query
  end
end

#startObject

Polls index for states matching each fold, applies fold, and inserts into index.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/ustate/aggregator.rb', line 46

def start
  @runner = Thread.new do
    loop do
      begin
        interval = (@interval.to_f / @folds.size) rescue @interval
        @folds.each do |f, query|
          matching = @index.query(Query.new(string: query))
          unless matching.empty?
            if combined = f[matching]
              @index << combined
            end
          end
          sleep interval
        end
      rescue Exception => e
        @server.log.error e
        sleep 1
      end
    end
  end
end

#sum(query, *a) ⇒ Object

Combines states matching query with State.sum



69
70
71
72
73
# File 'lib/ustate/aggregator.rb', line 69

def sum(query, *a)
  fold query do |states|
    State.sum states, *a
  end
end