Class: Vector::Function::PredictiveScaling

Inherits:
Object
Includes:
HLogger
Defined in:
lib/vector/functions/predictive_scaling.rb

Instance Method Summary

Methods included from HLogger

enable, #hlog, #hlog_ctx, #hlog_ctx_string

Constructor Details

#initialize(options) ⇒ PredictiveScaling

Returns a new instance of PredictiveScaling.



Source lines 6–13:
# File 'lib/vector/functions/predictive_scaling.rb', line 6

# Builds a predictive-scaling evaluator from an options container.
#
# Each recognised key is read with `options[key]` and stored in the
# instance variable of the same name (missing keys become nil):
#   :cloudwatch       - source of alarms (`alarms[name]` is used by #run_for)
#   :dry_run          - when truthy, #run_for logs instead of executing policies
#   :lookback_windows - offsets subtracted from the reference time
#   :lookahead_window - offset added on top of each lookback point
#   :valid_threshold  - optional tolerance passed to Vector.within_threshold
#   :valid_period     - period used for the current/lookback load queries
#
# @param options [#[]] usually a Hash keyed by the symbols above
def initialize(options)
  [
    :cloudwatch,
    :dry_run,
    :lookback_windows,
    :lookahead_window,
    :valid_threshold,
    :valid_period
  ].each do |option_key|
    instance_variable_set(:"@#{option_key}", options[option_key])
  end
end

Instance Method Details

#run_for(group) ⇒ Object



Source lines 15–123:
# File 'lib/vector/functions/predictive_scaling.rb', line 15

# Evaluates predictive scale-up for one auto scaling group.
#
# For every enabled alarm attached to a scale-up policy (scaling_adjustment > 0),
# the current total "load" (metric value * number of nodes, as returned by
# #load_for) is compared with the load at each configured lookback window.
# When that past load is available (and, if :valid_threshold was given, within
# that threshold of the current load), the load at (past + lookahead) is divided
# by a node count to predict the metric value. The alarm's threshold is then
# checked against that prediction. The first prediction that breaches the alarm
# executes the policy (only logged in dry-run mode) and ends evaluation for the
# whole group.
#
# Every lookup in one call is anchored to a single reference time. This keeps
# the current, lookback and lookahead samples exactly `window` /
# `lookahead_window` apart, regardless of how long the CloudWatch queries take.
#
# @param group the auto scaling group; must respond to #name and
#   #scaling_policies (policies respond to #name, #scaling_adjustment, #alarms
#   and #execute).
# @return [Hash] :check_procs => Array of Procs built so far. Each takes
#   (num_nodes, logger) and returns #check_alarm_threshold for the predicted
#   value at that node count. :triggered => true if a policy was executed (or
#   would have been, in dry-run mode).
def run_for(group)
  result = { :check_procs => [], :triggered => false }
  # Nothing to predict from without lookback windows; an option that was never
  # supplied (nil) is treated like an empty list instead of raising.
  return result if @lookback_windows.nil? || @lookback_windows.empty?

  # One reference time for every lookup made during this run.
  now = Time.now

  hlog_ctx "ps" do
    hlog_ctx "group:#{group.name}" do
      scaleup_policies = group.scaling_policies.select do |policy|
        policy.scaling_adjustment > 0
      end

      scaleup_policies.each do |policy|
        hlog_ctx "policy:#{policy.name}" do
          policy.alarms.keys.each do |alarm_name|
            alarm = @cloudwatch.alarms[alarm_name]
            hlog_ctx "alarm:#{alarm.name}" do
              # Once a policy fires, no further windows or policies on this
              # group are evaluated. Returning from inside the context blocks
              # unwinds them exactly as the inline version did.
              return result if ps_evaluate_alarm(group, policy, alarm, now, result)
            end
          end
        end
      end
    end
  end

  result
end

# Evaluates one alarm of a scale-up policy against every lookback window.
#
# Disabled alarms, and alarms whose current load cannot be fetched, are logged
# and skipped.
#
# @return [Boolean] true if some window's prediction breached the alarm (the
#   policy was then executed, or only logged in dry-run mode); false otherwise.
def ps_evaluate_alarm(group, policy, alarm, now, result)
  hlog "Metric #{alarm.metric.name}"

  unless alarm.enabled?
    hlog "Skipping disabled alarm"
    return false
  end

  # Note that everywhere we say "load" what we mean is
  # "metric value * number of nodes"
  now_load, now_num = load_for(group, alarm.metric, now, @valid_period)

  if now_load.nil?
    hlog "Could not get current total for metric"
    return false
  end

  @lookback_windows.each do |window|
    hlog_ctx "window:#{window.inspect.gsub ' ', ''}" do
      # Return from inside the window context so it is unwound on the way out.
      if ps_evaluate_window(group, policy, alarm, window, now, now_load, now_num, result)
        return true
      end
    end
  end

  false
end

# Evaluates a single lookback window for an alarm.
#
# Appends the window's check proc to result[:check_procs] whenever a
# prediction can be made. If the prediction breaches the alarm at the current
# node count (now_num), it executes the policy (honoring cooldown) unless in
# dry-run mode, and sets result[:triggered].
#
# @return [Boolean] true if the policy was triggered by this window.
def ps_evaluate_window(group, policy, alarm, window, now, now_load, now_num, result)
  then_load, = load_for(group, alarm.metric, now - window, @valid_period)

  if then_load.nil?
    hlog "Could not get past total value for metric"
    return false
  end

  # check that the past total utilization is within
  # threshold% of the current total utilization
  if @valid_threshold &&
    !Vector.within_threshold(@valid_threshold, now_load, then_load)
    hlog "Past metric total value not within threshold (current #{now_load}, then #{then_load})"
    return false
  end

  past_load, = load_for(group, alarm.metric, now - window + @lookahead_window,
    alarm.period)

  if past_load.nil?
    hlog "Could not get past + #{@lookahead_window.inspect} total value for metric"
    return false
  end

  # The log context is captured here so the check can report where it came
  # from when it is later invoked under a different logger (scaledown checks).
  check_proc = ps_build_check_proc(alarm, past_load, hlog_ctx_string)
  result[:check_procs] << check_proc

  return false unless check_proc.call(now_num, self)

  if @dry_run
    hlog "Executing policy (DRY RUN)"
  else
    hlog "Executing policy"
    policy.execute(honor_cooldown: true)
  end

  result[:triggered] = true
  true
end

# Builds the deferred threshold check for one (alarm, window) pair.
#
# The returned Proc takes (num_nodes, logger). It predicts the metric value as
# past_load / num_nodes and logs the prediction through logger. When logger's
# context differs from orig_ctx, the original context is appended to the log
# line. It returns #check_alarm_threshold for the prediction.
def ps_build_check_proc(alarm, past_load, orig_ctx)
  Proc.new do |num_nodes, logger|
    predicted_value = past_load.to_f / num_nodes

    log_str = "Predicted #{alarm.metric.name}: #{predicted_value} (#{num_nodes} nodes)"

    # Tack on the original context if we're in a different logger
    # (for the case where this is called during scaledown checks).
    if orig_ctx != logger.hlog_ctx_string
      log_str += " (from #{orig_ctx})"
    end

    logger.hlog log_str

    check_alarm_threshold(alarm, predicted_value)
  end
end

private :ps_evaluate_alarm, :ps_evaluate_window, :ps_build_check_proc