Class: RSpecQ::Queue
Inherits: Object
Defined in: lib/rspecq/queue.rb
Overview
Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.
A queue typically contains all the data needed for a particular build to happen. These include (but are not limited to) the following:
- the list of jobs (spec files and/or examples) to be executed
- the failed examples along with their backtrace
- the set of running jobs
- previous job timing statistics used to optimally schedule the jobs
- the set of executed jobs
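To make the lifecycle above concrete, here is a minimal in-memory sketch of the three Redis structures involved (the unprocessed LIST, the running HASH, and the processed SET) and how a job flows through them. `InMemoryQueue` is a hypothetical stand-in for illustration only; the real Queue performs these steps atomically against Redis.

```ruby
# Hypothetical in-memory stand-in for the Redis-backed Queue, illustrating
# the job lifecycle: publish -> reserve -> acknowledge -> exhausted?
class InMemoryQueue
  def initialize
    @unprocessed = []   # mirrors the LIST at key_queue_unprocessed
    @running     = {}   # mirrors the HASH at key_queue_running
    @processed   = []   # mirrors the SET at key_queue_processed
  end

  def publish(jobs)
    @unprocessed.concat(jobs)   # rpush: jobs are consumed head to tail
  end

  # lpop + hset, as in the RESERVE_JOB Lua script
  def reserve_job(worker_id)
    job = @unprocessed.shift
    @running[worker_id] = job if job
    job
  end

  def acknowledge_job(worker_id, job)
    @running.delete(worker_id)
    @processed << job
  end

  # the build is complete when nothing is queued and nothing is running
  def exhausted?
    @unprocessed.empty? && @running.empty?
  end
end

q = InMemoryQueue.new
q.publish(%w[spec/a_spec.rb spec/b_spec.rb])
job = q.reserve_job("worker-1")    # => "spec/a_spec.rb"
q.acknowledge_job("worker-1", job)
q.acknowledge_job("worker-1", q.reserve_job("worker-1"))
q.exhausted?                       # => true
```

In the real implementation, reserve and acknowledge are single Lua scripts or MULTI blocks so that a crashing worker can never lose a job silently; the in-memory version above ignores that concern.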
Constant Summary
- RESERVE_JOB =
  <<~LUA.freeze
    local queue = KEYS[1]
    local queue_running = KEYS[2]
    local worker_id = ARGV[1]

    local job = redis.call('lpop', queue)
    if job then
      redis.call('hset', queue_running, worker_id, job)
      return job
    else
      return nil
    end
  LUA
- REQUEUE_LOST_JOB =
  Scans for dead workers and puts their reserved jobs back to the queue.
  <<~LUA.freeze
    local worker_heartbeats = KEYS[1]
    local queue_running = KEYS[2]
    local queue_unprocessed = KEYS[3]
    local time_now = ARGV[1]
    local timeout = ARGV[2]

    local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
    for _, worker in ipairs(dead_workers) do
      local job = redis.call('hget', queue_running, worker)
      if job then
        redis.call('lpush', queue_unprocessed, job)
        redis.call('hdel', queue_running, worker)
        return job
      end
    end

    return nil
  LUA
- REQUEUE_JOB =
  <<~LUA.freeze
    local key_queue_unprocessed = KEYS[1]
    local key_requeues = KEYS[2]
    local key_requeued_job_original_worker = KEYS[3]
    local key_job_location = KEYS[4]
    local job = ARGV[1]
    local max_requeues = ARGV[2]
    local original_worker = ARGV[3]
    local location = ARGV[4]

    local requeued_times = redis.call('hget', key_requeues, job)
    if requeued_times and requeued_times >= max_requeues then
      return nil
    end

    redis.call('lpush', key_queue_unprocessed, job)
    redis.call('hset', key_requeued_job_original_worker, job, original_worker)
    redis.call('hincrby', key_requeues, job, 1)
    redis.call('hset', key_job_location, job, location)

    return true
  LUA
- STATUS_INITIALIZING =
"initializing".freeze
- STATUS_READY =
"ready".freeze
Instance Attribute Summary
- #redis ⇒ Object (readonly)
  Returns the value of attribute redis.
Instance Method Summary
- #acknowledge_job(job) ⇒ Object
  NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.
- #become_master ⇒ Object
- #build_failed_fast? ⇒ Boolean
  Returns true if the number of failed tests has surpassed the threshold that renders the run unsuccessful, in which case the build should be terminated.
- #build_successful? ⇒ Boolean
- #example_count ⇒ Object
- #example_failures ⇒ Object
- #exhausted? ⇒ Boolean
  True if the build is complete, false otherwise.
- #fail_fast ⇒ Object
  Returns the number of failures that will trigger the build to fail fast.
- #failed_job_worker(job) ⇒ Object
- #flaky_jobs ⇒ Object
  Returns the jobs considered flaky (i.e. initially failed but passed after being retried).
- #increment_example_count(n) ⇒ Object
- #initialize(build_id, worker_id, redis_opts) ⇒ Queue (constructor)
  A new instance of Queue.
- #job_location(job) ⇒ Object
- #job_rerun_command(job) ⇒ Object
- #key_build_times ⇒ Object
  redis: LIST<duration>.
- #key_errors ⇒ Object
  Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).
- #key_example_count ⇒ Object
  The total number of examples, including those that were requeued.
- #key_failures ⇒ Object
  Contains regular RSpec example failures.
- #key_queue_config ⇒ Object
  redis: HASH<config_key => config_value>.
- #key_queue_processed ⇒ Object
  redis: SET<job>.
- #key_queue_running ⇒ Object
  redis: HASH<worker_id => job>.
- #key_queue_status ⇒ Object
  redis: STRING [STATUS_INITIALIZING, STATUS_READY].
- #key_queue_unprocessed ⇒ Object
  redis: LIST<job>.
- #key_requeues ⇒ Object
  As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.
- #key_timings ⇒ Object
  redis: ZSET<job => duration>.
- #key_worker_heartbeats ⇒ Object
  redis: ZSET<worker_id => timestamp>.
- #non_example_errors ⇒ Object
- #processed_jobs ⇒ Object
- #processed_jobs_count ⇒ Object
- #publish(jobs, fail_fast = 0) ⇒ Object
  NOTE: jobs will be processed from head to tail (lpop).
- #published? ⇒ Boolean
- #record_build_time(duration) ⇒ Object
- #record_example_failure(example_id, message) ⇒ Object
- #record_non_example_error(job, message) ⇒ Object
  For errors that occurred outside of examples (e.g. while loading a spec file).
- #record_timing(job, duration) ⇒ Object
- #record_worker_heartbeat ⇒ Object
- #requeue_job(example, max_requeues, original_worker_id) ⇒ Object
  Puts the job at the head of the queue to be re-processed right after, by another worker.
- #requeue_lost_job ⇒ Object
- #requeued_jobs ⇒ Object
- #reserve_job ⇒ Object
- #save_worker_seed(worker, seed) ⇒ Object
- #timings ⇒ Object
  Ordered by execution time, descending (slowest at the head).
- #unprocessed_jobs ⇒ Object
  The remaining jobs to be processed.
- #wait_until_published(timeout = 30) ⇒ Object
Constructor Details
#initialize(build_id, worker_id, redis_opts) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/rspecq/queue.rb', line 79

def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
# File 'lib/rspecq/queue.rb', line 77

def redis
  @redis
end
Instance Method Details
#acknowledge_job(job) ⇒ Object
NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.
# File 'lib/rspecq/queue.rb', line 122

def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
    @redis.rpush(key("queue", "jobs_per_worker", @worker_id), job)
  end
end
#become_master ⇒ Object
# File 'lib/rspecq/queue.rb', line 214

def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end
#build_failed_fast? ⇒ Boolean
Returns true if the number of failed tests has surpassed the threshold that renders the run unsuccessful, in which case the build should be terminated.
# File 'lib/rspecq/queue.rb', line 291

def build_failed_fast?
  if fail_fast.nil? || fail_fast.zero?
    return false
  end

  @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end.inject(:+) >= fail_fast
end
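The check above sums two Redis hash lengths and compares the result to the threshold. A plain-Ruby sketch of that arithmetic (with the counts passed in as integers, rather than read from Redis) makes the disable semantics explicit:

```ruby
# Sketch of the fail-fast check: the build fails fast once example
# failures plus non-example errors reach the threshold. A threshold of
# 0 (or nil, before the queue is published) disables the feature.
def build_failed_fast?(failures:, errors:, fail_fast:)
  return false if fail_fast.nil? || fail_fast.zero?

  failures + errors >= fail_fast
end

build_failed_fast?(failures: 2, errors: 1, fail_fast: 3)  # => true
build_failed_fast?(failures: 2, errors: 9, fail_fast: 0)  # => false (disabled)
```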
#build_successful? ⇒ Boolean
# File 'lib/rspecq/queue.rb', line 255

def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end
#example_count ⇒ Object
# File 'lib/rspecq/queue.rb', line 198

def example_count
  @redis.get(key_example_count).to_i
end
#example_failures ⇒ Object
# File 'lib/rspecq/queue.rb', line 223

def example_failures
  @redis.hgetall(key_failures)
end
#exhausted? ⇒ Boolean
True if the build is complete, false otherwise.
# File 'lib/rspecq/queue.rb', line 232

def exhausted?
  return false if !published?

  @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end.inject(:+).zero?
end
#fail_fast ⇒ Object
Returns the number of failures that will trigger the build to fail fast. Returns 0 if this feature is disabled, and nil if the Queue is not yet published.
# File 'lib/rspecq/queue.rb', line 283

def fail_fast
  return nil unless published?

  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end
#failed_job_worker(job) ⇒ Object
# File 'lib/rspecq/queue.rb', line 156

def failed_job_worker(job)
  redis.hget(key("requeued_job_original_worker"), job)
end
#flaky_jobs ⇒ Object
Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete, otherwise an exception will be raised.
# File 'lib/rspecq/queue.rb', line 268

def flaky_jobs
  if !exhausted? && !build_failed_fast?
    raise "Queue is not yet exhausted"
  end

  requeued = @redis.hkeys(key_requeues)

  return [] if requeued.empty?

  requeued - @redis.hkeys(key_failures)
end
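The flaky set is a plain set difference: jobs that appear in the requeues hash (they failed at least once) but not in the final failures hash (they eventually passed). A small illustration, with made-up example ids:

```ruby
# Flaky jobs = requeued at least once, but absent from the final
# failures hash, i.e. they passed on a retry. Example ids are made up.
requeued = ["spec/a_spec.rb[1:1]", "spec/b_spec.rb[1:2]"]  # hkeys(key_requeues)
failed   = ["spec/b_spec.rb[1:2]"]                         # hkeys(key_failures)

flaky = requeued - failed
# => ["spec/a_spec.rb[1:1]"]  (initially failed, then passed)
```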
#increment_example_count(n) ⇒ Object
# File 'lib/rspecq/queue.rb', line 194

def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end
#job_location(job) ⇒ Object
# File 'lib/rspecq/queue.rb', line 152

def job_location(job)
  @redis.hget(key("job_location"), job)
end
#job_rerun_command(job) ⇒ Object
# File 'lib/rspecq/queue.rb', line 160

def job_rerun_command(job)
  worker = failed_job_worker(job)
  jobs = redis.lrange(key("queue", "jobs_per_worker", worker), 0, -1)
  seed = redis.hget(key("worker_seed"), worker)

  "DISABLE_SPRING=1 DISABLE_BOOTSNAP=1 bin/rspecq --build 1 " \
    "--worker foo --seed #{seed} --max-requeues 0 --fail-fast 1 " \
    "--reproduction #{jobs.join(' ')}"
end
#key_build_times ⇒ Object
redis: LIST<duration>
Last build is at the head of the list.
# File 'lib/rspecq/queue.rb', line 376

def key_build_times
  "build_times"
end
#key_errors ⇒ Object
Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).
redis: HASH<job => error message>
# File 'lib/rspecq/queue.rb', line 338

def key_errors
  key("errors")
end
#key_example_count ⇒ Object
The total number of examples, including those that were requeued.
redis: STRING<integer>
# File 'lib/rspecq/queue.rb', line 353

def key_example_count
  key("example_count")
end
#key_failures ⇒ Object
Contains regular RSpec example failures.
redis: HASH<example_id => error message>
# File 'lib/rspecq/queue.rb', line 330

def key_failures
  key("example_failures")
end
#key_queue_config ⇒ Object
redis: HASH<config_key => config_value>
# File 'lib/rspecq/queue.rb', line 308

def key_queue_config
  key("queue", "config")
end
#key_queue_processed ⇒ Object
redis: SET<job>
# File 'lib/rspecq/queue.rb', line 323

def key_queue_processed
  key("queue", "processed")
end
#key_queue_running ⇒ Object
redis: HASH<worker_id => job>
# File 'lib/rspecq/queue.rb', line 318

def key_queue_running
  key("queue", "running")
end
#key_queue_status ⇒ Object
redis: STRING [STATUS_INITIALIZING, STATUS_READY]
# File 'lib/rspecq/queue.rb', line 303

def key_queue_status
  key("queue", "status")
end
#key_queue_unprocessed ⇒ Object
redis: LIST<job>
# File 'lib/rspecq/queue.rb', line 313

def key_queue_unprocessed
  key("queue", "unprocessed")
end
#key_requeues ⇒ Object
As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.
redis: HASH<job => times_retried>
# File 'lib/rspecq/queue.rb', line 346

def key_requeues
  key("requeues")
end
#key_timings ⇒ Object
redis: ZSET<job => duration>
NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won’t be accurate.
# File 'lib/rspecq/queue.rb', line 369

def key_timings
  "timings"
end
#key_worker_heartbeats ⇒ Object
redis: ZSET<worker_id => timestamp>
Timestamp of the last example processed by each worker.
# File 'lib/rspecq/queue.rb', line 360

def key_worker_heartbeats
  key("worker_heartbeats")
end
#non_example_errors ⇒ Object
# File 'lib/rspecq/queue.rb', line 227

def non_example_errors
  @redis.hgetall(key_errors)
end
#processed_jobs ⇒ Object
# File 'lib/rspecq/queue.rb', line 206

def processed_jobs
  @redis.smembers(key_queue_processed)
end
#processed_jobs_count ⇒ Object
# File 'lib/rspecq/queue.rb', line 202

def processed_jobs_count
  @redis.scard(key_queue_processed)
end
#publish(jobs, fail_fast = 0) ⇒ Object
NOTE: jobs will be processed from head to tail (lpop).
# File 'lib/rspecq/queue.rb', line 86

def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end
#published? ⇒ Boolean
# File 'lib/rspecq/queue.rb', line 241

def published?
  @redis.get(key_queue_status) == STATUS_READY
end
#record_build_time(duration) ⇒ Object
# File 'lib/rspecq/queue.rb', line 183

def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end
#record_example_failure(example_id, message) ⇒ Object
# File 'lib/rspecq/queue.rb', line 170

def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end
#record_non_example_error(job, message) ⇒ Object
For errors that occurred outside of examples (e.g. while loading a spec file).
# File 'lib/rspecq/queue.rb', line 175

def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end
#record_timing(job, duration) ⇒ Object
# File 'lib/rspecq/queue.rb', line 179

def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end
#record_worker_heartbeat ⇒ Object
# File 'lib/rspecq/queue.rb', line 190

def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end
#requeue_job(example, max_requeues, original_worker_id) ⇒ Object
Put job at the head of the queue to be re-processed right after, by another worker. This is a mitigation measure against flaky tests.
Returns nil if the job hit the requeue limit and therefore was not requeued and should be considered a failure.
# File 'lib/rspecq/queue.rb', line 135

def requeue_job(example, max_requeues, original_worker_id)
  return false if max_requeues.zero?

  job = example.id
  location = example.location_rerun_argument

  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues,
           key("requeued_job_original_worker"), key("job_location")],
    argv: [job, max_requeues, original_worker_id, location]
  )
end
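The bookkeeping done by the REQUEUE_JOB script can be sketched in plain Ruby: a job is pushed back at the head of the queue and its retry counter incremented, until the counter reaches the limit, at which point the attempt is rejected. The `requeue_job` helper below is an illustration, not the real Redis-backed method:

```ruby
# Sketch of the REQUEUE_JOB bookkeeping: requeue until the per-job
# counter hits max_requeues, then reject with nil (the failure stands).
def requeue_job(job, max_requeues, requeues, unprocessed)
  return nil if (requeues[job] || 0) >= max_requeues

  unprocessed.unshift(job)                  # lpush: retried before the rest
  requeues[job] = (requeues[job] || 0) + 1
  true
end

requeues = {}
queue = ["spec/later_spec.rb"]
requeue_job("spec/flaky_spec.rb[1:1]", 3, requeues, queue)  # => true
queue.first  # => "spec/flaky_spec.rb[1:1]" (head of the queue)
```

Because the job lands at the head of the list, it is retried promptly, most likely by a different worker than the one that just failed it.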
#requeue_lost_job ⇒ Object
# File 'lib/rspecq/queue.rb', line 105

def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end
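The REQUEUE_LOST_JOB script decides worker liveness from the heartbeats ZSET: a worker is considered dead when its last heartbeat score is at or below `time_now - timeout` (that is what `zrangebyscore ... 0, time_now - timeout` selects). A plain-Ruby sketch of that selection, with `dead_workers` as a hypothetical helper:

```ruby
# Sketch of dead-worker detection: a worker is dead when its last
# heartbeat is older than the liveness timeout (zrangebyscore semantics).
def dead_workers(heartbeats, now, timeout)
  heartbeats.select { |_worker, ts| ts <= now - timeout }.keys
end

heartbeats = { "w1" => 100.0, "w2" => 190.0 }  # worker_id => last heartbeat
dead_workers(heartbeats, 200.0, 60)            # => ["w1"]
```

Any job reserved by a dead worker is pushed back onto the unprocessed list so another worker can pick it up.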
#requeued_jobs ⇒ Object
# File 'lib/rspecq/queue.rb', line 210

def requeued_jobs
  @redis.hgetall(key_requeues)
end
#reserve_job ⇒ Object
# File 'lib/rspecq/queue.rb', line 94

def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end
#save_worker_seed(worker, seed) ⇒ Object
# File 'lib/rspecq/queue.rb', line 148

def save_worker_seed(worker, seed)
  @redis.hset(key("worker_seed"), worker, seed)
end
#timings ⇒ Object
Ordered by execution time, descending (slowest at the head).
# File 'lib/rspecq/queue.rb', line 219

def timings
  Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
end
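Since `zrevrange` walks the ZSET from highest score to lowest, the returned Hash is ordered slowest-first, which is the order a scheduler wants so that the longest spec files start earliest. A small illustration of that ordering, using made-up durations:

```ruby
# Illustration: a slowest-first ordering like the one zrevrange yields,
# reproduced here with a plain sort over made-up timing data.
timings = { "spec/fast_spec.rb" => 1.2, "spec/slow_spec.rb" => 30.5 }

slowest_first = timings.sort_by { |_job, duration| -duration }.to_h
slowest_first.keys.first  # => "spec/slow_spec.rb"
```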
#unprocessed_jobs ⇒ Object
The remaining jobs to be processed. Jobs at the head of the list will be processed first.
# File 'lib/rspecq/queue.rb', line 261

def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end
#wait_until_published(timeout = 30) ⇒ Object
# File 'lib/rspecq/queue.rb', line 245

def wait_until_published(timeout = 30)
  (timeout * 10).times do
    return if published?

    sleep 0.1
  end

  raise "Queue not yet published after #{timeout} seconds"
end