Class: RSpecQ::Queue

Inherits:
Object
Defined in:
lib/rspecq/queue.rb

Overview

Queue is the data store interface (Redis) and is used to manage the work queue for a particular build. All Redis operations happen via Queue.

A queue typically contains all the data needed for a particular build to happen. This includes (but is not limited to) the following (a sketch of a typical worker interaction appears right after the list):

  • the list of jobs (spec files and/or examples) to be executed

  • the failed examples along with their backtrace

  • the set of running jobs

  • previous job timing statistics used to optimally schedule the jobs

  • the set of executed jobs
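
The following is a minimal sketch of how a worker might drive the queue; the build id, worker id and Redis URL are illustrative values, not part of the API:

queue = RSpecQ::Queue.new("build-123", "worker-1", url: "redis://localhost:6379")
queue.wait_until_published

while (job = queue.reserve_job)
  queue.record_worker_heartbeat
  # ... execute the job with RSpec (omitted here) ...
  queue.acknowledge_job(job)
end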

Constant Summary

RESERVE_JOB =
  <<~LUA.freeze
    local queue = KEYS[1]
    local queue_running = KEYS[2]
    local worker_id = ARGV[1]

    local job = redis.call('lpop', queue)
    if job then
      redis.call('hset', queue_running, worker_id, job)
      return job
    else
      return nil
    end
  LUA
REQUEUE_LOST_JOB =

Scans for dead workers and puts their reserved jobs back to the queue.

  <<~LUA.freeze
    local worker_heartbeats = KEYS[1]
    local queue_running = KEYS[2]
    local queue_unprocessed = KEYS[3]
    local time_now = ARGV[1]
    local timeout = ARGV[2]

    local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
    for _, worker in ipairs(dead_workers) do
      local job = redis.call('hget', queue_running, worker)
      if job then
        redis.call('lpush', queue_unprocessed, job)
        redis.call('hdel', queue_running, worker)
        return job
      end
    end

    return nil
  LUA
REQUEUE_JOB =
  <<~LUA.freeze
    local key_queue_unprocessed = KEYS[1]
    local key_requeues = KEYS[2]
    local job = ARGV[1]
    local max_requeues = ARGV[2]

    local requeued_times = redis.call('hget', key_requeues, job)
    if requeued_times and requeued_times >= max_requeues then
      return nil
    end

    redis.call('lpush', key_queue_unprocessed, job)
    redis.call('hincrby', key_requeues, job, 1)

    return true
  LUA
STATUS_INITIALIZING =
"initializing".freeze
STATUS_READY =
"ready".freeze

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(build_id, worker_id, redis_opts) ⇒ Queue

Returns a new instance of Queue.



# File 'lib/rspecq/queue.rb', line 73

def initialize(build_id, worker_id, redis_opts)
  @build_id = build_id
  @worker_id = worker_id
  @redis = Redis.new(redis_opts.merge(id: worker_id))
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



# File 'lib/rspecq/queue.rb', line 71

def redis
  @redis
end

Instance Method Details

#acknowledge_job(job) ⇒ Object

NOTE: The same job might be acknowledged more than once in the case of requeues.



# File 'lib/rspecq/queue.rb', line 116

def acknowledge_job(job)
  @redis.multi do
    @redis.hdel(key_queue_running, @worker_id)
    @redis.sadd(key_queue_processed, job)
  end
end

#become_master ⇒ Object



# File 'lib/rspecq/queue.rb', line 190

def become_master
  @redis.setnx(key_queue_status, STATUS_INITIALIZING)
end
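
Since SETNX succeeds for only one client, become_master can be used to elect the single worker that publishes the queue. A hedged sketch, where the spec file discovery and the fail-fast value are illustrative:

if queue.become_master
  queue.publish(Dir.glob("spec/**/*_spec.rb"), 3)
else
  queue.wait_until_published
end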

#build_failed_fast? ⇒ Boolean

Returns true if the number of failed tests has surpassed the threshold that renders the run unsuccessful, in which case the build should be terminated.

Returns:

  • (Boolean)


# File 'lib/rspecq/queue.rb', line 267

def build_failed_fast?
  if fail_fast.nil? || fail_fast.zero?
    return false
  end

  @redis.multi do
    @redis.hlen(key_failures)
    @redis.hlen(key_errors)
  end.inject(:+) >= fail_fast
end
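
A worker loop might consult this check between jobs in order to stop early; a minimal sketch (job execution omitted):

while (job = queue.reserve_job)
  break if queue.build_failed_fast?
  # ... run the job ...
  queue.acknowledge_job(job)
end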

#build_successful? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/rspecq/queue.rb', line 231

def build_successful?
  exhausted? && example_failures.empty? && non_example_errors.empty?
end
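
A hypothetical end-of-run check that maps the build outcome to a process exit status; the duration value is illustrative:

elapsed = 1234.5 # build duration in seconds (illustrative)
queue.record_build_time(elapsed)
exit(queue.build_successful? ? 0 : 1)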

#example_count ⇒ Object



# File 'lib/rspecq/queue.rb', line 174

def example_count
  @redis.get(key_example_count).to_i
end

#example_failures ⇒ Object



# File 'lib/rspecq/queue.rb', line 199

def example_failures
  @redis.hgetall(key_failures)
end

#exhausted? ⇒ Boolean

Returns true if the build is complete, false otherwise.

Returns:

  • (Boolean)


# File 'lib/rspecq/queue.rb', line 208

def exhausted?
  return false if !published?

  @redis.multi do
    @redis.llen(key_queue_unprocessed)
    @redis.hlen(key_queue_running)
  end.inject(:+).zero?
end

#fail_fast ⇒ Object

Returns the number of failures that will trigger the build to fail fast. Returns 0 if this feature is disabled and nil if the Queue is not yet published.



# File 'lib/rspecq/queue.rb', line 259

def fail_fast
  return nil unless published?

  @fail_fast ||= Integer(@redis.hget(key_queue_config, "fail_fast"))
end

#flaky_jobs ⇒ Object

Returns the jobs considered flaky (i.e. initially failed but passed after being retried). Must be called after the build is complete, otherwise an exception will be raised.



# File 'lib/rspecq/queue.rb', line 244

def flaky_jobs
  if !exhausted? && !build_failed_fast?
    raise "Queue is not yet exhausted"
  end

  requeued = @redis.hkeys(key_requeues)

  return [] if requeued.empty?

  requeued - @redis.hkeys(key_failures)
end
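
A hedged reporting sketch: once the queue is exhausted, flaky jobs can be listed together with the command stored via save_rerun_command:

queue.flaky_jobs.each do |job|
  puts "FLAKY: #{job} (rerun with: #{queue.rerun_command(job)})"
end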

#increment_example_count(n) ⇒ Object



# File 'lib/rspecq/queue.rb', line 170

def increment_example_count(n)
  @redis.incrby(key_example_count, n)
end

#key_build_times ⇒ Object

redis: LIST<duration>

Last build is at the head of the list.



# File 'lib/rspecq/queue.rb', line 352

def key_build_times
  "build_times"
end

#key_errors ⇒ Object

Contains errors raised outside of RSpec examples (e.g. a syntax error in spec_helper.rb).

redis: HASH<job => error message>



# File 'lib/rspecq/queue.rb', line 314

def key_errors
  key("errors")
end

#key_example_count ⇒ Object

The total number of examples, including those that were requeued.

redis: STRING<integer>



# File 'lib/rspecq/queue.rb', line 329

def key_example_count
  key("example_count")
end

#key_failures ⇒ Object

Contains regular RSpec example failures.

redis: HASH<example_id => error message>



# File 'lib/rspecq/queue.rb', line 306

def key_failures
  key("example_failures")
end

#key_queue_config ⇒ Object

redis: HASH<config_key => config_value>



# File 'lib/rspecq/queue.rb', line 284

def key_queue_config
  key("queue", "config")
end

#key_queue_processed ⇒ Object

redis: SET<job>



# File 'lib/rspecq/queue.rb', line 299

def key_queue_processed
  key("queue", "processed")
end

#key_queue_running ⇒ Object

redis: HASH<worker_id => job>



# File 'lib/rspecq/queue.rb', line 294

def key_queue_running
  key("queue", "running")
end

#key_queue_status ⇒ Object

redis: STRING [STATUS_INITIALIZING, STATUS_READY]



# File 'lib/rspecq/queue.rb', line 279

def key_queue_status
  key("queue", "status")
end

#key_queue_unprocessed ⇒ Object

redis: LIST<job>



# File 'lib/rspecq/queue.rb', line 289

def key_queue_unprocessed
  key("queue", "unprocessed")
end

#key_requeues ⇒ Object

As a mitigation mechanism for flaky tests, we requeue example failures to be retried by another worker, up to a certain number of times.

redis: HASH<job => times_retried>



# File 'lib/rspecq/queue.rb', line 322

def key_requeues
  key("requeues")
end

#key_timings ⇒ Object

redis: ZSET<job => duration>

NOTE: This key is not scoped to a build (i.e. shared among all builds), so be careful to only publish timings from a single branch (e.g. master). Otherwise, timings won’t be accurate.



# File 'lib/rspecq/queue.rb', line 345

def key_timings
  "timings"
end

#key_worker_heartbeats ⇒ Object

redis: ZSET<worker_id => timestamp>

Timestamp of the last example processed by each worker.



# File 'lib/rspecq/queue.rb', line 336

def key_worker_heartbeats
  key("worker_heartbeats")
end

#non_example_errors ⇒ Object



# File 'lib/rspecq/queue.rb', line 203

def non_example_errors
  @redis.hgetall(key_errors)
end

#processed_jobs ⇒ Object



# File 'lib/rspecq/queue.rb', line 182

def processed_jobs
  @redis.smembers(key_queue_processed)
end

#processed_jobs_count ⇒ Object



# File 'lib/rspecq/queue.rb', line 178

def processed_jobs_count
  @redis.scard(key_queue_processed)
end

#publish(jobs, fail_fast = 0) ⇒ Object

NOTE: jobs will be processed from head to tail (lpop)



# File 'lib/rspecq/queue.rb', line 80

def publish(jobs, fail_fast = 0)
  @redis.multi do
    @redis.hset(key_queue_config, "fail_fast", fail_fast)
    @redis.rpush(key_queue_unprocessed, jobs)
    @redis.set(key_queue_status, STATUS_READY)
  end.first
end

#published? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/rspecq/queue.rb', line 217

def published?
  @redis.get(key_queue_status) == STATUS_READY
end

#record_build_time(duration) ⇒ Object



# File 'lib/rspecq/queue.rb', line 159

def record_build_time(duration)
  @redis.multi do
    @redis.lpush(key_build_times, Float(duration))
    @redis.ltrim(key_build_times, 0, 99)
  end
end

#record_example_failure(example_id, message) ⇒ Object



# File 'lib/rspecq/queue.rb', line 146

def record_example_failure(example_id, message)
  @redis.hset(key_failures, example_id, message)
end

#record_non_example_error(job, message) ⇒ Object

For errors that occurred outside of examples (e.g. while loading a spec file).



# File 'lib/rspecq/queue.rb', line 151

def record_non_example_error(job, message)
  @redis.hset(key_errors, job, message)
end

#record_timing(job, duration) ⇒ Object



# File 'lib/rspecq/queue.rb', line 155

def record_timing(job, duration)
  @redis.zadd(key_timings, duration, job)
end

#record_worker_heartbeat ⇒ Object



# File 'lib/rspecq/queue.rb', line 166

def record_worker_heartbeat
  @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
end

#requeue_job(job, max_requeues) ⇒ Object

Puts the job at the head of the queue so that it is re-processed right away, possibly by another worker. This is a mitigation measure against flaky tests.

Returns nil if the job has hit the requeue limit; in that case it was not requeued and should be considered a failure.



# File 'lib/rspecq/queue.rb', line 128

def requeue_job(job, max_requeues)
  return false if max_requeues.zero?

  @redis.eval(
    REQUEUE_JOB,
    keys: [key_queue_unprocessed, key_requeues],
    argv: [job, max_requeues]
  )
end
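
A sketch of how a worker might handle a failed example, assuming hypothetical example_id and error_message variables; the requeue limit is illustrative:

max_requeues = 3
unless queue.requeue_job(example_id, max_requeues)
  # Requeue limit reached (or requeues disabled): treat it as a real failure.
  queue.record_example_failure(example_id, error_message)
end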

#requeue_lost_job ⇒ Object



# File 'lib/rspecq/queue.rb', line 99

def requeue_lost_job
  @redis.eval(
    REQUEUE_LOST_JOB,
    keys: [
      key_worker_heartbeats,
      key_queue_running,
      key_queue_unprocessed
    ],
    argv: [
      current_time,
      WORKER_LIVENESS_SEC
    ]
  )
end
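
A sketch of how a worker might reclaim work from dead workers, relying on the heartbeats written by record_worker_heartbeat (WORKER_LIVENESS_SEC is the liveness timeout used in the script above):

queue.record_worker_heartbeat
if (job = queue.requeue_lost_job)
  puts "Requeued job reserved by a dead worker: #{job}"
end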

#requeued_jobs ⇒ Object



# File 'lib/rspecq/queue.rb', line 186

def requeued_jobs
  @redis.hgetall(key_requeues)
end

#rerun_command(job) ⇒ Object



# File 'lib/rspecq/queue.rb', line 142

def rerun_command(job)
  @redis.hget(key("job_metadata"), job)
end

#reserve_job ⇒ Object



# File 'lib/rspecq/queue.rb', line 88

def reserve_job
  @redis.eval(
    RESERVE_JOB,
    keys: [
      key_queue_unprocessed,
      key_queue_running,
    ],
    argv: [@worker_id]
  )
end

#save_rerun_command(job, cmd) ⇒ Object



# File 'lib/rspecq/queue.rb', line 138

def save_rerun_command(job, cmd)
  @redis.hset(key("job_metadata"), job, cmd)
end

#timings ⇒ Object

Ordered by execution time in descending order (the slowest jobs are at the head).



# File 'lib/rspecq/queue.rb', line 195

def timings
  Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
end
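
A rough sketch of timing-based scheduling, not RSpecQ's actual logic: since timings is ordered slowest first and the unprocessed queue is consumed head to tail, publishing the known-slow jobs first spreads the expensive work across workers early. Job discovery below is illustrative:

all_jobs = Dir.glob("spec/**/*_spec.rb")
known_slowest_first = queue.timings.keys
ordered = (known_slowest_first & all_jobs) + (all_jobs - known_slowest_first)
queue.publish(ordered)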

#unprocessed_jobs ⇒ Object

The remaining jobs to be processed. Jobs at the head of the list will be processed first.



# File 'lib/rspecq/queue.rb', line 237

def unprocessed_jobs
  @redis.lrange(key_queue_unprocessed, 0, -1)
end

#wait_until_published(timeout = 30) ⇒ Object



# File 'lib/rspecq/queue.rb', line 221

def wait_until_published(timeout = 30)
  (timeout * 10).times do
    return if published?

    sleep 0.1
  end

  raise "Queue not yet published after #{timeout} seconds"
end