Class: Vector::Function::PredictiveScaling

Inherits:
Object
  • Object
show all
Includes:
HLogger
Defined in:
lib/vector/functions/predictive_scaling.rb

Instance Method Summary collapse

Methods included from HLogger

enable, #hlog, #hlog_ctx

Constructor Details

#initialize(options) ⇒ PredictiveScaling

Returns a new instance of PredictiveScaling.



6
7
8
9
10
11
12
# File 'lib/vector/functions/predictive_scaling.rb', line 6

# Copies the configuration this function needs out of +options+.
#
# Each of the keys listed below is read with options[key] and stored in the
# instance variable of the same name; a key that is absent yields nil.
#
# @param options [Hash, #[]] lookup keyed by :cloudwatch, :lookback_windows,
#   :lookahead_window, :valid_threshold and :valid_period
def initialize(options)
  [
    :cloudwatch,
    :lookback_windows,
    :lookahead_window,
    :valid_threshold,
    :valid_period
  ].each do |setting|
    instance_variable_set("@#{setting}", options[setting])
  end
end

Instance Method Details

#run_for(group) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/vector/functions/predictive_scaling.rb', line 14

# Runs predictive scale-up evaluation for a single group.
#
# Throughout this method "load" means "metric value * number of nodes".
#
# For every scale-up policy of +group+ (scaling_adjustment > 0), every
# enabled alarm attached to it, and every configured lookback window:
#
# 1. fetch the current load and node count over @valid_period;
# 2. fetch the load one window ago over @valid_period and, when
#    @valid_threshold is set, skip the window unless it is within that
#    threshold of the current load;
# 3. fetch the load at (one window ago + @lookahead_window) over the
#    alarm's period and treat it as the predicted load;
# 4. divide the predicted load by the current node count and test it with
#    check_alarm_threshold. If that passes, execute the policy (honoring
#    cooldown) and stop evaluating this group.
#
# All three samples are taken relative to one reference time captured at the
# start of the run, so the offsets between them are exactly the configured
# window and lookahead, however long the individual metric fetches take.
#
# @param group [Object] must respond to #name and #scaling_policies
# @return [Hash] :triggered is true when a policy was executed;
#   :check_procs holds one proc per evaluated window, each taking a node
#   count and returning check_alarm_threshold's result for the predicted
#   per-node value
def run_for(group)
  result = { :check_procs => [], :triggered => false }

  hlog_ctx "group: #{group.name}" do
    # Nothing to predict from when no lookback windows are configured
    # (option missing or empty).
    return result if @lookback_windows.nil? || @lookback_windows.empty?

    # Single reference instant for every sample taken during this run.
    now = Time.now

    scaleup_policies = group.scaling_policies.select do |policy|
      policy.scaling_adjustment > 0
    end

    scaleup_policies.each do |policy|
      hlog_ctx "policy: #{policy.name}" do

        policy.alarms.keys.each do |alarm_name|
          alarm = @cloudwatch.alarms[alarm_name]
          hlog_ctx "alarm: #{alarm.name} (metric #{alarm.metric.name})" do

            # `next` leaves the current logging block, which is the last
            # statement of the surrounding iteration, so it moves on to the
            # next alarm (or window, further down).
            unless alarm.enabled?
              hlog "Skipping disabled alarm"
              next
            end

            now_load, now_num = load_for(group, alarm.metric,
              now, @valid_period)

            if now_load.nil?
              hlog "Could not get current total for metric"
              next
            end

            @lookback_windows.each do |window|
              hlog_ctx "window: #{window.inspect}" do
                then_load, = load_for(group, alarm.metric,
                  now - window, @valid_period)

                if then_load.nil?
                  hlog "Could not get past total value for metric"
                  next
                end

                # The past window is only a usable predictor when its total
                # load is within threshold% of the current total load.
                if @valid_threshold &&
                  !Vector.within_threshold(@valid_threshold, now_load, then_load)
                  hlog "Past metric total value not within threshold (current #{now_load}, then #{then_load})"
                  next
                end

                past_load, = load_for(group, alarm.metric,
                  now - window + @lookahead_window,
                  alarm.period)

                if past_load.nil?
                  hlog "Could not get past + #{@lookahead_window.inspect} total value for metric"
                  next
                end

                # Spread the predicted total load over a given number of
                # nodes to get the predicted per-node metric value. The proc
                # is also handed back to the caller via result[:check_procs].
                check_proc = Proc.new do |num_nodes|
                  predicted_value = past_load.to_f / num_nodes
                  hlog "Predicted #{alarm.metric.name}: #{predicted_value} (#{num_nodes} nodes)"

                  check_alarm_threshold(alarm, predicted_value)
                end
                result[:check_procs] << check_proc

                if check_proc.call(now_num)
                  hlog "Executing policy"
                  policy.execute(honor_cooldown: true)

                  result[:triggered] = true

                  # don't need to evaluate further windows or policies on this group
                  return result
                end
              end
            end
          end
        end
      end
    end
  end

  result
end