Class: XCKnife::StreamParser
- Inherits:
-
Object
- Object
- XCKnife::StreamParser
- Includes:
- JsonStreamParserHelper
- Defined in:
- lib/xcknife/stream_parser.rb
Defined Under Namespace
Classes: MachineAssignment, Options, PartitionResult, PartitionWithMachines, ResultStats
Instance Attribute Summary collapse
-
#number_of_shards ⇒ Object
readonly
Returns the value of attribute number_of_shards.
-
#relevant_partitions ⇒ Object
readonly
Returns the value of attribute relevant_partitions.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#test_partitions ⇒ Object
readonly
Returns the value of attribute test_partitions.
Instance Method Summary collapse
- #compute_shards_for_events(historical_events, current_events = nil) ⇒ Object
-
#compute_shards_for_file(historical_filename, current_test_filename = nil) ⇒ Object
Parses the output of an xctool json-stream reporter and computes the shards based on it. See: github.com/facebook/xctool#included-reporters.
- #compute_shards_for_partitions(test_time_for_partitions) ⇒ Object
-
#compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) ⇒ Object
Computes a 2-approximation to the optimal partition_time, which is an instance of the Open-shop scheduling problem (NP-hard). See: en.wikipedia.org/wiki/Open-shop_scheduling.
-
#initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false) ⇒ StreamParser
constructor
A new instance of StreamParser.
-
#parse_json_stream_file(filename) ⇒ Object
rubocop:enable Metrics/CyclomaticComplexity.
-
#split_machines_proportionally(partitions) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity.
- #test_time_for_partitions(historical_events, current_events = nil) ⇒ Object
Methods included from JsonStreamParserHelper
#each_test_event, #normalize_result
Constructor Details
#initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false) ⇒ StreamParser
Returns a new instance of StreamParser.
15 16 17 18 19 20 21 22 23 |
# File 'lib/xcknife/stream_parser.rb', line 15

# Creates a new StreamParser.
#
# number_of_shards          - total shard count to distribute across all partitions
# test_partitions           - list of partitions; each is converted to a Set of target names
# options_for_metapartition - per-partition option hashes, merged over Options::DEFAULT
#                             (defaults to one empty hash per partition)
# allow_fewer_shards        - when true, empty/extra shards are tolerated instead of raising
def initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false)
  @number_of_shards = number_of_shards
  @test_partitions = test_partitions.map(&:to_set)
  @relevant_partitions = test_partitions.flatten.to_set
  @stats = ResultStats.new
  # NOTE: the extracted page dropped the receiver here; the keyword argument
  # `options_for_metapartition` is what gets merged over the defaults.
  @options_for_metapartition = options_for_metapartition.map { |o| Options::DEFAULT.merge(o) }
  @allow_fewer_shards = allow_fewer_shards
  ResultStats.members.each { |k| @stats[k] = 0 }
end
Instance Attribute Details
#number_of_shards ⇒ Object (readonly)
Returns the value of attribute number_of_shards.
13 14 15 |
# File 'lib/xcknife/stream_parser.rb', line 13 def number_of_shards @number_of_shards end |
#relevant_partitions ⇒ Object (readonly)
Returns the value of attribute relevant_partitions.
13 14 15 |
# File 'lib/xcknife/stream_parser.rb', line 13 def relevant_partitions @relevant_partitions end |
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
13 14 15 |
# File 'lib/xcknife/stream_parser.rb', line 13 def stats @stats end |
#test_partitions ⇒ Object (readonly)
Returns the value of attribute test_partitions.
13 14 15 |
# File 'lib/xcknife/stream_parser.rb', line 13 def test_partitions @test_partitions end |
Instance Method Details
#compute_shards_for_events(historical_events, current_events = nil) ⇒ Object
84 85 86 |
# File 'lib/xcknife/stream_parser.rb', line 84

# Convenience wrapper: derives per-partition times from the given event
# streams, then computes the shard assignments for them.
def compute_shards_for_events(historical_events, current_events = nil)
  partition_times = test_time_for_partitions(historical_events, current_events)
  compute_shards_for_partitions(partition_times)
end
#compute_shards_for_file(historical_filename, current_test_filename = nil) ⇒ Object
Parses the output of an xctool json-stream reporter and computes the shards based on it. See: github.com/facebook/xctool#included-reporters
80 81 82 |
# File 'lib/xcknife/stream_parser.rb', line 80

# Parses the output of an xctool json-stream reporter and computes the
# shards based on it.
# see: https://github.com/facebook/xctool#included-reporters
def compute_shards_for_file(historical_filename, current_test_filename = nil)
  historical = parse_json_stream_file(historical_filename)
  current = parse_json_stream_file(current_test_filename)
  compute_shards_for_events(historical, current)
end
#compute_shards_for_partitions(test_time_for_partitions) ⇒ Object
88 89 90 91 92 |
# File 'lib/xcknife/stream_parser.rb', line 88

# Splits machines across partitions proportionally to their total time,
# then shards each partition individually; wraps everything in a
# PartitionResult together with the collected stats.
def compute_shards_for_partitions(test_time_for_partitions)
  PartitionResult.new(@stats, split_machines_proportionally(test_time_for_partitions).map do |partition|
    # NOTE: the extracted page dropped the method name after `partition.`;
    # each PartitionWithMachines carries its own options.
    compute_single_shards(partition.number_of_shards, partition.test_time_map, options: partition.options)
  end, test_time_for_partitions)
end
#compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) ⇒ Object
Computes a 2-approximation to the optimal partition_time, which is an instance of the Open-shop scheduling problem (NP-hard). See: en.wikipedia.org/wiki/Open-shop_scheduling
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/xcknife/stream_parser.rb', line 148

# Computes a 2-approximation to the optimal partition time — an instance of
# the Open-shop scheduling problem (which is NP-hard).
# see: https://en.wikipedia.org/wiki/Open-shop_scheduling
#
# Raises XCKnife::XCKnifeError when there are no shards, the partition is
# empty, or (unless options.allow_fewer_shards) some shards would be empty.
def compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
  raise XCKnife::XCKnifeError, 'There are not enough workers provided' if number_of_shards <= 0
  raise XCKnife::XCKnifeError, 'Cannot shard an empty partition_time' if test_time_map.empty?

  assignements = Array.new(number_of_shards) { MachineAssignment.new(Hash.new { |k, v| k[v] = [] }, 0) }

  list_of_test_target_class_times = []
  test_time_map.each do |test_target, class_times|
    class_times.each do |class_name, duration_in_milliseconds|
      list_of_test_target_class_times << [test_target, class_name, duration_in_milliseconds]
    end
  end

  # This might seem like an unnecessary level of indirection, but it allows us to keep
  # logic consistent regardless of the `split_bundles_across_machines` option.
  # NOTE: the extracted page dropped the `options` receiver on these calls.
  group = list_of_test_target_class_times.group_by do |test_target, class_name, _duration_in_milliseconds|
    options.split_bundles_across_machines ? [test_target, class_name] : test_target
  end

  list_of_test_target_classes_times = group.map do |(test_target, _), classes|
    [
      test_target,
      classes.map { |_test_target, class_name, _duration_in_milliseconds| class_name },
      classes.reduce(0) { |total_duration, (_test_target, _class_name, duration_in_milliseconds)| total_duration + duration_in_milliseconds }
    ]
  end

  # Greedy LPT assignment: longest group first, onto the machine with the
  # smallest accumulated time.
  list_of_test_target_classes_times.sort_by! { |_test_target, _class_names, duration_in_milliseconds| -duration_in_milliseconds }
  list_of_test_target_classes_times.each do |test_target, class_names, duration_in_milliseconds|
    assignemnt = assignements.min_by(&:total_time)
    assignemnt.test_time_map[test_target].concat class_names
    assignemnt.total_time += duration_in_milliseconds
  end

  if (empty_test_map_assignments = assignements.select { |a| a.test_time_map.empty? }) && !empty_test_map_assignments.empty? && !options.allow_fewer_shards
    test_grouping = options.split_bundles_across_machines ? 'classes' : 'targets'
    raise XCKnife::XCKnifeError, "Too many shards -- #{empty_test_map_assignments.size} of #{number_of_shards} assignments are empty," \
                                 " because there are not enough test #{test_grouping} for that many shards."
  end
  assignements.reject! { |a| a.test_time_map.empty? }

  assignements
end
#parse_json_stream_file(filename) ⇒ Object
rubocop:enable Metrics/CyclomaticComplexity
193 194 195 196 197 198 199 |
# File 'lib/xcknife/stream_parser.rb', line 193

# Reads a json-stream file (one JSON object per line) into OpenStructs,
# lazily. Returns nil for a nil filename and [] when the file is absent.
# rubocop:enable Metrics/CyclomaticComplexity
def parse_json_stream_file(filename)
  return nil unless filename
  return [] unless File.exist?(filename)

  raw_lines = IO.readlines(filename)
  raw_lines.lazy.map { |raw| OpenStruct.new(JSON.parse(raw)) }
end
#split_machines_proportionally(partitions) ⇒ Object
rubocop:disable Metrics/CyclomaticComplexity
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/xcknife/stream_parser.rb', line 112

# Distributes the available shards across partitions proportionally to each
# partition's total test time, capped by how many groups the partition can
# actually fill (and by options.max_shard_count when set). Leftover shards
# are then handed out, largest-time partitions first.
# rubocop:disable Metrics/CyclomaticComplexity
def split_machines_proportionally(partitions)
  total = 0
  partitions.each do |test_time_map|
    each_duration(test_time_map) { |duration_in_milliseconds| total += duration_in_milliseconds }
  end

  used_shards = 0
  # Every partition gets at least one shard; the rest are proportional.
  assignable_shards = number_of_shards - partitions.size
  # NOTE: the extracted page dropped the `index` block param, the `options`
  # local, and the trailing `options` argument below.
  partition_with_machines_list = partitions.each_with_index.map do |test_time_map, index|
    options = @options_for_metapartition[index]
    partition_time = 0
    max_shard_count = test_time_map.each_value.map(&:size).reduce(&:+) || 1
    max_shard_count = [max_shard_count, options.max_shard_count].min if options.max_shard_count
    each_duration(test_time_map) { |duration_in_milliseconds| partition_time += duration_in_milliseconds }
    n = [1 + (assignable_shards * partition_time.to_f / total).floor, max_shard_count].min
    used_shards += n
    PartitionWithMachines.new(test_time_map, n, partition_time, max_shard_count, options)
  end

  fifo_with_machines_who_can_use_more_shards = partition_with_machines_list.select { |x| x.number_of_shards < x.max_shard_count }.sort_by(&:partition_time)
  while number_of_shards > used_shards
    if fifo_with_machines_who_can_use_more_shards.empty?
      break if @allow_fewer_shards

      raise XCKnife::XCKnifeError, "There are #{number_of_shards - used_shards} extra machines"
    end
    machine = fifo_with_machines_who_can_use_more_shards.pop
    machine.number_of_shards += 1
    used_shards += 1
    # Re-queue at the front so extra shards rotate among eligible machines.
    fifo_with_machines_who_can_use_more_shards.unshift(machine) if machine.number_of_shards < machine.max_shard_count
  end
  partition_with_machines_list
end
#test_time_for_partitions(historical_events, current_events = nil) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/xcknife/stream_parser.rb', line 94

# Builds a map of target => { class => historical duration in ms } from the
# historical events, restricted to the relevant partitions and (when current
# events are given) to classes that still exist; missing classes get
# extrapolated times.
def test_time_for_partitions(historical_events, current_events = nil)
  analyzer = EventsAnalyzer.for(current_events, relevant_partitions)
  @stats[:current_total_tests] = analyzer.total_tests

  durations_by_target = Hash.new { |hash, target| hash[target] = Hash.new(0) }
  each_test_event(historical_events) do |target_name, result|
    next unless relevant_partitions.include?(target_name)

    inc_stat :historical_total_tests
    next unless analyzer.test_class?(target_name, result.className)

    durations_by_target[target_name][result.className] += (result.totalDuration * 1000).ceil
  end

  extrapolate_times_for_current_events(analyzer, durations_by_target) if current_events
  hash_partitions(durations_by_target)
end