Class: RSpecQ::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rspecq/queue.rb

Overview

Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.

A queue typically contains all the data needed for a particular build to happen. These include (but are not limited to) the following:

  • the list of jobs (spec files and/or examples) to be executed

  • the failed examples along with their backtrace

  • the set of running jobs

  • previous job timing statistics used to optimally schedule the jobs

  • the set of executed jobs

Constant Summary collapse

# Atomically pops the job at the head of the unprocessed queue and
# records it in the running hash under the calling worker's id, so a
# lost job can be traced back to (and reclaimed from) a dead worker.
# Returns the reserved job, or nil when the queue is empty.
RESERVE_JOB =
<<~LUA.freeze
  local queue = KEYS[1]
  local queue_running = KEYS[2]
  local worker_id = ARGV[1]

  local job = redis.call('lpop', queue)
  if job then
    redis.call('hset', queue_running, worker_id, job)
    return job
  else
    return nil
  end
LUA
# Scans for dead workers and puts their reserved jobs back to the queue.
#
# A worker is considered dead when its last heartbeat is older than
# `timeout` seconds. At most one lost job is requeued per invocation;
# returns the requeued job, or nil when none was found.
#
# (Repair: the description had been interleaved between the assignment
# and the heredoc, which is not valid Ruby; it is now a comment.)
REQUEUE_LOST_JOB =
<<~LUA.freeze
  local worker_heartbeats = KEYS[1]
  local queue_running = KEYS[2]
  local queue_unprocessed = KEYS[3]
  local time_now = ARGV[1]
  local timeout = ARGV[2]

  local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
  for _, worker in ipairs(dead_workers) do
    local job = redis.call('hget', queue_running, worker)
    if job then
      redis.call('lpush', queue_unprocessed, job)
      redis.call('hdel', queue_running, worker)
      return job
    end
  end

  return nil
LUA
# Requeues a failed job for a retry, unless it has already been
# requeued `max_requeues` times, in which case nil is returned and the
# job should be considered a failure.
#
# NOTE: Redis passes ARGV values and HGET results to Lua as strings;
# comparing them with `>=` directly would be a lexicographic string
# comparison (e.g. "2" >= "10" is true), so both operands are converted
# with tonumber before comparing.
REQUEUE_JOB =
<<~LUA.freeze
  local key_queue_unprocessed = KEYS[1]
  local key_requeues = KEYS[2]
  local key_requeued_job_original_worker = KEYS[3]
  local key_job_location = KEYS[4]
  local job = ARGV[1]
  local max_requeues = ARGV[2]
  local original_worker = ARGV[3]
  local location = ARGV[4]

  local requeued_times = redis.call('hget', key_requeues, job)
  if requeued_times and tonumber(requeued_times) >= tonumber(max_requeues) then
    return nil
  end

  redis.call('lpush', key_queue_unprocessed, job)
  redis.call('hset', key_requeued_job_original_worker, job, original_worker)
  redis.call('hincrby', key_requeues, job, 1)
  redis.call('hset', key_job_location, job, location)

  return true
LUA
# Queue status while the master worker is still populating the queue.
STATUS_INITIALIZING =
"initializing".freeze
# Queue status once the queue is published and workers may consume jobs.
STATUS_READY =
"ready".freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(build_id, worker_id, redis_opts) ⇒ Queue

Returns a new instance of Queue.



79
80
81
82
83
# File 'lib/rspecq/queue.rb', line 79

# Creates a Queue bound to a particular build and worker.
#
# @param build_id [String] identifier shared by all workers of a build
# @param worker_id [String] unique identifier of this worker; also used
#   as the Redis client id for easier debugging
# @param redis_opts [Hash] options forwarded to Redis.new
def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end

Instance Attribute Details

#redisObject (readonly)

Returns the value of attribute redis.



77
78
79
# File 'lib/rspecq/queue.rb', line 77

# @return [Redis] the underlying Redis client (read-only accessor)
def redis
  @redis
end

Instance Method Details

#acknowledge_job(job) ⇒ Object

NOTE: The same job might happen to be acknowledged more than once, in the case of requeues.



122
123
124
125
126
127
128
# File 'lib/rspecq/queue.rb', line 122

# Marks a job as done: releases this worker's reservation, adds the job
# to the processed set and appends it to this worker's per-worker job
# list (used later to reproduce the exact run order).
#
# NOTE: the same job may be acknowledged more than once when requeues
# are involved; key_queue_processed is a SET, so that is harmless.
def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
    @redis.rpush(key("queue", "jobs_per_worker", @worker_id), job)
  end
end

#become_masterObject



214
215
216
# File 'lib/rspecq/queue.rb', line 214

# Attempts to claim the master role for this build. SETNX succeeds only
# for the first worker to call it; that worker is then responsible for
# populating and publishing the queue.
#
# @return [Boolean] true if this worker became the master
def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end

#build_failed_fast?Boolean

Returns true if the number of failed tests has surpassed the threshold to render the run unsuccessful and the build should be terminated.

Returns:

  • (Boolean)


291
292
293
294
295
296
297
298
299
300
# File 'lib/rspecq/queue.rb', line 291

# True when fail-fast is enabled and the combined number of example
# failures and non-example errors has reached the threshold, meaning
# the build should be terminated early.
def build_failed_fast?
  threshold = fail_fast
  return false if threshold.nil? || threshold.zero?

  failure_counts = @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end

  failure_counts.sum >= threshold
end

#build_successful?Boolean

Returns:

  • (Boolean)


255
256
257
# File 'lib/rspecq/queue.rb', line 255

# A build is successful when all jobs were processed and no example
# failures or non-example errors were recorded. Short-circuits: the
# failure hashes are only fetched if the queue is exhausted.
def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end

#example_countObject



198
199
200
# File 'lib/rspecq/queue.rb', line 198

# Total number of executed examples, as accumulated by
# #increment_example_count. Returns 0 when the key is not set
# (GET returns nil and nil.to_i is 0).
def example_count
  @redis.get(key_example_count).to_i
end

#example_failuresObject



223
224
225
# File 'lib/rspecq/queue.rb', line 223

# @return [Hash] example_id => failure message (with backtrace)
def example_failures
  @redis.hgetall(key_failures)
end

#exhausted?Boolean

True if the build is complete, false otherwise

Returns:

  • (Boolean)


232
233
234
235
236
237
238
239
# File 'lib/rspecq/queue.rb', line 232

# True when the build is complete: the queue has been published, no
# unprocessed jobs remain and no worker is currently running one.
def exhausted?
  return false unless published?

  pending_counts = @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end

  pending_counts.sum.zero?
end

#fail_fastObject

Returns the number of failures that will trigger the build to fail-fast. Returns 0 if this feature is disabled and nil if the Queue is not yet published



283
284
285
286
287
# File 'lib/rspecq/queue.rb', line 283

# Number of failures that triggers the build to fail fast; 0 means the
# feature is disabled. Returns nil while the queue is not yet
# published. Memoized after the first successful lookup.
#
# NOTE(review): Integer() raises if the config value is missing or
# malformed; #publish always sets "fail_fast", so this should only be
# reachable with a valid value — confirm against publish.
def fail_fast
  return nil unless published?

  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end

#failed_job_worker(job) ⇒ Object



156
157
158
# File 'lib/rspecq/queue.rb', line 156

# The id of the worker on which the given requeued job originally
# failed (recorded by the REQUEUE_JOB script).
def failed_job_worker(job)
  redis.hget(key("requeued_job_original_worker"), job)
end

#flaky_jobsObject

Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete, otherwise an exception will be raised.



268
269
270
271
272
273
274
275
276
277
278
# File 'lib/rspecq/queue.rb', line 268

# Jobs considered flaky: they were requeued (i.e. failed at least once)
# but do not appear in the final failures, meaning a retry passed.
# Must only be called after the build is complete; raises otherwise.
def flaky_jobs
  unless exhausted? || build_failed_fast?
    raise "Queue is not yet exhausted"
  end

  retried = @redis.hkeys(key_requeues)
  return [] if retried.empty?

  retried - @redis.hkeys(key_failures)
end

#increment_example_count(n) ⇒ Object



194
195
196
# File 'lib/rspecq/queue.rb', line 194

# Adds n to the build's running example counter (see #example_count).
def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end

#job_location(job) ⇒ Object



152
153
154
# File 'lib/rspecq/queue.rb', line 152

# The rerun location (file:line) recorded for a requeued job by the
# REQUEUE_JOB script; nil for jobs that were never requeued.
def job_location(job)
  @redis.hget(key("job_location"), job)
end

#job_rerun_command(job) ⇒ Object



160
161
162
163
164
165
166
167
168
# File 'lib/rspecq/queue.rb', line 160

# Builds a shell command that reproduces the worker run in which the
# given job failed: same seed and same job order, with requeues
# disabled and fail-fast set to 1 so the failure surfaces immediately.
#
# NOTE(review): "--build 1" and "--worker foo" appear to be
# placeholders for the reproduction run — confirm they need not match
# the original build/worker ids.
def job_rerun_command(job)
  worker = failed_job_worker(job)
  jobs = redis.lrange(key("queue", "jobs_per_worker", worker), 0, -1)
  seed = redis.hget(key("worker_seed"), worker)

  "DISABLE_SPRING=1 DISABLE_BOOTSNAP=1 bin/rspecq --build 1 " \
    "--worker foo --seed #{seed} --max-requeues 0 --fail-fast 1 " \
    "--reproduction #{jobs.join(' ')}"
end

#key_build_timesObject

redis: LIST<duration>

Last build is at the head of the list.



376
377
378
# File 'lib/rspecq/queue.rb', line 376

# redis: LIST<duration>; most recent build at the head.
# NOTE: intentionally not scoped to a build (shared across builds).
def key_build_times
  "build_times"
end

#key_errorsObject

Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).

redis: HASH<job => error message>



338
339
340
# File 'lib/rspecq/queue.rb', line 338

# redis: HASH<job => error message> — errors raised outside of RSpec
# examples (e.g. a syntax error in spec_helper.rb).
def key_errors
  key("errors")
end

#key_example_countObject

The total number of examples, including those that were requeued.

redis: STRING<integer>



353
354
355
# File 'lib/rspecq/queue.rb', line 353

# redis: STRING<integer> — total number of executed examples.
def key_example_count
  key("example_count")
end

#key_failuresObject

Contains regular RSpec example failures.

redis: HASH<example_id => error message>



330
331
332
# File 'lib/rspecq/queue.rb', line 330

# redis: HASH<example_id => error message> — regular example failures.
def key_failures
  key("example_failures")
end

#key_queue_configObject

redis: HASH<config_key => config_value>



308
309
310
# File 'lib/rspecq/queue.rb', line 308

# redis: HASH<config_key => config_value> (e.g. "fail_fast").
def key_queue_config
  key("queue", "config")
end

#key_queue_processedObject

redis: SET<job>



323
324
325
# File 'lib/rspecq/queue.rb', line 323

# redis: SET<job> — jobs that have been acknowledged as done.
def key_queue_processed
  key("queue", "processed")
end

#key_queue_runningObject

redis: HASH<worker_id => job>



318
319
320
# File 'lib/rspecq/queue.rb', line 318

# redis: HASH<worker_id => job> — the job each worker has reserved.
def key_queue_running
  key("queue", "running")
end

#key_queue_statusObject

redis: STRING [STATUS_INITIALIZING, STATUS_READY]



303
304
305
# File 'lib/rspecq/queue.rb', line 303

# redis: STRING — one of STATUS_INITIALIZING, STATUS_READY.
def key_queue_status
  key("queue", "status")
end

#key_queue_unprocessedObject

redis: LIST<job>



313
314
315
# File 'lib/rspecq/queue.rb', line 313

# redis: LIST<job> — jobs still to be reserved (consumed via LPOP).
def key_queue_unprocessed
  key("queue", "unprocessed")
end

#key_requeuesObject

As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.

redis: HASH<job => times_retried>



346
347
348
# File 'lib/rspecq/queue.rb', line 346

# redis: HASH<job => times_retried> — retry bookkeeping for flaky-test
# mitigation (see REQUEUE_JOB).
def key_requeues
  key("requeues")
end

#key_timingsObject

redis: ZSET<job => duration>

NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won’t be accurate.



369
370
371
# File 'lib/rspecq/queue.rb', line 369

# redis: ZSET<job => duration>.
# NOTE: not scoped to a build (shared among all builds), so only
# publish timings from a single branch (e.g. master) or they will
# not be accurate.
def key_timings
  "timings"
end

#key_worker_heartbeatsObject

redis: ZSET<worker_id => timestamp>

Timestamp of the last example processed by each worker.



360
361
362
# File 'lib/rspecq/queue.rb', line 360

# redis: ZSET<worker_id => timestamp> — timestamp of the last example
# processed by each worker; used to detect dead workers.
def key_worker_heartbeats
  key("worker_heartbeats")
end

#non_example_errorsObject



227
228
229
# File 'lib/rspecq/queue.rb', line 227

# @return [Hash] job => error message, for errors raised outside of
#   examples (e.g. while loading a spec file)
def non_example_errors
  @redis.hgetall(key_errors)
end

#processed_jobsObject



206
207
208
# File 'lib/rspecq/queue.rb', line 206

# @return [Array<String>] jobs acknowledged as done (unordered set)
def processed_jobs
  @redis.smembers(key_queue_processed)
end

#processed_jobs_countObject



202
203
204
# File 'lib/rspecq/queue.rb', line 202

# Number of processed jobs; SCARD avoids fetching the whole set.
def processed_jobs_count
  @redis.scard(key_queue_processed)
end

#publish(jobs, fail_fast = 0) ⇒ Object

NOTE: jobs will be processed from head to tail (lpop)



86
87
88
89
90
91
92
# File 'lib/rspecq/queue.rb', line 86

# Publishes the queue in a single transaction: stores the fail-fast
# threshold, pushes all jobs to the unprocessed list and flips the
# status to READY so workers can start consuming.
#
# NOTE: jobs will be processed from head to tail (lpop)
#
# @param jobs [Array<String>] spec files and/or example ids
# @param fail_fast [Integer] 0 disables fail-fast
# @return the result of the HSET command (`.first` of the MULTI replies)
def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end

#published?Boolean

Returns:

  • (Boolean)


241
242
243
# File 'lib/rspecq/queue.rb', line 241

# True once the master worker has populated the queue and set its
# status to READY (see #publish).
def published?
  @redis.get(key_queue_status) == STATUS_READY
end

#record_build_time(duration) ⇒ Object



183
184
185
186
187
188
# File 'lib/rspecq/queue.rb', line 183

# Prepends the build duration to the (global) build-times list and
# trims it so only the 100 most recent durations are kept.
#
# Float() raises on non-numeric input rather than silently coercing.
def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end

#record_example_failure(example_id, message) ⇒ Object



170
171
172
# File 'lib/rspecq/queue.rb', line 170

# Stores a regular RSpec example failure (message includes backtrace).
def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end

#record_non_example_error(job, message) ⇒ Object

For errors that occurred outside of examples (e.g. while loading a spec file)



175
176
177
# File 'lib/rspecq/queue.rb', line 175

# Stores an error that occurred outside of examples (e.g. while
# loading a spec file), keyed by the job that triggered it.
def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end

#record_timing(job, duration) ⇒ Object



179
180
181
# File 'lib/rspecq/queue.rb', line 179

# Records how long a job took; used to schedule slowest jobs first in
# subsequent builds (key is shared across builds, see #key_timings).
def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end

#record_worker_heartbeatObject



190
191
192
# File 'lib/rspecq/queue.rb', line 190

# Refreshes this worker's heartbeat timestamp so it is not considered
# dead by REQUEUE_LOST_JOB.
# NOTE(review): `current_time` is a helper defined elsewhere in this
# file (not shown here).
def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end

#requeue_job(example, max_requeues, original_worker_id) ⇒ Object

Put job at the head of the queue to be re-processed right after, by another worker. This is a mitigation measure against flaky tests.

Returns nil if the job hit the requeue limit and therefore was not requeued and should be considered a failure.



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/rspecq/queue.rb', line 135

# Puts the failed example at the head of the queue so another worker
# retries it right away — a mitigation measure against flaky tests.
#
# Returns false when requeueing is disabled, nil when the job already
# hit the requeue limit (it should then be treated as a failure), and
# a truthy value when the job was requeued (see REQUEUE_JOB).
def requeue_job(example, max_requeues, original_worker_id)
  return false if max_requeues.zero?

  job = example.id
  location = example.location_rerun_argument

  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues, key("requeued_job_original_worker"), key("job_location")],
    argv: [job, max_requeues, original_worker_id, location]
  )
end

#requeue_lost_jobObject



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/rspecq/queue.rb', line 105

# Reclaims a job reserved by a dead worker (one whose heartbeat is
# older than WORKER_LIVENESS_SEC — a constant defined elsewhere in
# this library) and pushes it back to the unprocessed queue.
# Returns the requeued job or nil (see REQUEUE_LOST_JOB).
def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end

#requeued_jobsObject



210
211
212
# File 'lib/rspecq/queue.rb', line 210

# @return [Hash] job => number of times it was requeued
def requeued_jobs
  @redis.hgetall(key_requeues)
end

#reserve_jobObject



94
95
96
97
98
99
100
101
102
103
# File 'lib/rspecq/queue.rb', line 94

# Atomically pops the next job from the queue and marks it as running
# by this worker (see RESERVE_JOB). Returns the job or nil when the
# queue is empty.
def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end

#save_worker_seed(worker, seed) ⇒ Object



148
149
150
# File 'lib/rspecq/queue.rb', line 148

# Persists the RSpec seed used by a worker, so a failing run can later
# be reproduced with the same ordering (see #job_rerun_command).
def save_worker_seed(worker, seed)
  @redis.hset(key("worker_seed"), worker, seed)
end

#timingsObject

ordered by execution time desc (slowest are in the head)



219
220
221
# File 'lib/rspecq/queue.rb', line 219

# Previously recorded job timings, ordered by execution time
# descending (slowest jobs first).
#
# @return [Hash] job => duration
def timings
  @redis.zrevrange(key_timings, 0, -1, withscores: true).to_h
end

#unprocessed_jobsObject

The remaining jobs to be processed. Jobs at the head of the list will be processed first.



261
262
263
# File 'lib/rspecq/queue.rb', line 261

# The remaining jobs to be processed; jobs at the head of the list
# will be processed first.
def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end

#wait_until_published(timeout = 30) ⇒ Object



245
246
247
248
249
250
251
252
253
# File 'lib/rspecq/queue.rb', line 245

# Blocks until the master worker publishes the queue, polling every
# 100ms. Raises when the queue is still unpublished after `timeout`
# seconds.
def wait_until_published(timeout = 30)
  polls = timeout * 10

  polls.times do
    return if published?

    sleep 0.1
  end

  raise "Queue not yet published after #{timeout} seconds"
end