Class: RSpecQ::Queue
Inherits: Object
Defined in: lib/rspecq/queue.rb
Overview
Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.
A queue typically contains all the data needed for a particular build to happen. These include (but are not limited to) the following:
- the list of jobs (spec files and/or examples) to be executed
- the failed examples along with their backtrace
- the set of running jobs
- previous job timing statistics used to optimally schedule the jobs
- the set of executed jobs
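The snippet below is a rough sketch of how these pieces fit together from the publishing side. It is illustrative only: the build/worker IDs, Redis URL, spec file glob, fail-fast value and polling interval are assumptions, not library defaults.

require "rspecq"

# Rough lifecycle sketch (all literal values are placeholders).
queue = RSpecQ::Queue.new("build-123", "worker-0", url: "redis://localhost:6379")

if queue.become_master
  # This process won the SETNX race and is responsible for publishing jobs.
  queue.publish(Dir["spec/**/*_spec.rb"], 2) # fail-fast after 2 failed examples
else
  queue.wait_until_published
end

# Poll until every job has been processed (or the build failed fast).
sleep 1 until queue.exhausted? || queue.build_failed_fast?
puts(queue.build_successful? ? "build passed" : "build failed")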
Constant Summary
- RESERVE_JOB = (frozen Lua script)
    local queue = KEYS[1]
    local queue_running = KEYS[2]
    local worker_id = ARGV[1]

    local job = redis.call('lpop', queue)
    if job then
      redis.call('hset', queue_running, worker_id, job)
      return job
    else
      return nil
    end
- REQUEUE_LOST_JOB = (frozen Lua script)
  Scans for dead workers and puts their reserved jobs back to the queue.
    local worker_heartbeats = KEYS[1]
    local queue_running = KEYS[2]
    local queue_unprocessed = KEYS[3]
    local time_now = ARGV[1]
    local timeout = ARGV[2]

    local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
    for _, worker in ipairs(dead_workers) do
      local job = redis.call('hget', queue_running, worker)
      if job then
        redis.call('lpush', queue_unprocessed, job)
        redis.call('hdel', queue_running, worker)
        return job
      end
    end

    return nil
- REQUEUE_JOB = (frozen Lua script)
    local key_queue_unprocessed = KEYS[1]
    local key_requeues = KEYS[2]
    local job = ARGV[1]
    local max_requeues = ARGV[2]

    local requeued_times = redis.call('hget', key_requeues, job)
    if requeued_times and requeued_times >= max_requeues then
      return nil
    end

    redis.call('lpush', key_queue_unprocessed, job)
    redis.call('hincrby', key_requeues, job, 1)

    return true
- STATUS_INITIALIZING = "initializing".freeze
- STATUS_READY = "ready".freeze
Instance Attribute Summary
- #redis ⇒ Object (readonly)
  Returns the value of attribute redis.
Instance Method Summary
- #acknowledge_job(job) ⇒ Object
  NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.
- #become_master ⇒ Object
- #build_failed_fast? ⇒ Boolean
  Returns true if the number of failed tests has surpassed the threshold that renders the run unsuccessful, in which case the build should be terminated.
- #build_successful? ⇒ Boolean
- #example_count ⇒ Object
- #example_failures ⇒ Object
- #exhausted? ⇒ Boolean
  True if the build is complete, false otherwise.
- #fail_fast ⇒ Object
  Returns the number of failures that will trigger the build to fail-fast.
- #flaky_jobs ⇒ Object
  Returns the jobs considered flaky (i.e. initially failed but passed after being retried).
- #increment_example_count(n) ⇒ Object
- #initialize(build_id, worker_id, redis_opts) ⇒ Queue (constructor)
  A new instance of Queue.
- #key_build_times ⇒ Object
  redis: LIST<duration>.
- #key_errors ⇒ Object
  Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).
- #key_example_count ⇒ Object
  The total number of examples, including those that were requeued.
- #key_failures ⇒ Object
  Contains regular RSpec example failures.
- #key_queue_config ⇒ Object
  redis: HASH<config_key => config_value>.
- #key_queue_processed ⇒ Object
  redis: SET<job>.
- #key_queue_running ⇒ Object
  redis: HASH<worker_id => job>.
- #key_queue_status ⇒ Object
  redis: STRING [STATUS_INITIALIZING, STATUS_READY].
- #key_queue_unprocessed ⇒ Object
  redis: LIST<job>.
- #key_requeues ⇒ Object
  As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.
- #key_timings ⇒ Object
  redis: ZSET<job => duration>.
- #key_worker_heartbeats ⇒ Object
  redis: ZSET<worker_id => timestamp>.
- #non_example_errors ⇒ Object
- #processed_jobs ⇒ Object
- #processed_jobs_count ⇒ Object
- #publish(jobs, fail_fast = 0) ⇒ Object
  NOTE: jobs will be processed from head to tail (lpop).
- #published? ⇒ Boolean
- #record_build_time(duration) ⇒ Object
- #record_example_failure(example_id, message) ⇒ Object
- #record_non_example_error(job, message) ⇒ Object
  For errors that occurred outside of examples (e.g. while loading a spec file).
- #record_timing(job, duration) ⇒ Object
- #record_worker_heartbeat ⇒ Object
- #requeue_job(job, max_requeues) ⇒ Object
  Put job at the head of the queue, to be re-processed right after by another worker.
- #requeue_lost_job ⇒ Object
- #requeued_jobs ⇒ Object
- #rerun_command(job) ⇒ Object
- #reserve_job ⇒ Object
- #save_rerun_command(job, cmd) ⇒ Object
- #timings ⇒ Object
  Ordered by execution time in descending order (slowest are at the head).
- #unprocessed_jobs ⇒ Object
  The remaining jobs to be processed.
- #wait_until_published(timeout = 30) ⇒ Object
Constructor Details
#initialize(build_id, worker_id, redis_opts) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/rspecq/queue.rb', line 73

def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end
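redis_opts is forwarded to Redis.new with the worker_id merged in as the connection's id option, so any redis-rb connection options apply. A minimal sketch, assuming a local Redis URL:

require "rspecq"

# Hypothetical connection options; anything accepted by Redis.new works here.
queue = RSpecQ::Queue.new("build-123", "worker-1", url: "redis://localhost:6379/0")
queue.redis.ping # => "PONG"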
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
# File 'lib/rspecq/queue.rb', line 71

def redis
  @redis
end
Instance Method Details
#acknowledge_job(job) ⇒ Object
NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.
# File 'lib/rspecq/queue.rb', line 116

def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
  end
end
#become_master ⇒ Object
# File 'lib/rspecq/queue.rb', line 190

def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end
#build_failed_fast? ⇒ Boolean
Returns true if the number of failed tests has surpassed the threshold that renders the run unsuccessful, in which case the build should be terminated.
# File 'lib/rspecq/queue.rb', line 267

def build_failed_fast?
  if fail_fast.nil? || fail_fast.zero?
    return false
  end

  @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end.inject(:+) >= fail_fast
end
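As a sketch, a worker can consult this between jobs and abort early; queue comes from the surrounding context and the exit status is an assumption:

# Hypothetical early-exit check inside a worker loop.
if queue.build_failed_fast?
  warn "fail-fast threshold (#{queue.fail_fast}) reached, aborting"
  exit 1
end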
#build_successful? ⇒ Boolean
# File 'lib/rspecq/queue.rb', line 231

def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end
#example_count ⇒ Object
# File 'lib/rspecq/queue.rb', line 174

def example_count
  @redis.get(key_example_count).to_i
end
#example_failures ⇒ Object
# File 'lib/rspecq/queue.rb', line 199

def example_failures
  @redis.hgetall(key_failures)
end
#exhausted? ⇒ Boolean
True if the build is complete, false otherwise
# File 'lib/rspecq/queue.rb', line 208

def exhausted?
  return false if !published?

  @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end.inject(:+).zero?
end
#fail_fast ⇒ Object
Returns the number of failures that will trigger the build to fail-fast. Returns 0 if this feature is disabled and nil if the Queue is not yet published
# File 'lib/rspecq/queue.rb', line 259

def fail_fast
  return nil unless published?

  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end
#flaky_jobs ⇒ Object
Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete, otherwise an exception will be raised.
# File 'lib/rspecq/queue.rb', line 244

def flaky_jobs
  if !exhausted? && !build_failed_fast?
    raise "Queue is not yet exhausted"
  end

  requeued = @redis.hkeys(key_requeues)

  return [] if requeued.empty?

  requeued - @redis.hkeys(key_failures)
end
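For example, once the build is over a reporter might surface flaky jobs like this (queue comes from the surrounding context; the output format is an assumption):

# Only valid after the queue is exhausted (or the build failed fast).
flaky = queue.flaky_jobs
unless flaky.empty?
  puts "#{flaky.size} flaky job(s) passed only after being requeued:"
  flaky.each { |job| puts "  #{job} -- rerun with: #{queue.rerun_command(job)}" }
end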
#increment_example_count(n) ⇒ Object
# File 'lib/rspecq/queue.rb', line 170

def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end
#key_build_times ⇒ Object
redis: LIST<duration>
Last build is at the head of the list.
# File 'lib/rspecq/queue.rb', line 352

def key_build_times
  "build_times"
end
#key_errors ⇒ Object
Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).
redis: HASH<job => error message>
# File 'lib/rspecq/queue.rb', line 314

def key_errors
  key("errors")
end
#key_example_count ⇒ Object
The total number of examples, including those that were requeued.
redis: STRING<integer>
# File 'lib/rspecq/queue.rb', line 329

def key_example_count
  key("example_count")
end
#key_failures ⇒ Object
Contains regular RSpec example failures.
redis: HASH<example_id => error message>
# File 'lib/rspecq/queue.rb', line 306

def key_failures
  key("example_failures")
end
#key_queue_config ⇒ Object
redis: HASH<config_key => config_value>
# File 'lib/rspecq/queue.rb', line 284

def key_queue_config
  key("queue", "config")
end
#key_queue_processed ⇒ Object
redis: SET<job>
# File 'lib/rspecq/queue.rb', line 299

def key_queue_processed
  key("queue", "processed")
end
#key_queue_running ⇒ Object
redis: HASH<worker_id => job>
# File 'lib/rspecq/queue.rb', line 294

def key_queue_running
  key("queue", "running")
end
#key_queue_status ⇒ Object
redis: STRING [STATUS_INITIALIZING, STATUS_READY]
# File 'lib/rspecq/queue.rb', line 279

def key_queue_status
  key("queue", "status")
end
#key_queue_unprocessed ⇒ Object
redis: LIST<job>
# File 'lib/rspecq/queue.rb', line 289

def key_queue_unprocessed
  key("queue", "unprocessed")
end
#key_requeues ⇒ Object
As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.
redis: HASH<job => times_retried>
# File 'lib/rspecq/queue.rb', line 322

def key_requeues
  key("requeues")
end
#key_timings ⇒ Object
redis: ZSET<job => duration>
NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won’t be accurate.
# File 'lib/rspecq/queue.rb', line 345

def key_timings
  "timings"
end
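One way to honour this is to gate timing writes on the branch being built. A sketch, where BRANCH is a hypothetical CI-provided environment variable and queue, job and duration come from the worker context:

# Keep the shared "timings" key consistent by only recording timings from a single branch.
queue.record_timing(job, duration) if ENV["BRANCH"] == "master"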
#key_worker_heartbeats ⇒ Object
redis: ZSET<worker_id => timestamp>
Timestamp of the last example processed by each worker.
# File 'lib/rspecq/queue.rb', line 336

def key_worker_heartbeats
  key("worker_heartbeats")
end
#non_example_errors ⇒ Object
# File 'lib/rspecq/queue.rb', line 203

def non_example_errors
  @redis.hgetall(key_errors)
end
#processed_jobs ⇒ Object
# File 'lib/rspecq/queue.rb', line 182

def processed_jobs
  @redis.smembers(key_queue_processed)
end
#processed_jobs_count ⇒ Object
# File 'lib/rspecq/queue.rb', line 178

def processed_jobs_count
  @redis.scard(key_queue_processed)
end
#publish(jobs, fail_fast = 0) ⇒ Object
NOTE: jobs will be processed from head to tail (lpop)
# File 'lib/rspecq/queue.rb', line 80

def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end
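Because jobs are popped from the head, a publisher can use previous timings to schedule the slowest files first. A sketch, assuming jobs are spec file paths and queue is already constructed:

# Front-load known-slow files; #timings is already ordered slowest-first.
files = Dir["spec/**/*_spec.rb"]
known = queue.timings.keys.select { |job| files.include?(job) }
queue.publish(known + (files - known), 3) # fail-fast value is illustrative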
#published? ⇒ Boolean
# File 'lib/rspecq/queue.rb', line 217

def published?
  @redis.get(key_queue_status) == STATUS_READY
end
#record_build_time(duration) ⇒ Object
# File 'lib/rspecq/queue.rb', line 159

def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end
#record_example_failure(example_id, message) ⇒ Object
# File 'lib/rspecq/queue.rb', line 146

def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end
#record_non_example_error(job, message) ⇒ Object
For errors that occurred outside of examples (e.g. while loading a spec file).
# File 'lib/rspecq/queue.rb', line 151

def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end
#record_timing(job, duration) ⇒ Object
# File 'lib/rspecq/queue.rb', line 155

def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end
#record_worker_heartbeat ⇒ Object
# File 'lib/rspecq/queue.rb', line 166

def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end
#requeue_job(job, max_requeues) ⇒ Object
Put job at the head of the queue, to be re-processed right after by another worker. This is a mitigation measure against flaky tests.
Returns nil if the job hit the requeue limit and therefore was not requeued and should be considered a failure.
# File 'lib/rspecq/queue.rb', line 128

def requeue_job(job, max_requeues)
  return false if max_requeues.zero?

  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues],
    argv: [job, max_requeues]
  )
end
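A sketch of the failure path: try to requeue the example and record a hard failure only when requeueing is no longer possible (queue, example_id, failure_message and the limit of 3 are assumptions):

# Hypothetical handling of a failed example inside a worker.
unless queue.requeue_job(example_id, 3)
  # Limit reached (nil) or retries disabled (false): count it as a failure.
  queue.record_example_failure(example_id, failure_message)
end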
#requeue_lost_job ⇒ Object
# File 'lib/rspecq/queue.rb', line 99

def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end
#requeued_jobs ⇒ Object
# File 'lib/rspecq/queue.rb', line 186

def requeued_jobs
  @redis.hgetall(key_requeues)
end
#rerun_command(job) ⇒ Object
# File 'lib/rspecq/queue.rb', line 142

def rerun_command(job)
  @redis.hget(key("job_metadata"), job)
end
#reserve_job ⇒ Object
# File 'lib/rspecq/queue.rb', line 88

def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end
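Putting the consuming side together, a worker loop might look roughly like the sketch below; run_job is a hypothetical helper that executes the reserved job and returns whether it passed, and the requeue limit of 3 is illustrative:

queue.wait_until_published

loop do
  break if queue.build_failed_fast?

  job = queue.reserve_job
  if job.nil?
    break if queue.exhausted?
    queue.requeue_lost_job # put back jobs held by dead workers, if any
    sleep 1
    next
  end

  queue.record_worker_heartbeat
  passed = run_job(job)                   # hypothetical
  queue.requeue_job(job, 3) unless passed # retry potential flakes
  queue.acknowledge_job(job)
end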
#save_rerun_command(job, cmd) ⇒ Object
# File 'lib/rspecq/queue.rb', line 138

def save_rerun_command(job, cmd)
  @redis.hset(key("job_metadata"), job, cmd)
end
#timings ⇒ Object
Ordered by execution time in descending order (slowest are at the head).
# File 'lib/rspecq/queue.rb', line 195

def timings
  Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
end
#unprocessed_jobs ⇒ Object
The remaining jobs to be processed. Jobs at the head of the list will be processed first.
# File 'lib/rspecq/queue.rb', line 237

def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end
#wait_until_published(timeout = 30) ⇒ Object
# File 'lib/rspecq/queue.rb', line 221

def wait_until_published(timeout = 30)
  (timeout * 10).times do
    return if published?

    sleep 0.1
  end

  raise "Queue not yet published after #{timeout} seconds"
end