Class: CI::Queue::Redis::BuildRecord
- Inherits:
-
Object
- Object
- CI::Queue::Redis::BuildRecord
- Defined in:
- lib/ci/queue/redis/build_record.rb
Constant Summary collapse
- TOTAL_KEY =
"___total___"
Instance Method Summary collapse
- #error_reports ⇒ Object
- #failed_tests ⇒ Object
- #fetch_stats(stat_names) ⇒ Object
- #flaky_reports ⇒ Object
-
#initialize(queue, redis, config) ⇒ BuildRecord
constructor
A new instance of BuildRecord.
- #max_test_failed? ⇒ Boolean
- #pop_warnings ⇒ Object
- #progress ⇒ Object
- #queue_exhausted? ⇒ Boolean
- #record_error(id, payload, stat_delta: nil) ⇒ Object
- #record_flaky(id, stats: nil) ⇒ Object
- #record_requeue(id) ⇒ Object
- #record_stats(stats = nil, pipeline: nil) ⇒ Object
-
#record_stats_delta(delta, pipeline: nil) ⇒ Object
Apply a delta to this worker’s stats in Redis (HINCRBY).
- #record_success(id, skip_flaky_record: false) ⇒ Object
- #record_warning(type, attributes) ⇒ Object
- #record_worker_profile(profile) ⇒ Object
- #report_worker_error(error) ⇒ Object
- #requeued_tests ⇒ Object
- #reset_stats(stat_names) ⇒ Object
- #reset_worker_error ⇒ Object
- #worker_errors ⇒ Object
- #worker_profiles ⇒ Object
Constructor Details
#initialize(queue, redis, config) ⇒ BuildRecord
Returns a new instance of BuildRecord.
6 7 8 9 10 |
# File 'lib/ci/queue/redis/build_record.rb', line 6 def initialize(queue, redis, config) @queue = queue @redis = redis @config = config end |
Instance Method Details
#error_reports ⇒ Object
144 145 146 |
# File 'lib/ci/queue/redis/build_record.rb', line 144 def error_reports redis.hgetall(key('error-reports')) end |
#failed_tests ⇒ Object
35 36 37 |
# File 'lib/ci/queue/redis/build_record.rb', line 35 def failed_tests redis.hkeys(key('error-reports')) end |
#fetch_stats(stat_names) ⇒ Object
164 165 166 167 168 169 170 171 172 |
# File 'lib/ci/queue/redis/build_record.rb', line 164 def fetch_stats(stat_names) counts = redis.pipelined do |pipeline| stat_names.each { |c| pipeline.hvals(key(c)) } end sum_counts = counts.map do |values| values.map(&:to_f).inject(:+).to_f end stat_names.zip(sum_counts).to_h end |
#flaky_reports ⇒ Object
148 149 150 |
# File 'lib/ci/queue/redis/build_record.rb', line 148 def flaky_reports redis.smembers(key('flaky-reports')) end |
#max_test_failed? ⇒ Boolean
138 139 140 141 142 |
# File 'lib/ci/queue/redis/build_record.rb', line 138 def max_test_failed? return false if config.max_test_failed.nil? @queue.test_failures >= config.max_test_failed end |
#pop_warnings ⇒ Object
46 47 48 49 50 51 52 53 |
# File 'lib/ci/queue/redis/build_record.rb', line 46 def pop_warnings warnings = redis.multi do |transaction| transaction.lrange(key('warnings'), 0, -1) transaction.del(key('warnings')) end.first warnings.map { |p| Marshal.load(p) } end |
#progress ⇒ Object
12 13 14 |
# File 'lib/ci/queue/redis/build_record.rb', line 12 def progress @queue.progress end |
#queue_exhausted? ⇒ Boolean
16 17 18 |
# File 'lib/ci/queue/redis/build_record.rb', line 16 def queue_exhausted? @queue.exhausted? end |
#record_error(id, payload, stat_delta: nil) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/ci/queue/redis/build_record.rb', line 59 def record_error(id, payload, stat_delta: nil) # Run acknowledge first so we know whether we're the first to ack acknowledged = @queue.acknowledge(id, error: payload) if acknowledged # We were the first to ack; another worker already ack'd would get falsy from SADD @queue.increment_test_failed # Only the acknowledging worker's stats include this failure (others skip increment when ack=false). # Store so we can subtract it if another worker records success later. store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any? end # Return so caller can roll back local counter when not acknowledged !!acknowledged end |
#record_flaky(id, stats: nil) ⇒ Object
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/ci/queue/redis/build_record.rb', line 127 def record_flaky(id, stats: nil) redis.pipelined do |pipeline| pipeline.sadd?( key('flaky-reports'), id.b ) pipeline.expire(key('flaky-reports'), config.redis_ttl) end nil end |
#record_requeue(id) ⇒ Object
91 92 93 |
# File 'lib/ci/queue/redis/build_record.rb', line 91 def record_requeue(id) true end |
#record_stats(stats = nil, pipeline: nil) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/ci/queue/redis/build_record.rb', line 95 def record_stats(stats = nil, pipeline: nil) return unless stats if pipeline stats.each do |stat_name, stat_value| pipeline.hset(key(stat_name), config.worker_id, stat_value) pipeline.expire(key(stat_name), config.redis_ttl) end else redis.pipelined do |p| record_stats(stats, pipeline: p) end end end |
#record_stats_delta(delta, pipeline: nil) ⇒ Object
Apply a delta to this worker’s stats in Redis (HINCRBY). Use this instead of record_stats when recording per-test so we never overwrite and correction sticks.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/ci/queue/redis/build_record.rb', line 111 def record_stats_delta(delta, pipeline: nil) return if delta.nil? || delta.empty? apply_delta = lambda do |p| delta.each do |stat_name, value| next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/) p.hincrbyfloat(key(stat_name), config.worker_id.to_s, value.to_f) p.expire(key(stat_name), config.redis_ttl) end end if pipeline apply_delta.call(pipeline) else redis.pipelined { |p| apply_delta.call(p) } end end |
#record_success(id, skip_flaky_record: false) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/ci/queue/redis/build_record.rb', line 74 def record_success(id, skip_flaky_record: false) acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction| @queue.acknowledge(id, pipeline: transaction) transaction.hdel(key('error-reports'), id) transaction.hget(key('requeues-count'), id) transaction.hget(key('error-report-deltas'), id) end # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution if error_reports_deleted_count.to_i > 0 && delta_json apply_error_report_delta_correction(delta_json) redis.hdel(key('error-report-deltas'), id) end record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0) # Count this run when we ack'd or when we replaced a failure (so stats delta is applied) !!(acknowledged || error_reports_deleted_count.to_i > 0) end |
#record_warning(type, attributes) ⇒ Object
55 56 57 |
# File 'lib/ci/queue/redis/build_record.rb', line 55 def record_warning(type, attributes) redis.rpush(key('warnings'), Marshal.dump([type, attributes])) end |
#record_worker_profile(profile) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/ci/queue/redis/build_record.rb', line 152 def record_worker_profile(profile) redis.pipelined do |pipeline| pipeline.hset(key('worker-profiles'), config.worker_id, JSON.dump(profile)) pipeline.expire(key('worker-profiles'), config.redis_ttl) end end |
#report_worker_error(error) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/ci/queue/redis/build_record.rb', line 20 def report_worker_error(error) redis.pipelined do |pipeline| pipeline.hset(key('worker-errors'), config.worker_id, error.) pipeline.expire(key('worker-errors'), config.redis_ttl) end end |
#requeued_tests ⇒ Object
40 41 42 43 44 |
# File 'lib/ci/queue/redis/build_record.rb', line 40 def requeued_tests requeues = redis.hgetall(key('requeues-count')) requeues.delete(TOTAL_KEY) requeues end |
#reset_stats(stat_names) ⇒ Object
174 175 176 177 178 179 180 |
# File 'lib/ci/queue/redis/build_record.rb', line 174 def reset_stats(stat_names) redis.pipelined do |pipeline| stat_names.each do |stat_name| pipeline.hdel(key(stat_name), config.worker_id) end end end |
#reset_worker_error ⇒ Object
31 32 33 |
# File 'lib/ci/queue/redis/build_record.rb', line 31 def reset_worker_error redis.hdel(key('worker-errors'), config.worker_id) end |
#worker_errors ⇒ Object
27 28 29 |
# File 'lib/ci/queue/redis/build_record.rb', line 27 def worker_errors redis.hgetall(key('worker-errors')) end |
#worker_profiles ⇒ Object
159 160 161 162 |
# File 'lib/ci/queue/redis/build_record.rb', line 159 def worker_profiles raw = redis.hgetall(key('worker-profiles')) raw.transform_values { |v| JSON.parse(v) } end |