Class: XCKnife::StreamParser

Inherits:
Object
Includes:
JsonStreamParserHelper
Defined in:
lib/xcknife/stream_parser.rb

Defined Under Namespace

Classes: MachineAssignment, Options, PartitionResult, PartitionWithMachines, ResultStats

Instance Attribute Summary

Instance Method Summary

Methods included from JsonStreamParserHelper

#each_test_event, #normalize_result

Constructor Details

#initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil) ⇒ StreamParser

Returns a new instance of StreamParser.



# File 'lib/xcknife/stream_parser.rb', line 15

def initialize(number_of_shards, test_partitions, options_for_metapartition: Array.new(test_partitions.size, {}), allow_fewer_shards: false, on_extrapolation: nil)
  @number_of_shards = number_of_shards
  @test_partitions = test_partitions.map(&:to_set)
  @relevant_partitions = test_partitions.flatten.to_set
  @stats = ResultStats.new
  @options_for_metapartition = options_for_metapartition.map { |o| Options::DEFAULT.merge(o) }
  @allow_fewer_shards = allow_fewer_shards
  @on_extrapolation = on_extrapolation
  ResultStats.members.each { |k| @stats[k] = 0 }
end
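
A minimal construction sketch; the shard count and the test target names below are hypothetical:

require 'xcknife'

# Split two meta-partitions (unit-test targets and UI-test targets) across
# 4 worker machines in total.
stream_parser = XCKnife::StreamParser.new(4, [%w[CommonTestTarget ModelTests], %w[UITests]])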

Instance Attribute Details

#number_of_shards ⇒ Object (readonly)

Returns the value of attribute number_of_shards.



# File 'lib/xcknife/stream_parser.rb', line 13

def number_of_shards
  @number_of_shards
end

#relevant_partitions ⇒ Object (readonly)

Returns the value of attribute relevant_partitions.



# File 'lib/xcknife/stream_parser.rb', line 13

def relevant_partitions
  @relevant_partitions
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



# File 'lib/xcknife/stream_parser.rb', line 13

def stats
  @stats
end

#test_partitions ⇒ Object (readonly)

Returns the value of attribute test_partitions.



# File 'lib/xcknife/stream_parser.rb', line 13

def test_partitions
  @test_partitions
end

Instance Method Details

#compute_shards_for_events(historical_events, current_events = nil) ⇒ Object



# File 'lib/xcknife/stream_parser.rb', line 85

def compute_shards_for_events(historical_events, current_events = nil)
  compute_shards_for_partitions(test_time_for_partitions(historical_events, current_events))
end
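
For illustration, a sketch that feeds pre-parsed event lists into this method (the file names are assumptions); it is equivalent to calling #compute_shards_for_file with the same paths:

historical_events = stream_parser.parse_json_stream_file('historical_tests.json-stream')
current_events    = stream_parser.parse_json_stream_file('current_tests.json-stream')
partition_result  = stream_parser.compute_shards_for_events(historical_events, current_events)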

#compute_shards_for_file(historical_filename, current_test_filename = nil) ⇒ Object

Parses the output of an xctool json-stream reporter and computes the shards based on it. See: github.com/facebook/xctool#included-reporters

Parameters:

  • historical_filename (String)

    the path of the (usually historical) test time performance file.

  • current_test_filename (String, nil)

    the path of the current test names and targets.



# File 'lib/xcknife/stream_parser.rb', line 81

def compute_shards_for_file(historical_filename, current_test_filename = nil)
  compute_shards_for_events(parse_json_stream_file(historical_filename), parse_json_stream_file(current_test_filename))
end
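
A usage sketch, assuming a hypothetical json-stream file produced by a previous test run:

stream_parser = XCKnife::StreamParser.new(2, [%w[CommonTestTarget UITests]])
partition_result = stream_parser.compute_shards_for_file('historical_tests.json-stream')
# Counters accumulated while parsing are available afterwards:
puts stream_parser.stats.to_h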

#compute_shards_for_partitions(test_time_for_partitions) ⇒ Object



# File 'lib/xcknife/stream_parser.rb', line 89

def compute_shards_for_partitions(test_time_for_partitions)
  PartitionResult.new(@stats, split_machines_proportionally(test_time_for_partitions).map do |partition|
    compute_single_shards(partition.number_of_shards, partition.test_time_map, options: partition.options)
  end, test_time_for_partitions)
end

#compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT) ⇒ Object

Computes a 2-approximation to the optimal partition time; this is an instance of the open-shop scheduling problem, which is NP-hard. See: en.wikipedia.org/wiki/Open-shop_scheduling



# File 'lib/xcknife/stream_parser.rb', line 149

def compute_single_shards(number_of_shards, test_time_map, options: Options::DEFAULT)
  raise XCKnife::XCKnifeError, 'There are not enough workers provided' if number_of_shards <= 0
  raise XCKnife::XCKnifeError, 'Cannot shard an empty partition_time' if test_time_map.empty?

  assignements = Array.new(number_of_shards) { MachineAssignment.new(Hash.new { |k, v| k[v] = [] }, 0) }

  list_of_test_target_class_times = []
  test_time_map.each do |test_target, class_times|
    class_times.each do |class_name, duration_in_milliseconds|
      list_of_test_target_class_times << [test_target, class_name, duration_in_milliseconds]
    end
  end

  # This might seem like an unnecessary level of indirection, but it allows us to keep
  # logic consistent regardless of the `split_bundles_across_machines` option
  group = list_of_test_target_class_times.group_by do |test_target, class_name, _duration_in_milliseconds|
    options.split_bundles_across_machines ? [test_target, class_name] : test_target
  end

  list_of_test_target_classes_times = group.map do |(test_target, _), classes|
    [
      test_target,
      classes.map { |_test_target, class_name, _duration_in_milliseconds| class_name },
      classes.reduce(0) { |total_duration, (_test_target, _class_name, duration_in_milliseconds)| total_duration + duration_in_milliseconds }
    ]
  end

  list_of_test_target_classes_times.sort_by! { |_test_target, _class_names, duration_in_milliseconds| -duration_in_milliseconds }
  list_of_test_target_classes_times.each do |test_target, class_names, duration_in_milliseconds|
    assignemnt = assignements.min_by(&:total_time)
    assignemnt.test_time_map[test_target].concat class_names
    assignemnt.total_time += duration_in_milliseconds
  end

  if (empty_test_map_assignments = assignements.select { |a| a.test_time_map.empty? }) && !empty_test_map_assignments.empty? && !options.allow_fewer_shards
    test_grouping = options.split_bundles_across_machines ? 'classes' : 'targets'
    raise XCKnife::XCKnifeError, "Too many shards -- #{empty_test_map_assignments.size} of #{number_of_shards} assignments are empty," \
                                 " because there are not enough test #{test_grouping} for that many shards."
  end
  assignements.reject! { |a| a.test_time_map.empty? }

  assignements
end
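
The approximation is the classic greedy rule: group the classes (per bundle or per class, depending on split_bundles_across_machines), sort the groups by descending duration, and always hand the next group to the currently least-loaded shard. A self-contained sketch with made-up durations (in milliseconds):

test_time_map = {
  'CommonTestTarget' => { 'SlowTests' => 9_000, 'FastTests' => 2_500 },
  'iPadTestTarget'   => { 'iPadFlowTests' => 6_000 }
}
stream_parser = XCKnife::StreamParser.new(2, [test_time_map.keys])
stream_parser.compute_single_shards(2, test_time_map).each do |assignment|
  puts "#{assignment.total_time}ms -> #{assignment.test_time_map}"
end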

#parse_json_stream_file(filename) ⇒ Object




# File 'lib/xcknife/stream_parser.rb', line 194

def parse_json_stream_file(filename)
  return nil if filename.nil?
  return [] unless File.exist?(filename)

  lines = IO.readlines(filename)
  lines.lazy.map { |line| OpenStruct.new(JSON.parse(line)) }
end
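
Note that the return value is nil when no filename is given, an empty array when the file does not exist, and otherwise a lazy enumerator of OpenStruct events, one per JSON line. A sketch (the file name is an assumption):

events = stream_parser.parse_json_stream_file('historical_tests.json-stream')
puts events.to_a.size unless events.nil?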

#split_machines_proportionally(partitions) ⇒ Object




# File 'lib/xcknife/stream_parser.rb', line 113

def split_machines_proportionally(partitions)
  total = 0
  partitions.each do |test_time_map|
    each_duration(test_time_map) { |duration_in_milliseconds| total += duration_in_milliseconds }
  end

  used_shards = 0
  assignable_shards = number_of_shards - partitions.size
  partition_with_machines_list = partitions.each_with_index.map do |test_time_map, metapartition|
    options = @options_for_metapartition[metapartition]
    partition_time = 0
    max_shard_count = test_time_map.each_value.map(&:size).reduce(&:+) || 1
    max_shard_count = [max_shard_count, options.max_shard_count].min if options.max_shard_count
    each_duration(test_time_map) { |duration_in_milliseconds| partition_time += duration_in_milliseconds }
    n = [1 + (assignable_shards * partition_time.to_f / total).floor, max_shard_count].min
    used_shards += n
    PartitionWithMachines.new(test_time_map, n, partition_time, max_shard_count, options)
  end

  fifo_with_machines_who_can_use_more_shards = partition_with_machines_list.select { |x| x.number_of_shards < x.max_shard_count }.sort_by(&:partition_time)
  while number_of_shards > used_shards
    if fifo_with_machines_who_can_use_more_shards.empty?
      break if @allow_fewer_shards

      raise XCKnife::XCKnifeError, "There are #{number_of_shards - used_shards} extra machines"
    end
    machine = fifo_with_machines_who_can_use_more_shards.pop
    machine.number_of_shards += 1
    used_shards += 1
    fifo_with_machines_who_can_use_more_shards.unshift(machine) if machine.number_of_shards < machine.max_shard_count
  end
  partition_with_machines_list
end
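
Each meta-partition first receives one shard; the remaining shards are then distributed in proportion to each partition's total test time, capped by the number of test groups it contains (and by max_shard_count, if set). Any leftover shards go to partitions that can still absorb them; otherwise an error is raised unless allow_fewer_shards is set. A sketch with made-up durations:

stream_parser = XCKnife::StreamParser.new(5, [%w[UnitTests], %w[UITests]])
partitions = [
  { 'UnitTests' => { 'FastSpecsA' => 1_000, 'FastSpecsB' => 1_000 } },
  { 'UITests'   => { 'CheckoutFlowTests' => 8_000, 'LoginFlowTests' => 4_000, 'OnboardingTests' => 2_000 } }
]
stream_parser.split_machines_proportionally(partitions).each do |p|
  puts "#{p.number_of_shards} shard(s) for #{p.partition_time}ms of tests"
end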

#test_time_for_partitions(historical_events, current_events = nil) ⇒ Object



# File 'lib/xcknife/stream_parser.rb', line 95

def test_time_for_partitions(historical_events, current_events = nil)
  analyzer = EventsAnalyzer.for(current_events, relevant_partitions)
  @stats[:current_total_tests] = analyzer.total_tests
  times_for_target_class = Hash.new { |h, current_target| h[current_target] = Hash.new(0) }
  each_test_event(historical_events) do |target_name, result|
    next unless relevant_partitions.include?(target_name)

    inc_stat :historical_total_tests
    next unless analyzer.test_class?(target_name, result.className)

    times_for_target_class[target_name][result.className] += (result.totalDuration * 1000).ceil
  end

  extrapolate_times_for_current_events(analyzer, times_for_target_class) if current_events
  hash_partitions(times_for_target_class)
end
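
The return value is one test-time map per test partition (each a hash of target name to class name to accumulated milliseconds), suitable for passing straight to #compute_shards_for_partitions. A sketch (the file name is an assumption):

historical_events = stream_parser.parse_json_stream_file('historical_tests.json-stream')
times_per_partition = stream_parser.test_time_for_partitions(historical_events)
partition_result = stream_parser.compute_shards_for_partitions(times_per_partition)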