Class: RSpecQ::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/rspecq/worker.rb

Overview

A Worker, given a build ID, continuously consumes tests off the corresponding queue and executes them, until the queue is empty. It is also responsible for populating the initial queue.

Essentially, a worker is an RSpec runner that prints the results of the tests it executes to standard output.

The typical use case is to spawn many workers for a given build, thereby parallelizing the work and achieving faster build times.

Workers both read from and write to the queue.

Constant Summary collapse

# Minimum interval between two worker heartbeats written by #update_heartbeat:
# one sixth of WORKER_LIVENESS_SEC. NOTE(review): presumably in seconds, and the
# 1/6 ratio presumably leaves room for several beats per liveness window — confirm.
HEARTBEAT_FREQUENCY =
WORKER_LIVENESS_SEC / 6

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(build_id:, worker_id:, redis_opts:) ⇒ Worker

Returns a new instance of Worker.



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/rspecq/worker.rb', line 66

# Builds a worker bound to the queue of the given build.
#
# @param build_id [String] identifier of the build whose queue is consumed
# @param worker_id [String] identifier of this worker
# @param redis_opts [Hash] connection options forwarded to Queue.new
def initialize(build_id:, worker_id:, redis_opts:)
  @build_id = build_id
  @worker_id = worker_id
  @queue = Queue.new(build_id, worker_id, redis_opts)

  # Defaults for the settings exposed as attributes.
  @fail_fast = 0
  @files_or_dirs_to_run = "spec"
  @populate_timings = false
  @file_split_threshold = 999_999
  @heartbeat_updated_at = nil # no heartbeat recorded yet
  @max_requeues = 3
  @queue_wait_timeout = 30

  # The first srand reseeds Ruby's global PRNG with a fresh random seed; the
  # second reseeds it again and returns the seed installed by the first call,
  # which is folded into 0...0xFFFF and used as this worker's RSpec seed.
  srand
  @seed = srand % 0xFFFF

  @tags = []
  @reproduction = false

  # Formatter classes and the RSpec notifications each one listens to.
  [
    [Formatters::JobTimingRecorder, :dump_summary],
    [Formatters::ExampleCountRecorder, :dump_summary],
    [Formatters::FailureRecorder, :example_failed, :message],
    [Formatters::WorkerHeartbeatRecorder, :example_finished]
  ].each do |formatter, *notifications|
    RSpec::Core::Formatters.register(formatter, *notifications)
  end
end

Instance Attribute Details

#fail_fast ⇒ Object

Stop the execution after N failed tests. Do not stop at any point when set to 0.

Defaults to 0



47
48
49
# File 'lib/rspecq/worker.rb', line 47

# Number of failed tests after which execution stops; 0 (the default set in
# #initialize) means never stop early.
# @return [Integer]
def fail_fast; @fail_fast; end

#file_split_threshold ⇒ Object

If set, spec files whose recorded timing is at least this value will be split and scheduled on a per-example basis.

Defaults to 999999



35
36
37
# File 'lib/rspecq/worker.rb', line 35

# Timing at or above which a spec file is split into per-example jobs when the
# queue is published; a falsy value disables splitting. Defaults to 999_999.
def file_split_threshold; @file_split_threshold; end

#files_or_dirs_to_run ⇒ Object

The root path or individual spec files to execute.

Defaults to “spec” (similar to RSpec)



24
25
26
# File 'lib/rspecq/worker.rb', line 24

# Root path or individual spec files to execute; defaults to "spec", as in RSpec.
def files_or_dirs_to_run; @files_or_dirs_to_run; end

#max_requeues ⇒ Object

Retry failed examples up to N times (N being the supplied value) before considering them genuine failures.

Defaults to 3



41
42
43
# File 'lib/rspecq/worker.rb', line 41

# How many times a failed example is retried before it counts as a real
# failure (handed to the FailureRecorder formatter). Defaults to 3.
# @return [Integer]
def max_requeues; @max_requeues; end

#populate_timings ⇒ Object

If true, job timings will be populated in the global Redis timings key.

Defaults to false



29
30
31
# File 'lib/rspecq/worker.rb', line 29

# When true, a JobTimingRecorder formatter is attached to every executed job
# so its timing gets recorded. Defaults to false.
def populate_timings; @populate_timings; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



64
65
66
# File 'lib/rspecq/worker.rb', line 64

# Queue of this worker's build, created once in #initialize.
def queue; @queue; end

#queue_wait_timeout ⇒ Object

Time to wait for a queue to be published.

Defaults to 30



52
53
54
# File 'lib/rspecq/worker.rb', line 52

# How long #work waits (via Queue#wait_until_published) for the queue to be
# published. Defaults to 30.
def queue_wait_timeout; @queue_wait_timeout; end

#reproduction ⇒ Object

Reproduction flag. If true, the worker will publish files in the exact order given in the command.



62
63
64
# File 'lib/rspecq/worker.rb', line 62

# When true, #try_publish_queue! publishes files_or_dirs_to_run verbatim,
# keeping the given order. Defaults to false.
def reproduction; @reproduction; end

#seed ⇒ Object

The RSpec seed



55
56
57
# File 'lib/rspecq/worker.rb', line 55

# RSpec seed: chosen randomly in #initialize, saved to the queue by #work and
# applied to the RSpec configuration of every job.
# @return [Integer]
def seed; @seed; end

#tags ⇒ Object

RSpec tags



58
59
60
# File 'lib/rspecq/worker.rb', line 58

# RSpec tag filters; #work passes each one to RSpec as a `--tag` option.
# Defaults to an empty list.
def tags; @tags; end

Instance Method Details

#try_publish_queue!(queue) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/rspecq/worker.rb', line 147

# Populates the build queue, but only if this worker becomes the master.
#
# In reproduction mode the configured files are published verbatim. Otherwise
# RSpec expands the configured paths into spec files, which are ordered
# slowest-first using the recorded timings. With no timings at all the files
# are shuffled instead.
#
# @param queue the build queue to publish into
# @return [nil]
def try_publish_queue!(queue)
  # Every worker except the elected master leaves immediately.
  return unless queue.become_master

  if reproduction
    size = queue.publish(files_or_dirs_to_run, fail_fast)
    log_event(
      "Reproduction mode. Published queue as given (size=#{size})",
      "info"
    )
    return
  end

  # Let RSpec expand the configured paths into concrete spec files.
  RSpec.configuration.files_or_directories_to_run = files_or_dirs_to_run
  spec_files = RSpec.configuration.files_to_run.map { |path| relative_path(path) }

  timings = queue.timings
  if timings.empty?
    size = queue.publish(spec_files.shuffle, fail_fast)
    log_event(
      "No timings found! Published queue in random order (size=#{size})",
      "warning"
    )
    return
  end

  # Files whose timing reaches the split threshold are replaced by their
  # individual examples. take_while stops at the first faster entry, so this
  # assumes `timings` is ordered slowest-first (NOTE(review): confirm that
  # Queue#timings guarantees this ordering).
  slow_files = []
  if file_split_threshold
    slow_jobs = timings.take_while { |_job, duration| duration >= file_split_threshold }.map(&:first)
    slow_files = slow_jobs & spec_files
  end

  jobs =
    if slow_files.any?
      (spec_files - slow_files) + files_to_example_ids(slow_files)
    else
      spec_files
    end

  # Jobs with no recorded timing (e.g. newly added spec files) are given the
  # timing found at the middle index of `timings`, so they land mid-queue.
  timing_values = timings.values
  fallback_timing = timing_values[timing_values.size / 2]

  estimated = jobs.each_with_object({}) do |job, acc|
    recorded = timings[job]
    puts "Untimed job: #{job}" if recorded.nil?
    acc[job] = recorded || fallback_timing
  end

  # Slowest jobs first.
  ordered_jobs = estimated.sort_by { |_job, duration| -duration }.map(&:first)

  published = queue.publish(ordered_jobs, fail_fast)
  puts "Published queue (size=#{published})"
end

#update_heartbeat ⇒ Object

Update the worker heartbeat if necessary



140
141
142
143
144
145
# File 'lib/rspecq/worker.rb', line 140

# Records a worker heartbeat on the queue, throttled to at most one per
# HEARTBEAT_FREQUENCY.
#
# @return [Float, nil] the monotonic timestamp of the new heartbeat, or nil
#   when no heartbeat was due
def update_heartbeat
  due = @heartbeat_updated_at.nil? || elapsed(@heartbeat_updated_at) >= HEARTBEAT_FREQUENCY
  return unless due

  queue.record_worker_heartbeat
  # Monotonic clock, so wall-clock adjustments cannot skew the interval.
  # NOTE(review): presumably elapsed() measures against the same clock.
  @heartbeat_updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#work ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rspecq/worker.rb', line 87

# Main worker loop.
#
# Publishes the queue if this worker is the master, waits for it to be
# published, then keeps reserving jobs and running each one through RSpec. It
# stops when the build has failed fast or the queue is exhausted.
#
# @return [nil]
def work
  puts "Working for build #{@build_id} (worker=#{@worker_id})"

  try_publish_queue!(queue)
  queue.wait_until_published(queue_wait_timeout)
  queue.save_worker_seed(@worker_id, seed)

  loop do
    # The heartbeat has to be bootstrapped before the first
    # `requeue_lost_job` call below, so refresh it at the top of each pass.
    update_heartbeat

    return if queue.build_failed_fast?

    requeued = queue.requeue_lost_job
    puts "Requeued lost job: #{requeued}" if requeued

    # TODO: can we make `reserve_job` also act like exhausted? and get
    # rid of `exhausted?` (i.e. return false if no jobs remain)
    job = queue.reserve_job
    if job.nil?
      # Nothing could be reserved: finish once the build is exhausted,
      # otherwise start the next pass immediately (there is no delay).
      return if queue.exhausted?

      next
    end

    puts
    puts "Executing #{job}"

    reset_rspec_state!

    # Reconfigure RSpec for this job.
    config = RSpec.configuration
    config.detail_color = :magenta
    config.seed = seed
    config.backtrace_formatter.filter_gem("rspecq")
    config.add_formatter(Formatters::FailureRecorder.new(queue, job, max_requeues, @worker_id))
    config.add_formatter(Formatters::ExampleCountRecorder.new(queue))
    config.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self))
    config.add_formatter(Formatters::JobTimingRecorder.new(queue, job)) if populate_timings

    argv = ["--format", "progress", job] + tags.flat_map { |tag| ["--tag", tag] }
    RSpec::Core::Runner.new(RSpec::Core::ConfigurationOptions.new(argv)).run($stderr, $stdout)

    queue.acknowledge_job(job)
  end
end