Class: CI::Queue::Redis::BuildRecord

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/build_record.rb

Constant Summary collapse

TOTAL_KEY =
"___total___"

Instance Method Summary collapse

Constructor Details

#initialize(queue, redis, config) ⇒ BuildRecord

Returns a new instance of BuildRecord.



6
7
8
9
10
# File 'lib/ci/queue/redis/build_record.rb', line 6

def initialize(queue, redis, config)
  @queue = queue
  @redis = redis
  @config = config
end

Instance Method Details

#error_reportsObject



144
145
146
# File 'lib/ci/queue/redis/build_record.rb', line 144

def error_reports
  redis.hgetall(key('error-reports'))
end

#failed_testsObject



35
36
37
# File 'lib/ci/queue/redis/build_record.rb', line 35

def failed_tests
  redis.hkeys(key('error-reports'))
end

#fetch_stats(stat_names) ⇒ Object



164
165
166
167
168
169
170
171
172
# File 'lib/ci/queue/redis/build_record.rb', line 164

def fetch_stats(stat_names)
  counts = redis.pipelined do |pipeline|
    stat_names.each { |c| pipeline.hvals(key(c)) }
  end
  sum_counts = counts.map do |values|
    values.map(&:to_f).inject(:+).to_f
  end
  stat_names.zip(sum_counts).to_h
end

#flaky_reportsObject



148
149
150
# File 'lib/ci/queue/redis/build_record.rb', line 148

def flaky_reports
  redis.smembers(key('flaky-reports'))
end

#max_test_failed?Boolean

Returns:

  • (Boolean)


138
139
140
141
142
# File 'lib/ci/queue/redis/build_record.rb', line 138

def max_test_failed?
  return false if config.max_test_failed.nil?

  @queue.test_failures >= config.max_test_failed
end

#pop_warningsObject



46
47
48
49
50
51
52
53
# File 'lib/ci/queue/redis/build_record.rb', line 46

def pop_warnings
  warnings = redis.multi do |transaction|
    transaction.lrange(key('warnings'), 0, -1)
    transaction.del(key('warnings'))
  end.first

  warnings.map { |p| Marshal.load(p) }
end

#progressObject



12
13
14
# File 'lib/ci/queue/redis/build_record.rb', line 12

def progress
  @queue.progress
end

#queue_exhausted?Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/ci/queue/redis/build_record.rb', line 16

def queue_exhausted?
  @queue.exhausted?
end

#record_error(id, payload, stat_delta: nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/ci/queue/redis/build_record.rb', line 59

def record_error(id, payload, stat_delta: nil)
  # Run acknowledge first so we know whether we're the first to ack
  acknowledged = @queue.acknowledge(id, error: payload)

  if acknowledged
    # We were the first to ack; another worker already ack'd would get falsy from SADD
    @queue.increment_test_failed
    # Only the acknowledging worker's stats include this failure (others skip increment when ack=false).
    # Store so we can subtract it if another worker records success later.
    store_error_report_delta(id, stat_delta) if stat_delta && stat_delta.any?
  end
  # Return so caller can roll back local counter when not acknowledged
  !!acknowledged
end

#record_flaky(id, stats: nil) ⇒ Object



127
128
129
130
131
132
133
134
135
136
# File 'lib/ci/queue/redis/build_record.rb', line 127

def record_flaky(id, stats: nil)
  redis.pipelined do |pipeline|
    pipeline.sadd?(
      key('flaky-reports'),
      id.b
    )
    pipeline.expire(key('flaky-reports'), config.redis_ttl)
  end
  nil
end

#record_requeue(id) ⇒ Object



91
92
93
# File 'lib/ci/queue/redis/build_record.rb', line 91

def record_requeue(id)
  true
end

#record_stats(stats = nil, pipeline: nil) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/ci/queue/redis/build_record.rb', line 95

def record_stats(stats = nil, pipeline: nil)
  return unless stats
  if pipeline
    stats.each do |stat_name, stat_value|
      pipeline.hset(key(stat_name), config.worker_id, stat_value)
      pipeline.expire(key(stat_name), config.redis_ttl)
    end
  else
    redis.pipelined do |p|
      record_stats(stats, pipeline: p)
    end
  end
end

#record_stats_delta(delta, pipeline: nil) ⇒ Object

Apply a delta to this worker’s stats in Redis (HINCRBY). Use this instead of record_stats when recording per-test so we never overwrite and correction sticks.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/ci/queue/redis/build_record.rb', line 111

def record_stats_delta(delta, pipeline: nil)
  return if delta.nil? || delta.empty?
  apply_delta = lambda do |p|
    delta.each do |stat_name, value|
      next unless value.is_a?(Numeric) || value.to_s.match?(/\A-?\d+\.?\d*\z/)
      p.hincrbyfloat(key(stat_name), config.worker_id.to_s, value.to_f)
      p.expire(key(stat_name), config.redis_ttl)
    end
  end
  if pipeline
    apply_delta.call(pipeline)
  else
    redis.pipelined { |p| apply_delta.call(p) }
  end
end

#record_success(id, skip_flaky_record: false) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/ci/queue/redis/build_record.rb', line 74

def record_success(id, skip_flaky_record: false)
  acknowledged, error_reports_deleted_count, requeued_count, delta_json = redis.multi do |transaction|
    @queue.acknowledge(id, pipeline: transaction)
    transaction.hdel(key('error-reports'), id)
    transaction.hget(key('requeues-count'), id)
    transaction.hget(key('error-report-deltas'), id)
  end
  # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution
  if error_reports_deleted_count.to_i > 0 && delta_json
    apply_error_report_delta_correction(delta_json)
    redis.hdel(key('error-report-deltas'), id)
  end
  record_flaky(id) if !skip_flaky_record && (error_reports_deleted_count.to_i > 0 || requeued_count.to_i > 0)
  # Count this run when we ack'd or when we replaced a failure (so stats delta is applied)
  !!(acknowledged || error_reports_deleted_count.to_i > 0)
end

#record_warning(type, attributes) ⇒ Object



55
56
57
# File 'lib/ci/queue/redis/build_record.rb', line 55

def record_warning(type, attributes)
  redis.rpush(key('warnings'), Marshal.dump([type, attributes]))
end

#record_worker_profile(profile) ⇒ Object



152
153
154
155
156
157
# File 'lib/ci/queue/redis/build_record.rb', line 152

def record_worker_profile(profile)
  redis.pipelined do |pipeline|
    pipeline.hset(key('worker-profiles'), config.worker_id, JSON.dump(profile))
    pipeline.expire(key('worker-profiles'), config.redis_ttl)
  end
end

#report_worker_error(error) ⇒ Object



20
21
22
23
24
25
# File 'lib/ci/queue/redis/build_record.rb', line 20

def report_worker_error(error)
  redis.pipelined do |pipeline|
    pipeline.hset(key('worker-errors'), config.worker_id, error.message)
    pipeline.expire(key('worker-errors'), config.redis_ttl)
  end
end

#requeued_testsObject



40
41
42
43
44
# File 'lib/ci/queue/redis/build_record.rb', line 40

def requeued_tests
  requeues = redis.hgetall(key('requeues-count'))
  requeues.delete(TOTAL_KEY)
  requeues
end

#reset_stats(stat_names) ⇒ Object



174
175
176
177
178
179
180
# File 'lib/ci/queue/redis/build_record.rb', line 174

def reset_stats(stat_names)
  redis.pipelined do |pipeline|
    stat_names.each do |stat_name|
      pipeline.hdel(key(stat_name), config.worker_id)
    end
  end
end

#reset_worker_errorObject



31
32
33
# File 'lib/ci/queue/redis/build_record.rb', line 31

def reset_worker_error
  redis.hdel(key('worker-errors'), config.worker_id)
end

#worker_errorsObject



27
28
29
# File 'lib/ci/queue/redis/build_record.rb', line 27

def worker_errors
  redis.hgetall(key('worker-errors'))
end

#worker_profilesObject



159
160
161
162
# File 'lib/ci/queue/redis/build_record.rb', line 159

def worker_profiles
  raw = redis.hgetall(key('worker-profiles'))
  raw.transform_values { |v| JSON.parse(v) }
end