Class: Sidekiq::HerokuAutoscale::Process
- Inherits:
-
Object
- Object
- Sidekiq::HerokuAutoscale::Process
- Defined in:
- lib/sidekiq/heroku_autoscale/process.rb
Constant Summary collapse
- WAKE_THROTTLE =
PollInterval.new(:wait_for_update!, before_update: 2)
- SHUTDOWN_POLL =
PollInterval.new(:wait_for_shutdown!, before_update: 10)
Instance Attribute Summary collapse
-
#active_at ⇒ Object
Returns the value of attribute active_at.
-
#app_name ⇒ Object
readonly
Returns the value of attribute app_name.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#dynos ⇒ Object
Returns the value of attribute dynos.
-
#history ⇒ Object
readonly
Returns the value of attribute history.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue_system ⇒ Object
readonly
Returns the value of attribute queue_system.
-
#quiet_buffer ⇒ Object
Returns the value of attribute quiet_buffer.
-
#quieted_at ⇒ Object
Returns the value of attribute quieted_at.
-
#quieted_to ⇒ Object
Returns the value of attribute quieted_to.
-
#scale_strategy ⇒ Object
readonly
Returns the value of attribute scale_strategy.
-
#throttle ⇒ Object
readonly
Returns the value of attribute throttle.
-
#updated_at ⇒ Object
Returns the value of attribute updated_at.
Instance Method Summary collapse
- #cache_key ⇒ Object
-
#fetch_dyno_count ⇒ Object
gets a live dyno count from Heroku.
- #fulfills_quietdown? ⇒ Boolean
-
#initialize(name: 'worker', app_name: nil, client: nil, throttle: 10, history: 3600, quiet_buffer: 10, system: {}, scale: {}) ⇒ Process
constructor
A new instance of Process.
-
#ping! ⇒ Object
request a throttled update.
-
#quietdown(to = 0) ⇒ Object
starts a quietdown period in which excess workers are quieted no formation changes are allowed during a quietdown window.
-
#quieting? ⇒ Boolean
checks if the system is downscaling no other scaling is allowed during a cooling period.
-
#set_attributes(attrs) ⇒ Object
sets redis-cached process attributes.
-
#set_dyno_count!(count) ⇒ Object
sets the live dyno count on Heroku.
- #shutting_down? ⇒ Boolean
- #status ⇒ Object
-
#sync_attributes ⇒ Object
syncs configuration across process instances (dynos).
-
#throttled? ⇒ Boolean
# check if last update falls within the throttle window.
-
#update!(current = nil, target = nil) ⇒ Object
update the process with live dyno count from Heroku, and then reassess workload and scale transitions.
-
#updated_since_last_activity? ⇒ Boolean
check if a probe time is newer than the last update.
-
#wait_for_shutdown! ⇒ Object
wrapper for monitoring the downscale process (server) polling runs until an update returns zero dynos.
-
#wait_for_update! ⇒ Object
wrapper for throttling the upscale process (client) polling runs until the next update has been called.
Constructor Details
#initialize(name: 'worker', app_name: nil, client: nil, throttle: 10, history: 3600, quiet_buffer: 10, system: {}, scale: {}) ⇒ Process
Returns a new instance of Process.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 14 def initialize( name: 'worker', app_name: nil, client: nil, throttle: 10, # 10 seconds history: 3600, # 1 hour quiet_buffer: 10, system: {}, scale: {} ) @app_name = app_name || name.to_s @name = name.to_s @client = client @queue_system = QueueSystem.new(system) @scale_strategy = ScaleStrategy.new(scale) @dynos = 0 @active_at = nil @updated_at = nil @quieted_at = nil @quieted_to = nil @throttle = throttle @history = history @quiet_buffer = quiet_buffer end |
Instance Attribute Details
#active_at ⇒ Object
Returns the value of attribute active_at.
11 12 13 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11 def active_at @active_at end |
#app_name ⇒ Object (readonly)
Returns the value of attribute app_name.
8 9 10 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8 def app_name @app_name end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
8 9 10 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8 def client @client end |
#dynos ⇒ Object
Returns the value of attribute dynos.
12 13 14 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12 def dynos @dynos end |
#history ⇒ Object (readonly)
Returns the value of attribute history.
8 9 10 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8 def history @history end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8 def name @name end |
#queue_system ⇒ Object (readonly)
Returns the value of attribute queue_system.
9 10 11 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 9 def queue_system @queue_system end |
#quiet_buffer ⇒ Object
Returns the value of attribute quiet_buffer.
12 13 14 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12 def quiet_buffer @quiet_buffer end |
#quieted_at ⇒ Object
Returns the value of attribute quieted_at.
11 12 13 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11 def quieted_at @quieted_at end |
#quieted_to ⇒ Object
Returns the value of attribute quieted_to.
12 13 14 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 12 def quieted_to @quieted_to end |
#scale_strategy ⇒ Object (readonly)
Returns the value of attribute scale_strategy.
9 10 11 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 9 def scale_strategy @scale_strategy end |
#throttle ⇒ Object (readonly)
Returns the value of attribute throttle.
8 9 10 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 8 def throttle @throttle end |
#updated_at ⇒ Object
Returns the value of attribute updated_at.
11 12 13 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 11 def updated_at @updated_at end |
Instance Method Details
#cache_key ⇒ Object
260 261 262 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 260 def cache_key [self.class.name.gsub('::', '/').downcase, app_name, name].join(':') end |
#fetch_dyno_count ⇒ Object
gets a live dyno count from Heroku
184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 184 def fetch_dyno_count if @client @client.formation.list(app_name) .select { |item| item['type'] == name } .map { |item| item['quantity'] } .reduce(0, &:+) else @dynos end rescue StandardError => e ::Sidekiq::HerokuAutoscale.exception_handler.call(e) 0 end |
#fulfills_quietdown? ⇒ Boolean
77 78 79 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 77 def fulfills_quietdown? !!(@quieted_at && Time.now.utc >= @quieted_at + @quiet_buffer) end |
#ping! ⇒ Object
request a throttled update
54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 54 def ping! @active_at = Time.now.utc if ::Sidekiq.server? # submit the process for runscaling (up or down) # the process is polled until shutdown occurs SHUTDOWN_POLL.call(self) else # submits the process for upscaling (wake up) # the process is polled until an update is run WAKE_THROTTLE.call(self) end end |
#quietdown(to = 0) ⇒ Object
starts a quietdown period in which excess workers are quieted no formation changes are allowed during a quietdown window.
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 93 def quietdown(to=0) quiet_to = [0, to].max quiet_at = Time.now.utc unless queue_system.quietdown!(quiet_to) # omit quiet buffer if no workers were actually quieted # allows direct downscaling without buffer delay # (though uptime buffer may still have an effect) quiet_at -= (@quiet_buffer + 1) end set_attributes(quieted_to: quiet_to, quieted_at: quiet_at) end |
#quieting? ⇒ Boolean
checks if the system is downscaling no other scaling is allowed during a cooling period
69 70 71 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 69 def quieting? !!(@quieted_to && @quieted_at) end |
#set_attributes(attrs) ⇒ Object
sets redis-cached process attributes
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 210 def set_attributes(attrs) cache = {} prev_dynos = @dynos if attrs.key?(:dynos) cache['dynos'] = @dynos = attrs[:dynos] end if attrs.key?(:quieted_to) cache['quieted_to'] = @quieted_to = attrs[:quieted_to] end if attrs.key?(:quieted_at) @quieted_at = attrs[:quieted_at] cache['quieted_at'] = @quieted_at ? @quieted_at.to_i : nil end if attrs.key?(:updated_at) @updated_at = attrs[:updated_at] cache['updated_at'] = @updated_at ? @updated_at.to_i : nil end ::Sidekiq.redis do |c| c.pipelined do # set new keys, delete expired keys del, set = cache.partition { |k, v| v.nil? } c.hmset(cache_key, *set.flatten) if set.any? c.hdel(cache_key, *del.map(&:first)) if del.any? if attrs[:history_at] # set a dyno count history marker event_time = (attrs[:history_at].to_f / @throttle).floor * @throttle history_page = (attrs[:history_at].to_f / @history).floor * @history history_key = "#{ cache_key }:#{ history_page }" c.hmset(history_key, (event_time - @throttle).to_s, prev_dynos, event_time.to_s, @dynos) c.expire(history_key, @history * 2) end end end end |
#set_dyno_count!(count) ⇒ Object
sets the live dyno count on Heroku
199 200 201 202 203 204 205 206 207 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 199 def set_dyno_count!(count) ::Sidekiq.logger.info("SCALE to #{ count } dynos") @client.formation.update(app_name, name, { quantity: count }) if @client set_attributes(dynos: count, quieted_to: nil, quieted_at: nil, history_at: Time.now.utc) count rescue StandardError => e ::Sidekiq::HerokuAutoscale.exception_handler.call(e) @dynos end |
#shutting_down? ⇒ Boolean
73 74 75 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 73 def shutting_down? quieting? && @quieted_to.zero? end |
#status ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 41 def status if shutting_down? 'stopping' elsif quieting? 'quieting' elsif @dynos > 0 'running' else 'stopped' end end |
#sync_attributes ⇒ Object
syncs configuration across process instances (dynos)
249 250 251 252 253 254 255 256 257 258 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 249 def sync_attributes if cache = ::Sidekiq.redis { |c| c.hgetall(cache_key) } @dynos = cache['dynos'] ? cache['dynos'].to_i : 0 @quieted_to = cache['quieted_to'] ? cache['quieted_to'].to_i : nil @quieted_at = cache['quieted_at'] ? Time.at(cache['quieted_at'].to_i).utc : nil @updated_at = cache['updated_at'] ? Time.at(cache['updated_at'].to_i).utc : nil return true end false end |
#throttled? ⇒ Boolean
# check if last update falls within the throttle window
87 88 89 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 87 def throttled? !!(@updated_at && Time.now.utc < @updated_at + @throttle) end |
#update!(current = nil, target = nil) ⇒ Object
update the process with live dyno count from Heroku, and then reassess workload and scale transitions. this method shouldn’t be called directly… just ping! it.
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 139 def update!(current=nil, target=nil) current ||= fetch_dyno_count attrs = { dynos: current, updated_at: Time.now.utc } if current.zero? attrs[:quieted_to] = nil attrs[:quieted_at] = nil end set_attributes(attrs) # No changes are allowed while quieting... # the quieted dyno needs to be removed (downscaled) # before making other changes to the formation. unless quieting? # select a new scale target to shoot for # (provides a trajectory, not necessarily a destination) target ||= scale_strategy.call(queue_system) # idle if current == target ::Sidekiq.logger.info("IDLE at #{ target } dynos") return current # upscale elsif current < target return set_dyno_count!(target) # quietdown elsif current > target ::Sidekiq.logger.info("QUIET to #{ current - 1 } dynos") quietdown(current - 1) # do NOT return... # allows downscale conditions to run during the same update end end # downscale if quieting? && fulfills_quietdown? return set_dyno_count!(@quieted_to) end current end |
#updated_since_last_activity? ⇒ Boolean
check if a probe time is newer than the last update
82 83 84 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 82 def updated_since_last_activity? !!(@active_at && @updated_at && @updated_at > @active_at) end |
#wait_for_shutdown! ⇒ Object
wrapper for monitoring the downscale process (server) polling runs until an update returns zero dynos.
127 128 129 130 131 132 133 134 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 127 def wait_for_shutdown! return false if throttled? sync_attributes return false if throttled? update!.zero? end |
#wait_for_update! ⇒ Object
wrapper for throttling the upscale process (client) polling runs until the next update has been called.
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/sidekiq/heroku_autoscale/process.rb', line 107 def wait_for_update! # resolve (true) when already updated by another process # keep waiting (false) when: # - redundant updates are called within the throttle window # - the system has been fully quieted and must shutdown before upscaling return true if updated_since_last_activity? return false if throttled? # first round of checks use local (process-specific) settings # now hit the redis cache and double check settings from other processes sync_attributes return true if updated_since_last_activity? return false if throttled? update! true end |