Class: DynamoAutoscale::Actioner

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/dynamo-autoscale/actioner.rb

Direct Known Subclasses

DynamoActioner, LocalActioner

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, logger, #logger, logger=

Constructor Details

#initialize(table, opts = {}) ⇒ Actioner

Returns a new instance of Actioner.



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/dynamo-autoscale/actioner.rb', line 22

def initialize table, opts = {}
  @table            = table
  @downscales       = 0
  @upscales         = 0
  @provisioned      = { reads: RBTree.new, writes: RBTree.new }
  @pending          = { reads: nil, writes: nil }
  @last_action      = Time.now.utc
  @last_scale_check = Time.now.utc
  @downscale_warn   = false
  @opts             = opts
end

Instance Attribute Details

#downscales(new_val = nil) ⇒ Object

Returns the value of attribute downscales.



4
5
6
# File 'lib/dynamo-autoscale/actioner.rb', line 4

def downscales
  @downscales
end

#tableObject

Returns the value of attribute table.



4
5
6
# File 'lib/dynamo-autoscale/actioner.rb', line 4

def table
  @table
end

#upscalesObject

Returns the value of attribute upscales.



4
5
6
# File 'lib/dynamo-autoscale/actioner.rb', line 4

def upscales
  @upscales
end

Class Method Details

.maximum_throughputObject



14
15
16
# File 'lib/dynamo-autoscale/actioner.rb', line 14

def self.maximum_throughput
  @maximum_throughput ||= 20000
end

.maximum_throughput=(new_maximum_throughput) ⇒ Object



18
19
20
# File 'lib/dynamo-autoscale/actioner.rb', line 18

def self.maximum_throughput= new_maximum_throughput
  @maximum_throughput = new_maximum_throughput
end

.minimum_throughputObject



6
7
8
# File 'lib/dynamo-autoscale/actioner.rb', line 6

def self.minimum_throughput
  @minimum_throughput ||= 10
end

.minimum_throughput=(new_minimum_throughput) ⇒ Object



10
11
12
# File 'lib/dynamo-autoscale/actioner.rb', line 10

def self.minimum_throughput= new_minimum_throughput
  @minimum_throughput = new_minimum_throughput
end

Instance Method Details

#can_run?Boolean

This should be overwritten by deriving classes. In the Dynamo actioner, this should check that the table is in an :active state. In the local actioner this will be faked.

Returns:

  • (Boolean)


68
69
70
# File 'lib/dynamo-autoscale/actioner.rb', line 68

def can_run?
  false
end

#check_day_reset!Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/dynamo-autoscale/actioner.rb', line 46

def check_day_reset!
  now = Time.now.utc

  if now >= (check = (@last_scale_check + 1.day).midnight)
    logger.info "[scales] New day! Reset scaling counts back to 0."
    logger.debug "[scales] now: #{now}, comp: #{check}"

    if @downscales < 4
      logger.warn "[scales] Unused downscales. Used: #{@downscales}"
    end

    @upscales   = 0
    @downscales = 0
    @downscale_warn    = false
  end

  @last_scale_check = now
end

#clear_pending!Object



256
257
258
259
# File 'lib/dynamo-autoscale/actioner.rb', line 256

def clear_pending!
  @pending[:writes] = nil
  @pending[:reads] = nil
end

#downscale(metric, from, to) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/dynamo-autoscale/actioner.rb', line 150

def downscale metric, from, to
  if @downscales >= 4
    unless @downscale_warn
      @downscale_warn = true
      logger.warn "[#{metric.to_s.ljust(6)}][scaling failed]" +
        " Hit upper limit of downward scales per day."
    end

    return false
  end

  if @pending[metric]
    logger.info "[#{metric}][scaling down] " +
      "#{@pending[metric]} -> #{to.round(2)} (overwritten pending)"
  else
    logger.info "[#{metric}][scaling down] " +
      "#{from ? from.round(2) : "Unknown"} -> #{to.round(2)}"
  end

  queue_operation! metric, from, to
end

#flush_operations!Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/dynamo-autoscale/actioner.rb', line 193

def flush_operations!
  result = nil
  now = Time.now.utc

  if @pending[:writes] and @pending[:reads]
    wfrom, wto = @pending[:writes]
    rfrom, rto = @pending[:reads]

    if result = scale_both(rto, wto)
      @provisioned[:writes][now] = wto
      @provisioned[:reads][now] = rto

      table.scale_events[now] = {
        writes_from: wfrom,
        writes_to:   wto,
        reads_from:  rfrom,
        reads_to:    rto,
      }

      @pending[:writes] = nil
      @pending[:reads] = nil

      logger.info "[flush] Flushed a read and a write event."
    else
      logger.error "[flush] Failed to flush a read and write event."
    end
  elsif @pending[:writes]
    from, to = @pending[:writes]

    if result = scale(:writes, to)
      @provisioned[:writes][now] = to
      table.scale_events[now]    = { writes_from: from, writes_to: to }
      @pending[:writes]          = nil

      logger.info "[flush] Flushed a write event."
    else
      logger.error "[flush] Failed to flush a write event."
    end
  elsif @pending[:reads]
    from, to = @pending[:reads]

    if result = scale(:reads, to)
      @provisioned[:reads][now] = to
      table.scale_events[now]   = { reads_from: from, reads_to: to }
      @pending[:reads]          = nil

      logger.info "[flush] Flushed a read event."
    else
      logger.error "[flush] Failed to flush a read event."
    end
  end

  return result
end

#pending_reads?Boolean

Returns:

  • (Boolean)


248
249
250
# File 'lib/dynamo-autoscale/actioner.rb', line 248

def pending_reads?
  !!@pending[:reads]
end

#pending_writes?Boolean

Returns:

  • (Boolean)


252
253
254
# File 'lib/dynamo-autoscale/actioner.rb', line 252

def pending_writes?
  !!@pending[:writes]
end

#provisioned_for(metric) ⇒ Object



34
35
36
# File 'lib/dynamo-autoscale/actioner.rb', line 34

def provisioned_for metric
  @provisioned[normalize_metric(metric)]
end

#provisioned_readsObject



42
43
44
# File 'lib/dynamo-autoscale/actioner.rb', line 42

def provisioned_reads
  @provisioned[:reads]
end

#provisioned_writesObject



38
39
40
# File 'lib/dynamo-autoscale/actioner.rb', line 38

def provisioned_writes
  @provisioned[:writes]
end

#queue_operation!(metric, from, to) ⇒ Object



172
173
174
175
# File 'lib/dynamo-autoscale/actioner.rb', line 172

def queue_operation! metric, from, to
  @pending[metric] = [from, to]
  try_flush!
end

#set(metric, to) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/dynamo-autoscale/actioner.rb', line 82

def set metric, to
  check_day_reset!

  metric = normalize_metric(metric)
  ptime, _ = provisioned_for(metric).last

  if ptime and ptime > 2.minutes.ago
    logger.warn "[actioner] Attempted to scale the same metric more than " +
      "once in a 2 minute window. Disallowing."
    return false
  end

  from = table.last_provisioned_for(metric)

  if from and to > (from * 2)
    to = from * 2

    logger.warn "[#{metric}] Attempted to scale up " +
      "more than allowed. Capped scale to #{to}."
  end

  if to < Actioner.minimum_throughput
    to = Actioner.minimum_throughput

    logger.warn "[#{metric}] Attempted to scale down to " +
      "less than minimum throughput. Capped scale to #{to}."
  end

  if to > Actioner.maximum_throughput
    to = Actioner.maximum_throughput

    logger.warn "[#{metric}] Attempted to scale up to " +
      "greater than maximum throughput. Capped scale to #{to}."
  end

  if from and from == to
    logger.info "[#{metric}] Attempted to scale to same value. Ignoring..."
    return false
  end

  if from and from > to
    downscale metric, from, to
  else
    upscale metric, from, to
  end
end

#should_flush?Boolean

Returns:

  • (Boolean)


261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/dynamo-autoscale/actioner.rb', line 261

def should_flush?
  if @opts[:group_downscales].nil?
    logger.info "[flush] Downscales are not being grouped. Should flush."
    return true
  end

  if pending_reads? and pending_writes?
    logger.info "[flush] Both a read and a write are pending. Should flush."
    return true
  end

  now = Time.now.utc

  # I know what you're thinking. How would the last action ever be in the
  # future? Locally, we use Timecop to fake out the time. Unfortunately it
  # doesn't kick in until after the first data point, so when this object is
  # created the @last_action is set to Time.now.utc, then the time gets
  # rolled back, causing the last action to be in the future. This hack
  # fixes that.
  @last_action = now if @last_action > now

  if (@opts[:flush_after] and @last_action and
    (now > @last_action + @opts[:flush_after]))

    logger.info "[flush] Flush timeout of #{@opts[:flush_after]} reached."
    return true
  end

  logger.info "[flush] Flushing conditions not met. Pending operations: " +
    "#{@pending[:reads] ? "1 read" : "no reads"}, " +
    "#{@pending[:writes] ? "1 write" : "no writes"}"

  return false
end

#try_flush!Object



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/dynamo-autoscale/actioner.rb', line 177

def try_flush!
  if should_flush?
    if flush_operations!
      @downscales += 1
      @last_action = Time.now.utc
      ScaleReport.new(table).send

      return true
    else
      return false
    end
  else
    return false
  end
end

#upscale(metric, from, to) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/dynamo-autoscale/actioner.rb', line 129

def upscale metric, from, to
  logger.info "[#{metric}][scaling up] " +
    "#{from ? from.round(2) : "Unknown"} -> #{to.round(2)}"

  now = Time.now.utc

  # Because upscales are not limited, we don't need to queue this operation.
  if result = scale(metric, to)
    table.scale_events[now] = {
      "#{metric}_to".to_sym => to,
      "#{metric}_from".to_sym => from,
    }

    @provisioned[metric][now] = to
    @upscales += 1
    ScaleReport.new(table).send
  end

  return result
end