Class: Sidekiq::HerokuAutoscale::Process

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/heroku_autoscale/process.rb

Constant Summary collapse

WAKE_THROTTLE =
PollInterval.new(:wait_for_update!, before_update: 2)
SHUTDOWN_POLL =
PollInterval.new(:wait_for_shutdown!, before_update: 10)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name: 'worker', app_name: nil, client: nil, throttle: 10, history: 3600, quiet_buffer: 10, system: {}, scale: {}) ⇒ Process

Returns a new instance of Process.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 14

# Builds the autoscaling state holder for one process type.
#
# @param name [#to_s] process type name (stored as a String)
# @param app_name [String, nil] application name; falls back to +name+
# @param client [Object, nil] API client used for formation calls; when nil
#   no remote calls are made
# @param throttle [Integer] throttle window, in seconds (default 10 seconds)
# @param history [Integer] history window, in seconds (default 1 hour)
# @param quiet_buffer [Integer] seconds to wait after a quietdown starts
# @param system [Hash] options handed to QueueSystem.new
# @param scale [Hash] options handed to ScaleStrategy.new
def initialize(
  name: 'worker',
  app_name: nil,
  client: nil,
  throttle: 10,
  history: 3600,
  quiet_buffer: 10,
  system: {},
  scale: {}
)
  # identity
  @name = name.to_s
  @app_name = app_name || @name

  # collaborators
  @client = client
  @queue_system = QueueSystem.new(system)
  @scale_strategy = ScaleStrategy.new(scale)

  # timing configuration
  @throttle = throttle
  @history = history
  @quiet_buffer = quiet_buffer

  # mutable runtime state (kept in sync through redis)
  @dynos = 0
  @active_at = nil
  @updated_at = nil
  @quieted_at = nil
  @quieted_to = nil
end

Instance Attribute Details

#active_atObject

Returns the value of attribute active_at.



11
12
13
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11

# Time of the most recent ping! (UTC), or nil if never pinged.
def active_at
  @active_at
end

#app_nameObject (readonly)

Returns the value of attribute app_name.



8
9
10
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8

# Application name given to the constructor; defaults to the process name.
def app_name
  @app_name
end

#clientObject (readonly)

Returns the value of attribute client.



8
9
10
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8

# API client given to the constructor (used for formation list/update calls);
# nil when none was supplied.
def client
  @client
end

#dynosObject

Returns the value of attribute dynos.



12
13
14
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12

# Last recorded dyno count for this process (starts at 0).
def dynos
  @dynos
end

#historyObject (readonly)

Returns the value of attribute history.



8
9
10
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8

# History window in seconds; sizes the history pages written by
# set_attributes and their expiry.
def history
  @history
end

#nameObject (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8

# Process type name as a String; matched against each formation's 'type'.
def name
  @name
end

#queue_systemObject (readonly)

Returns the value of attribute queue_system.



9
10
11
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 9

# QueueSystem built from the constructor's +system+ options.
def queue_system
  @queue_system
end

#quiet_bufferObject

Returns the value of attribute quiet_buffer.



12
13
14
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12

# Seconds that must elapse after quieted_at before a quietdown is fulfilled.
def quiet_buffer
  @quiet_buffer
end

#quieted_atObject

Returns the value of attribute quieted_at.



11
12
13
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11

# Time (UTC) recorded for the current quietdown, or nil when not quieting.
def quieted_at
  @quieted_at
end

#quieted_toObject

Returns the value of attribute quieted_to.



12
13
14
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12

# Dyno count the current quietdown is heading to, or nil when not quieting.
def quieted_to
  @quieted_to
end

#scale_strategyObject (readonly)

Returns the value of attribute scale_strategy.



9
10
11
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 9

# ScaleStrategy built from the constructor's +scale+ options.
def scale_strategy
  @scale_strategy
end

#throttleObject (readonly)

Returns the value of attribute throttle.



8
9
10
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8

# Throttle window in seconds between updates.
def throttle
  @throttle
end

#updated_atObject

Returns the value of attribute updated_at.



11
12
13
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11

# Time (UTC) of the last recorded update, or nil if none has been recorded.
def updated_at
  @updated_at
end

Instance Method Details

#cache_keyObject



260
261
262
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 260

# Redis key for this process: the downcased class path (with '::' turned
# into '/'), the app name and the process name, separated by ':'.
def cache_key
  namespace = self.class.name.gsub('::', '/').downcase
  "#{ namespace }:#{ app_name }:#{ name }"
end

#fetch_dyno_countObject

gets a live dyno count from Heroku



184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 184

# Reads the live dyno count for this process type.
#
# Without a client the locally stored count is returned. With a client, the
# quantities of every formation whose 'type' equals +name+ are added up.
# Any StandardError is passed to the configured exception handler and 0 is
# returned.
def fetch_dyno_count
  return @dynos unless @client

  formations = @client.formation.list(app_name)
  formations.inject(0) do |total, item|
    item['type'] == name ? total + item['quantity'] : total
  end
rescue StandardError => e
  ::Sidekiq::HerokuAutoscale.exception_handler.call(e)
  0
end

#fulfills_quietdown?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 77

# True once the quiet buffer has fully elapsed since the quietdown started.
# Always false when no quietdown time is recorded.
def fulfills_quietdown?
  return false unless @quieted_at

  Time.now.utc >= @quieted_at + @quiet_buffer
end

#ping!Object

request a throttled update



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 54

# Records activity and requests a throttled update.
#
# On a Sidekiq server the process is handed to SHUTDOWN_POLL (scaling up or
# down, polled until shutdown occurs); elsewhere it is handed to
# WAKE_THROTTLE (wake up / upscaling, polled until an update is run).
def ping!
  @active_at = Time.now.utc
  poller = ::Sidekiq.server? ? SHUTDOWN_POLL : WAKE_THROTTLE
  poller.call(self)
end

#quietdown(to = 0) ⇒ Object

Starts a quietdown period in which excess workers are quieted. No formation changes are allowed during a quietdown window.



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 93

# Begins a quietdown toward +to+ dynos (negative values are treated as 0).
#
# Asks the queue system to quiet workers down to the target and then stores
# the target and start time via set_attributes.
def quietdown(to=0)
  target = [0, to].max
  started = Time.now.utc
  any_quieted = queue_system.quietdown!(target)

  # when nothing was actually quieted, backdate the start beyond the quiet
  # buffer so downscaling can proceed without the buffer delay
  # (though uptime buffer may still have an effect)
  started -= (@quiet_buffer + 1) unless any_quieted

  set_attributes(quieted_to: target, quieted_at: started)
end

#quieting?Boolean

Checks if the system is downscaling. No other scaling is allowed during a cooling period.

Returns:

  • (Boolean)


69
70
71
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 69

# True while both a quietdown target and a quietdown start time are set.
def quieting?
  @quieted_to && @quieted_at ? true : false
end

#set_attributes(attrs) ⇒ Object

sets redis-cached process attributes



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 210

# Sets process attributes locally and mirrors them into the redis cache.
#
# Recognised keys in +attrs+:
# - :dynos, :quieted_to: stored as given
# - :quieted_at, :updated_at: stored as integer epoch seconds
# - :history_at: not cached as a field; when present, a dyno-count history
#   marker is written to a separate, expiring history hash
#
# Keys given with a nil value are deleted from the cached hash.
#
# @param attrs [Hash] attributes to apply
# @return [Object] whatever the ::Sidekiq.redis block returns
def set_attributes(attrs)
  cache = {}
  prev_dynos = @dynos
  if attrs.key?(:dynos)
    cache['dynos'] = @dynos = attrs[:dynos]
  end
  if attrs.key?(:quieted_to)
    cache['quieted_to'] = @quieted_to = attrs[:quieted_to]
  end
  if attrs.key?(:quieted_at)
    @quieted_at = attrs[:quieted_at]
    cache['quieted_at'] = @quieted_at ? @quieted_at.to_i : nil
  end
  if attrs.key?(:updated_at)
    @updated_at = attrs[:updated_at]
    cache['updated_at'] = @updated_at ? @updated_at.to_i : nil
  end

  # nil values are expired keys to delete; everything else is (re)written
  del, set = cache.partition { |_field, value| value.nil? }

  ::Sidekiq.redis do |conn|
    # Queue commands on the yielded pipeline object rather than on the outer
    # connection: issuing them on the connection inside `pipelined` is
    # deprecated in redis-rb 4.6 and unsupported by later clients.
    conn.pipelined do |pipeline|
      pipeline.hmset(cache_key, *set.flatten) if set.any?
      pipeline.hdel(cache_key, *del.map(&:first)) if del.any?

      if attrs[:history_at]
        # set a dyno count history marker:
        # the event time is floored to the throttle interval, and the page
        # (hash key suffix) is floored to the history window
        event_time = (attrs[:history_at].to_f / @throttle).floor * @throttle
        history_page = (attrs[:history_at].to_f / @history).floor * @history
        history_key = "#{ cache_key }:#{ history_page }"

        # record the previous count one interval earlier, then the new count
        pipeline.hmset(history_key, (event_time - @throttle).to_s, prev_dynos, event_time.to_s, @dynos)
        pipeline.expire(history_key, @history * 2)
      end
    end
  end
end

#set_dyno_count!(count) ⇒ Object

sets the live dyno count on Heroku



199
200
201
202
203
204
205
206
207
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 199

# Scales the formation to +count+ dynos and records the change.
#
# Logs the change, updates the formation through the client (when one is
# configured), then stores the new count, clears the quietdown markers and
# writes a history marker. Returns +count+; if any StandardError is raised
# it is passed to the exception handler and the stored dyno count is
# returned instead.
def set_dyno_count!(count)
  ::Sidekiq.logger.info("SCALE to #{ count } dynos")

  if @client
    @client.formation.update(app_name, name, { quantity: count })
  end

  new_state = { dynos: count, quieted_to: nil, quieted_at: nil, history_at: Time.now.utc }
  set_attributes(new_state)
  count
rescue StandardError => e
  ::Sidekiq::HerokuAutoscale.exception_handler.call(e)
  @dynos
end

#shutting_down?Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 73

# True when a quietdown is in progress and its target is zero dynos.
def shutting_down?
  return false unless quieting?

  @quieted_to.zero?
end

#statusObject



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 41

# Human-readable state: 'stopping', 'quieting', 'running' or 'stopped'.
# Shutdown takes precedence over quieting, which takes precedence over the
# plain dyno count.
def status
  return 'stopping' if shutting_down?
  return 'quieting' if quieting?

  @dynos > 0 ? 'running' : 'stopped'
end

#sync_attributesObject

syncs configuration across process instances (dynos)



249
250
251
252
253
254
255
256
257
258
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 249

# Refreshes local state from the redis-cached hash for this process.
#
# Missing fields reset to their defaults (0 dynos, nil otherwise); the
# time fields are read as epoch seconds and converted to UTC times.
# Returns true when a cache value was read, false when the redis block
# returned nil/false.
def sync_attributes
  cached = ::Sidekiq.redis { |conn| conn.hgetall(cache_key) }
  return false unless cached

  as_time = ->(raw) { raw ? Time.at(raw.to_i).utc : nil }
  raw_dynos = cached['dynos']
  raw_target = cached['quieted_to']

  @dynos = raw_dynos ? raw_dynos.to_i : 0
  @quieted_to = raw_target ? raw_target.to_i : nil
  @quieted_at = as_time.call(cached['quieted_at'])
  @updated_at = as_time.call(cached['updated_at'])
  true
end

#throttled?Boolean

Checks if the last update falls within the throttle window.

Returns:

  • (Boolean)


87
88
89
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 87

# True while the current time is still inside the throttle window that
# follows the last update. Always false when no update has been recorded.
def throttled?
  return false unless @updated_at

  Time.now.utc < @updated_at + @throttle
end

#update!(current = nil, target = nil) ⇒ Object

update the process with live dyno count from Heroku, and then reassess workload and scale transitions. this method shouldn’t be called directly… just ping! it.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 139

# Records the current dyno count and then decides on a scale transition.
#
# current - dyno count to record; fetched via fetch_dyno_count when nil.
# target  - desired dyno count; computed by scale_strategy when nil
#           (only consulted when no quietdown is in progress).
#
# Returns the result of set_dyno_count! when a formation change is made
# (upscale, or a fulfilled downscale), otherwise the current dyno count.
def update!(current=nil, target=nil)
  current ||= fetch_dyno_count

  # always record the observed count and update time;
  # a zero count also clears any quietdown markers
  attrs = { dynos: current, updated_at: Time.now.utc }
  if current.zero?
    attrs[:quieted_to] = nil
    attrs[:quieted_at] = nil
  end
  set_attributes(attrs)

  # No changes are allowed while quieting...
  # the quieted dyno needs to be removed (downscaled)
  # before making other changes to the formation.
  unless quieting?
    # select a new scale target to shoot for
    # (provides a trajectory, not necessarily a destination)
    target ||= scale_strategy.call(queue_system)

    # idle
    if current == target
      ::Sidekiq.logger.info("IDLE at #{ target } dynos")
      return current

    # upscale
    elsif current < target
      return set_dyno_count!(target)

    # quietdown
    elsif current > target
      # step down one dyno at a time
      ::Sidekiq.logger.info("QUIET to #{ current - 1 } dynos")
      quietdown(current - 1)
      # do NOT return...
      # allows downscale conditions to run during the same update
    end
  end

  # downscale
  if quieting? && fulfills_quietdown?
    return set_dyno_count!(@quieted_to)
  end

  current
end

#updated_since_last_activity?Boolean

Checks if the last update is newer than the last recorded activity (ping) time.

Returns:

  • (Boolean)


82
83
84
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 82

# True when both timestamps exist and the last update happened strictly
# after the last recorded activity.
def updated_since_last_activity?
  return false unless @active_at && @updated_at

  @updated_at > @active_at
end

#wait_for_shutdown!Object

Wrapper for monitoring the downscale process (server). Polling runs until an update returns zero dynos.



127
128
129
130
131
132
133
134
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 127

# Poll step for the server side: runs an update unless throttled.
#
# The throttle is checked twice: first against local state, then again
# after syncing attributes from redis. Returns true only when an update ran
# and reported zero dynos; false otherwise.
def wait_for_shutdown!
  unless throttled?
    sync_attributes
    return update!.zero? unless throttled?
  end

  false
end

#wait_for_update!Object

Wrapper for throttling the upscale process (client). Polling runs until the next update has been called.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 107

# Poll step for the client side.
#
# Resolves (true) when an update has already happened since the last
# activity, or after running an update itself. Keeps waiting (false) while
# inside the throttle window.
def wait_for_update!
  # pass 1: decide from local (process-specific) state only
  return true if updated_since_last_activity?

  unless throttled?
    # pass 2: refresh from the redis cache and re-check, since another
    # process may have updated in the meantime
    sync_attributes
    return true if updated_since_last_activity?

    unless throttled?
      update!
      return true
    end
  end

  false
end