Class: RSpecQ::Worker
- Inherits:
-
Object
- Object
- RSpecQ::Worker
- Defined in:
- lib/rspecq/worker.rb
Overview
A Worker, given a build ID, continuously consumes tests off the corresponding and executes them, until the queue is empty. It is also responsible for populating the initial queue.
Essentially, a worker is an RSpec runner that prints the results of the tests it executes to standard output.
The typical use case is to spawn many workers for a given build, thereby parallelizing the work and achieving faster build times.
Workers are readers+writers of the queue.
Constant Summary collapse
- HEARTBEAT_FREQUENCY =
WORKER_LIVENESS_SEC / 6
Instance Attribute Summary collapse
-
#fail_fast ⇒ Object
Stop the execution after N failed tests.
-
#file_split_threshold ⇒ Object
If set, spec files that are known to take more than this value to finish, will be split and scheduled on a per-example basis.
-
#files_or_dirs_to_run ⇒ Object
The root path or individual spec files to execute.
-
#max_requeues ⇒ Object
Retry failed examples up to N times (with N being the supplied value) before considering them legit failures.
-
#populate_timings ⇒ Object
If true, job timings will be populated in the global Redis timings key.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_wait_timeout ⇒ Object
Time to wait for a queue to be published.
-
#reproduction ⇒ Object
Reproduction flag.
-
#seed ⇒ Object
The RSpec seed.
-
#tags ⇒ Object
Rspec tags.
Instance Method Summary collapse
-
#initialize(build_id:, worker_id:, redis_opts:) ⇒ Worker
constructor
A new instance of Worker.
- #try_publish_queue!(queue) ⇒ Object
-
#update_heartbeat ⇒ Object
Update the worker heartbeat if necessary.
- #work ⇒ Object
Constructor Details
#initialize(build_id:, worker_id:, redis_opts:) ⇒ Worker
Returns a new instance of Worker.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/rspecq/worker.rb', line 66 def initialize(build_id:, worker_id:, redis_opts:) @build_id = build_id @worker_id = worker_id @queue = Queue.new(build_id, worker_id, redis_opts) @fail_fast = 0 @files_or_dirs_to_run = "spec" @populate_timings = false @file_split_threshold = 999_999 @heartbeat_updated_at = nil @max_requeues = 3 @queue_wait_timeout = 30 @seed = srand && srand % 0xFFFF = [] @reproduction = false RSpec::Core::Formatters.register(Formatters::JobTimingRecorder, :dump_summary) RSpec::Core::Formatters.register(Formatters::ExampleCountRecorder, :dump_summary) RSpec::Core::Formatters.register(Formatters::FailureRecorder, :example_failed, :message) RSpec::Core::Formatters.register(Formatters::WorkerHeartbeatRecorder, :example_finished) end |
Instance Attribute Details
#fail_fast ⇒ Object
Stop the execution after N failed tests. Do not stop at any point when set to 0.
Defaults to 0
47 48 49 |
# File 'lib/rspecq/worker.rb', line 47 def fail_fast @fail_fast end |
#file_split_threshold ⇒ Object
If set, spec files that are known to take more than this value to finish, will be split and scheduled on a per-example basis.
Defaults to 999999
35 36 37 |
# File 'lib/rspecq/worker.rb', line 35 def file_split_threshold @file_split_threshold end |
#files_or_dirs_to_run ⇒ Object
The root path or individual spec files to execute.
Defaults to “spec” (similar to RSpec)
24 25 26 |
# File 'lib/rspecq/worker.rb', line 24 def files_or_dirs_to_run @files_or_dirs_to_run end |
#max_requeues ⇒ Object
Retry failed examples up to N times (with N being the supplied value) before considering them legit failures
Defaults to 3
41 42 43 |
# File 'lib/rspecq/worker.rb', line 41 def max_requeues @max_requeues end |
#populate_timings ⇒ Object
If true, job timings will be populated in the global Redis timings key
Defaults to false
29 30 31 |
# File 'lib/rspecq/worker.rb', line 29 def populate_timings @populate_timings end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
64 65 66 |
# File 'lib/rspecq/worker.rb', line 64 def queue @queue end |
#queue_wait_timeout ⇒ Object
Time to wait for a queue to be published.
Defaults to 30
52 53 54 |
# File 'lib/rspecq/worker.rb', line 52 def queue_wait_timeout @queue_wait_timeout end |
#reproduction ⇒ Object
Reproduction flag. If true, worker will publish files in the exact order given in the command.
62 63 64 |
# File 'lib/rspecq/worker.rb', line 62 def reproduction @reproduction end |
#seed ⇒ Object
The RSpec seed
55 56 57 |
# File 'lib/rspecq/worker.rb', line 55 def seed @seed end |
#tags ⇒ Object
Rspec tags
58 59 60 |
# File 'lib/rspecq/worker.rb', line 58 def end |
Instance Method Details
#try_publish_queue!(queue) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/rspecq/worker.rb', line 147 def try_publish_queue!(queue) return if !queue.become_master if reproduction q_size = queue.publish(files_or_dirs_to_run, fail_fast) log_event( "Reproduction mode. Published queue as given (size=#{q_size})", "info" ) return end RSpec.configuration.files_or_directories_to_run = files_or_dirs_to_run files_to_run = RSpec.configuration.files_to_run.map { |j| relative_path(j) } timings = queue.timings if timings.empty? q_size = queue.publish(files_to_run.shuffle, fail_fast) log_event( "No timings found! Published queue in random order (size=#{q_size})", "warning" ) return end # prepare jobs to run jobs = [] slow_files = [] if file_split_threshold slow_files = timings.take_while do |_job, duration| duration >= file_split_threshold end.map(&:first) & files_to_run end if slow_files.any? jobs.concat(files_to_run - slow_files) jobs.concat(files_to_example_ids(slow_files)) else jobs.concat(files_to_run) end default_timing = timings.values[timings.values.size / 2] # assign timings (based on previous runs) to all jobs jobs = jobs.each_with_object({}) do |j, h| puts "Untimed job: #{j}" if timings[j].nil? # HEURISTIC: put jobs without previous timings (e.g. a newly added # spec file) in the middle of the queue h[j] = timings[j] || default_timing end # sort jobs based on their timings (slowest to be processed first) jobs = jobs.sort_by { |_j, t| -t }.map(&:first) puts "Published queue (size=#{queue.publish(jobs, fail_fast)})" end |
#update_heartbeat ⇒ Object
Update the worker heartbeat if necessary
140 141 142 143 144 145 |
# File 'lib/rspecq/worker.rb', line 140 def update_heartbeat if @heartbeat_updated_at.nil? || elapsed(@heartbeat_updated_at) >= HEARTBEAT_FREQUENCY queue.record_worker_heartbeat @heartbeat_updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) end end |
#work ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/rspecq/worker.rb', line 87 def work puts "Working for build #{@build_id} (worker=#{@worker_id})" try_publish_queue!(queue) queue.wait_until_published(queue_wait_timeout) queue.save_worker_seed(@worker_id, seed) loop do # we have to bootstrap this so that it can be used in the first call # to `requeue_lost_job` inside the work loop update_heartbeat return if queue.build_failed_fast? lost = queue.requeue_lost_job puts "Requeued lost job: #{lost}" if lost # TODO: can we make `reserve_job` also act like exhausted? and get # rid of `exhausted?` (i.e. return false if no jobs remain) job = queue.reserve_job # build is finished return if job.nil? && queue.exhausted? next if job.nil? puts puts "Executing #{job}" reset_rspec_state! # reconfigure rspec RSpec.configuration.detail_color = :magenta RSpec.configuration.seed = seed RSpec.configuration.backtrace_formatter.filter_gem("rspecq") RSpec.configuration.add_formatter(Formatters::FailureRecorder.new(queue, job, max_requeues, @worker_id)) RSpec.configuration.add_formatter(Formatters::ExampleCountRecorder.new(queue)) RSpec.configuration.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self)) if populate_timings RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(queue, job)) end = ["--format", "progress", job] .each { |tag| .push(*["--tag", tag]) } opts = RSpec::Core::ConfigurationOptions.new() _result = RSpec::Core::Runner.new(opts).run($stderr, $stdout) queue.acknowledge_job(job) end end |