Class: XCKnife::StreamParser
- Inherits:
-
Object
- Object
- XCKnife::StreamParser
- Includes:
- JsonStreamParserHelper
- Defined in:
- lib/xcknife/stream_parser.rb
Defined Under Namespace
Classes: MachineAssignment, Options, PartitionResult, PartitionWithMachines, ResultStats
Instance Attribute Summary collapse
-
#number_of_shards ⇒ Object
readonly
Returns the value of attribute number_of_shards.
-
#relevant_partitions ⇒ Object
readonly
Returns the value of attribute relevant_partitions.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#test_partitions ⇒ Object
readonly
Returns the value of attribute test_partitions.
Instance Method Summary collapse
- #compute_shards_for_events(historical_events, current_events = nil) ⇒ Object
-
#compute_shards_for_file(historical_filename, current_test_filename = nil) ⇒ Object
Parses the output of an xctool json-stream reporter and computes the shards based on that. See: github.com/facebook/xctool#included-reporters.
- #compute_shards_for_partitions(test_time_for_partitions) ⇒ Object
-
#compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) ⇒ Object
Computes a 2-approximation to the optimal partition_time, which is an instance of the Open-shop scheduling problem (which is NP-hard). See: en.wikipedia.org/wiki/Open-shop_scheduling.
-
#initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false) ⇒ StreamParser
constructor
A new instance of StreamParser.
- #parse_json_stream_file(filename) ⇒ Object
- #split_machines_proportionally(partitions) ⇒ Object
- #test_time_for_partitions(historical_events, current_events = nil) ⇒ Object
Methods included from JsonStreamParserHelper
#each_test_event, #normalize_result
Constructor Details
#initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false) ⇒ StreamParser
Returns a new instance of StreamParser.
13 14 15 16 17 18 19 20 21 |
# File 'lib/xcknife/stream_parser.rb', line 13 def initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false) @number_of_shards = number_of_shards @test_partitions = test_partitions.map(&:to_set) @relevant_partitions = test_partitions.flatten.to_set @stats = ResultStats.new @options_for_metapartition = .map { |o| Options::DEFAULT.merge(o) } @allow_fewer_shards = allow_fewer_shards ResultStats.members.each { |k| @stats[k] = 0 } end |
Instance Attribute Details
#number_of_shards ⇒ Object (readonly)
Returns the value of attribute number_of_shards.
11 12 13 |
# File 'lib/xcknife/stream_parser.rb', line 11 def number_of_shards @number_of_shards end |
#relevant_partitions ⇒ Object (readonly)
Returns the value of attribute relevant_partitions.
11 12 13 |
# File 'lib/xcknife/stream_parser.rb', line 11 def relevant_partitions @relevant_partitions end |
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
11 12 13 |
# File 'lib/xcknife/stream_parser.rb', line 11 def stats @stats end |
#test_partitions ⇒ Object (readonly)
Returns the value of attribute test_partitions.
11 12 13 |
# File 'lib/xcknife/stream_parser.rb', line 11 def test_partitions @test_partitions end |
Instance Method Details
#compute_shards_for_events(historical_events, current_events = nil) ⇒ Object
80 81 82 |
# File 'lib/xcknife/stream_parser.rb', line 80 def compute_shards_for_events(historical_events, current_events = nil) compute_shards_for_partitions(test_time_for_partitions(historical_events, current_events)) end |
#compute_shards_for_file(historical_filename, current_test_filename = nil) ⇒ Object
Parses the output of an xctool json-stream reporter and computes the shards based on that. See: github.com/facebook/xctool#included-reporters
76 77 78 |
# File 'lib/xcknife/stream_parser.rb', line 76 def compute_shards_for_file(historical_filename, current_test_filename = nil) compute_shards_for_events(parse_json_stream_file(historical_filename), parse_json_stream_file(current_test_filename)) end |
#compute_shards_for_partitions(test_time_for_partitions) ⇒ Object
84 85 86 87 88 |
# File 'lib/xcknife/stream_parser.rb', line 84 def compute_shards_for_partitions(test_time_for_partitions) PartitionResult.new(@stats, split_machines_proportionally(test_time_for_partitions).map do |partition| compute_single_shards(partition.number_of_shards, partition.test_time_map, options: partition.) end, test_time_for_partitions) end |
#compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) ⇒ Object
Computes a 2-approximation to the optimal partition_time, which is an instance of the Open-shop scheduling problem (which is NP-hard). See: en.wikipedia.org/wiki/Open-shop_scheduling
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# Computes a 2-approximation to the optimal partition_time, which is an instance
# of the Open-shop scheduling problem (NP-hard).
# See: en.wikipedia.org/wiki/Open-shop_scheduling
#
# @param number_of_shards [Integer] how many machine assignments to produce
# @param test_time_map [Hash{String=>Hash{String=>Integer}}] target => { class => duration in ms }
# @param options [Options] controls bundle splitting and whether empty shards are tolerated
# @return [Array<MachineAssignment>] non-empty assignments, greedily balanced by total time
# @raise [XCKnife::XCKnifeError] on zero shards, empty input, or too many shards
def compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
  raise XCKnife::XCKnifeError, "There are not enough workers provided" if number_of_shards <= 0
  raise XCKnife::XCKnifeError, "Cannot shard an empty partition_time" if test_time_map.empty?

  # One greedy bucket per shard: { target => [class names] } plus a running total time.
  assignements = Array.new(number_of_shards) { MachineAssignment.new(Hash.new { |k, v| k[v] = [] }, 0) }

  # Flatten the nested map into [target, class, duration] triples.
  list_of_test_target_class_times = []
  test_time_map.each do |test_target, class_times|
    class_times.each do |class_name, duration_in_milliseconds|
      list_of_test_target_class_times << [test_target, class_name, duration_in_milliseconds]
    end
  end

  # This might seem like an unnecessary level of indirection, but it allows us to keep
  # logic consistent regardless of the `split_bundles_across_machines` option.
  # BUGFIX(restored): the `options.` receivers were missing on the option reads below.
  grouped = list_of_test_target_class_times.group_by do |test_target, class_name, _duration|
    options.split_bundles_across_machines ? [test_target, class_name] : test_target
  end
  list_of_test_target_classes_times = grouped.map do |(test_target, _), classes|
    [
      test_target,
      classes.map { |_target, class_name, _duration| class_name },
      classes.reduce(0) { |total_duration, (_target, _class_name, duration)| total_duration + duration }
    ]
  end

  # Longest-processing-time-first: hand each group to the currently least-loaded shard.
  list_of_test_target_classes_times.sort_by! { |_target, _class_names, duration| -duration }
  list_of_test_target_classes_times.each do |test_target, class_names, duration_in_milliseconds|
    assignemnt = assignements.min_by(&:total_time)
    assignemnt.test_time_map[test_target].concat class_names
    assignemnt.total_time += duration_in_milliseconds
  end

  empty_test_map_assignments = assignements.select { |a| a.test_time_map.empty? }
  if !empty_test_map_assignments.empty? && !options.allow_fewer_shards
    test_grouping = options.split_bundles_across_machines ? 'classes' : 'targets'
    raise XCKnife::XCKnifeError, "Too many shards -- #{empty_test_map_assignments.size} of #{number_of_shards} assignments are empty," \
      " because there are not enough test #{test_grouping} for that many shards."
  end
  assignements.reject! { |a| a.test_time_map.empty? }
  assignements
end
#parse_json_stream_file(filename) ⇒ Object
187 188 189 190 191 192 |
# File 'lib/xcknife/stream_parser.rb', line 187 def parse_json_stream_file(filename) return nil if filename.nil? return [] unless File.exists?(filename) lines = IO.readlines(filename) lines.lazy.map { |line| OpenStruct.new(JSON.load(line)) } end |
#split_machines_proportionally(partitions) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/xcknife/stream_parser.rb', line 105 def split_machines_proportionally(partitions) total = 0 partitions.each do |test_time_map| each_duration(test_time_map) { |duration_in_milliseconds| total += duration_in_milliseconds} end used_shards = 0 assignable_shards = number_of_shards - partitions.size partition_with_machines_list = partitions.each_with_index.map do |test_time_map, | = @options_for_metapartition[] partition_time = 0 max_shard_count = test_time_map.each_value.map(&:size).reduce(&:+) || 1 max_shard_count = [max_shard_count, .max_shard_count].min if .max_shard_count each_duration(test_time_map) { |duration_in_milliseconds| partition_time += duration_in_milliseconds } n = [1 + (assignable_shards * partition_time.to_f / total).floor, max_shard_count].min used_shards += n PartitionWithMachines.new(test_time_map, n, partition_time, max_shard_count, ) end fifo_with_machines_who_can_use_more_shards = partition_with_machines_list.select { |x| x.number_of_shards < x.max_shard_count}.sort_by(&:partition_time) while number_of_shards > used_shards if fifo_with_machines_who_can_use_more_shards.empty? break if @allow_fewer_shards raise XCKnife::XCKnifeError, "There are #{number_of_shards - used_shards} extra machines" end machine = fifo_with_machines_who_can_use_more_shards.pop machine.number_of_shards += 1 used_shards += 1 if machine.number_of_shards < machine.max_shard_count fifo_with_machines_who_can_use_more_shards.unshift(machine) end end partition_with_machines_list end |
#test_time_for_partitions(historical_events, current_events = nil) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/xcknife/stream_parser.rb', line 90 def test_time_for_partitions(historical_events, current_events = nil) analyzer = EventsAnalyzer.for(current_events, relevant_partitions) @stats[:current_total_tests] = analyzer.total_tests times_for_target_class = Hash.new { |h, current_target| h[current_target] = Hash.new(0) } each_test_event(historical_events) do |target_name, result| next unless relevant_partitions.include?(target_name) inc_stat :historical_total_tests next unless analyzer.has_test_class?(target_name, result.className) times_for_target_class[target_name][result.className] += (result.totalDuration * 1000).ceil end extrapolate_times_for_current_events(analyzer, times_for_target_class) if current_events hash_partitions(times_for_target_class) end |