Class: BigqueryMigration::Schema
- Inherits:
-
Array
- Object
- Array
- BigqueryMigration::Schema
- Defined in:
- lib/bigquery_migration/schema.rb
Constant Summary collapse
- ALLOWED_FIELD_TYPES =
Set.new(['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'RECORD', 'TIMESTAMP', 'BYTES', 'DATE', 'TIME', 'DATETIME'])
- ALLOWED_FIELD_MODES =
Set.new(['NULLABLE', 'REQUIRED', 'REPEATED'])
Class Method Summary collapse
- .build_query_fields(source_columns, target_columns) ⇒ Object
-
.diff_columns(source_columns, target_columns) ⇒ Object
target_columns - source_columns.
-
.diff_columns_by_name(source_columns, target_columns) ⇒ Object
diff with only column_names target_columns - source_columns.
- .equals?(source_columns, target_columns) ⇒ Boolean
- .find_column_by_name(columns, name) ⇒ Object
-
.flattened_columns(columns, parent_name: nil) ⇒ Object
Flattens nested RECORD columns into a hash keyed by dotted path, e.g. [{ name: 'citiesLived', type: 'RECORD', fields: [{ name: 'place', type: 'RECORD', fields: [{ name: 'city', type: 'STRING' }, { name: 'postcode', type: 'STRING' }] }, { name: 'yearsLived', type: 'INTEGER' }] }] becomes { 'citiesLived.place.city' => { type: 'STRING' }, 'citiesLived.place.postcode' => { type: 'STRING' }, 'citiesLived.yearsLived' => { type: 'INTEGER' } }.
- .make_nullable!(columns) ⇒ Object
- .normalize_columns(columns) ⇒ Object
- .reject_columns!(drop_columns, target_columns) ⇒ Object
-
.reverse_merge!(source_columns, target_columns) ⇒ Object
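Fills in values missing from target_columns using source_columns, without overwriting values the target already has.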
- .shallow_normalize_column(column) ⇒ Object
- .shallow_normalize_column!(column) ⇒ Object
- .shallow_normalize_columns(columns) ⇒ Object
- .shallow_normalize_columns!(columns) ⇒ Object
- .symbolize_keys!(column) ⇒ Object
- .validate_columns!(columns) ⇒ Object
- .validate_mode!(mode) ⇒ Object
-
.validate_name!(name) ⇒ Object
The name must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_), and must start with a letter or underscore.
-
.validate_permitted_operations!(source_columns, target_columns) ⇒ Object
validates permitted changes from old schema to new schema.
-
.validate_permitted_operations_for_mode!(source_column, target_column) ⇒ Object
Allowed conversion rule is as follows:
-
.validate_permitted_operations_for_type!(source_column, target_column) ⇒ Object
Disallowed conversion rule is as follows:
- .validate_type!(type) ⇒ Object
Instance Method Summary collapse
- #build_query_fields(source_columns) ⇒ Object
-
#diff_columns(source_columns) ⇒ Object
self - source_columns.
-
#diff_columns_by_name(source_columns) ⇒ Object
diff with only column names self - source_columns.
- #equals?(source_columns) ⇒ Boolean
- #find_column_by_name(name) ⇒ Object
- #flattened_columns ⇒ Object
-
#initialize(columns = []) ⇒ Schema
constructor
A new instance of Schema.
- #normalize_columns ⇒ Object
- #reject_columns!(drop_columns) ⇒ Object
-
#reverse_merge!(source_columns) ⇒ Object
A.merge!(B) => B overwrites A. A.reverse_merge!(B) => A overwrites B, but A is modified.
- #shallow_normalize_columns ⇒ Object
- #shallow_normalize_columns! ⇒ Object
- #validate_columns! ⇒ Object
- #validate_permitted_operations!(source_columns) ⇒ Object
Constructor Details
#initialize(columns = []) ⇒ Schema
Returns a new instance of Schema.
10 11 12 13 14 |
# File 'lib/bigquery_migration/schema.rb', line 10
# Builds a schema from +columns+: the columns are normalized through the
# class-level normalize_columns, stored as the array contents, and then
# checked with validate_columns!.
#
# @param columns [Array<Hash>] column definitions (defaults to an empty list)
def initialize(columns = [])
  super(self.class.normalize_columns(columns))
  validate_columns!
end
Class Method Details
.build_query_fields(source_columns, target_columns) ⇒ Object
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
# File 'lib/bigquery_migration/schema.rb', line 367
# Builds one select expression per flattened target column.
#
# A column that also exists in the source is emitted as
# "TYPE(name) AS name" using the target type; a column that is missing from
# the source is emitted as its bare flattened name.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Array<String>]
def build_query_fields(source_columns, target_columns)
  source_by_name = flattened_columns(source_columns)
  target_by_name = flattened_columns(target_columns)

  target_by_name.map do |name, target_column|
    cast_type = target_column[:type].upcase
    # MEMO: NULL cast like "#{cast_type}(NULL) AS #{name}" breaks RECORD columns as
    # INTEGER(NULL) AS add_record.add_record.add_column1 => add_record_add_record_add_column1
    # We have to add columns with patch_table beforehand
    next name unless source_by_name[name]

    "#{cast_type}(#{name}) AS #{name}"
  end
end
.diff_columns(source_columns, target_columns) ⇒ Object
target_columns - source_columns
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/bigquery_migration/schema.rb', line 276
# Computes target_columns - source_columns.
#
# Columns are shallow-normalized first and compared as whole hashes. When a
# differing column is a RECORD on both sides and both sides have :fields,
# the comparison recurses and only the differing nested fields are kept.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Array<Hash>] normalized copies of the target columns that differ
def diff_columns(source_columns, target_columns)
  targets = shallow_normalize_columns(target_columns)
  sources = shallow_normalize_columns(source_columns)

  # shallow diff
  (targets - sources).each_with_object([]) do |target, result|
    source = find_column_by_name(sources, target[:name])
    both_records = source && target[:type] == 'RECORD' && source[:type] == 'RECORD'

    if both_records && target[:fields] && source[:fields]
      # recursive diff for RECORD columns
      nested = diff_columns(source[:fields], target[:fields])
      next if nested.empty? # nothing differs below this record, drop it
      target[:fields] = nested
    end

    result << target
  end
end
.diff_columns_by_name(source_columns, target_columns) ⇒ Object
diff with only column_names target_columns - source_columns
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'lib/bigquery_migration/schema.rb', line 297
# Computes target_columns - source_columns, looking only at column names.
#
# A target column whose name is absent from the source is kept. A column
# present on both sides is kept only when both are RECORDs with :fields and
# the nested fields contain names missing from the source.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Array<Hash>] normalized copies of the target columns missing from the source
def diff_columns_by_name(source_columns, target_columns)
  targets = shallow_normalize_columns(target_columns)
  sources = shallow_normalize_columns(source_columns)

  # shallow diff
  (targets - sources).each_with_object([]) do |target, result|
    source = find_column_by_name(sources, target[:name])

    if source
      next unless target[:type] == 'RECORD' && source[:type] == 'RECORD'
      next unless target[:fields] && source[:fields]

      # recursive diff for RECORD columns
      nested = diff_columns_by_name(source[:fields], target[:fields])
      next if nested.empty? # no new names below this record, drop it
      target[:fields] = nested
    end

    result << target
  end
end
.equals?(source_columns, target_columns) ⇒ Boolean
270 271 272 273 |
# File 'lib/bigquery_migration/schema.rb', line 270
# Tells whether two column lists are equivalent, i.e. diff_columns is empty
# in both directions.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Boolean]
def equals?(source_columns, target_columns)
  return false unless diff_columns(source_columns, target_columns).empty?

  diff_columns(target_columns, source_columns).empty?
end
.find_column_by_name(columns, name) ⇒ Object
109 110 111 |
# File 'lib/bigquery_migration/schema.rb', line 109
# Returns the first column whose :name equals +name+.
#
# @param columns [Array<Hash>, nil] a nil list is treated as empty
# @param name [String]
# @return [Hash, nil] the matching column, or nil when there is none
def find_column_by_name(columns, name)
  return nil unless columns

  columns.find { |column| column[:name] == name }
end
.flattened_columns(columns, parent_name: nil) ⇒ Object
[{
name: 'citiesLived',
type: 'RECORD',
fields: [
{
name: 'place', type: 'RECORD',
fields: [
{ name: 'city', type: 'STRING' }, { name: 'postcode', type: 'STRING' }
]
},
{ name: 'yearsLived', type: 'INTEGER' }
]
}] => {
'citiesLived.place.city' => {
type: 'STRING'
},
'citiesLived.place.postcode' => {
type: 'STRING'
},
'citiesLived.yearsLived' => {
type: 'INTEGER'
}
}
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/bigquery_migration/schema.rb', line 254
# Flattens nested RECORD columns into a hash keyed by dotted path.
#
# Each non-RECORD column becomes "parent.child" => { type: ..., mode: ... },
# where :mode is included only when the column has one. RECORD columns
# contribute no entry of their own, only their nested fields.
#
# @param columns [Array<Hash>]
# @param parent_name [String, nil] dotted prefix of the enclosing record
# @return [Hash{String => Hash}]
def flattened_columns(columns, parent_name: nil)
  columns.each_with_object({}) do |column, flat|
    path = parent_name.nil? ? column[:name] : "#{parent_name}.#{column[:name]}"

    if column[:type].upcase == 'RECORD'
      flat.merge!(flattened_columns(column[:fields], parent_name: path))
    else
      entry = { type: column[:type] }
      entry[:mode] = column[:mode] if column[:mode]
      flat[path] = entry
    end
  end
end
.make_nullable!(columns) ⇒ Object
386 387 388 389 390 391 392 393 394 395 |
# File 'lib/bigquery_migration/schema.rb', line 386
# Sets :mode to 'NULLABLE' on every column that has no :fields, in place.
# A column that has :fields keeps its own mode; its nested fields are
# processed recursively instead.
#
# @param columns [Array<Hash>] modified in place
# @return [Array<Hash>] the same +columns+ object
def make_nullable!(columns)
  columns.each do |column|
    nested = column[:fields]
    next make_nullable!(nested) if nested

    column[:mode] = 'NULLABLE'
  end
  columns
end
.normalize_columns(columns) ⇒ Object
190 191 192 193 194 195 196 197 198 |
# File 'lib/bigquery_migration/schema.rb', line 190
# Normalizes columns recursively: every column is shallow-normalized, and
# the :fields of each RECORD column are normalized the same way.
#
# @param columns [Array<Hash>]
# @return [Array<Hash>] normalized copies of the columns
def normalize_columns(columns)
  shallow_normalize_columns(columns).map do |column|
    nested = column[:fields]
    column[:fields] = normalize_columns(nested) if column[:type] == 'RECORD' && nested
    column
  end
end
.reject_columns!(drop_columns, target_columns) ⇒ Object
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/bigquery_migration/schema.rb', line 342
# Removes every leaf column described by +drop_columns+ from
# +target_columns+, in place.
#
# +drop_columns+ is flattened to dotted paths (e.g. "citiesLived.place.city").
# For each path the parent records are walked by position in the path, and
# the leaf is deleted from the innermost record's :fields.
#
# If any parent on the path is missing, or has no :fields, the path is
# skipped; nothing is deleted from an unrelated scope.
#
# @param drop_columns [Array<Hash>] columns to remove
# @param target_columns [Array<Hash>] modified in place
# @return [Array<Hash>] the same +target_columns+ object
def reject_columns!(drop_columns, target_columns)
  flattened_columns(drop_columns).each_key do |flattened_name|
    # paths like %w(citiesLived place city): parents first, leaf last
    *parent_names, leaf_name = flattened_name.split('.')

    # The nested :fields arrays are the same objects the target columns
    # hold, so deleting from them modifies target_columns.
    fields = target_columns
    parent_names.each do |parent_name|
      parent = fields.find { |f| f[:name] == parent_name }
      fields = parent && parent[:fields]
      break if fields.nil?
    end
    next if fields.nil?

    fields.delete_if { |f| f[:name] == leaf_name }
  end
  target_columns
end
.reverse_merge!(source_columns, target_columns) ⇒ Object
1. target_column[:mode] ||= source_column[:mode] || 'NULLABLE' (not overwrite, but set if does not exist)
2. Add into target_columns if a source column does not exist in target_columns
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/bigquery_migration/schema.rb', line 321
# Merges +source_columns+ into +target_columns+ without overwriting values
# the target already has. Both lists are shallow-normalized in place first.
#
# 1. For a column present on both sides, :mode and :type are set on the
#    target only when missing (mode falls back to 'NULLABLE'); RECORD fields
#    are merged recursively.
# 2. A source column missing from the target is appended as a copy.
#
# @param source_columns [Array<Hash>] normalized in place
# @param target_columns [Array<Hash>] normalized and extended in place
# @return [Array<Hash>] the same +target_columns+ object
def reverse_merge!(source_columns, target_columns)
  shallow_normalize_columns!(source_columns)
  shallow_normalize_columns!(target_columns)

  source_columns.each do |source|
    existing = find_column_by_name(target_columns, source[:name])

    if existing.nil?
      addition = source.dup
      addition[:mode] ||= 'NULLABLE'
      target_columns << addition
      next
    end

    existing[:mode] ||= source[:mode] || 'NULLABLE'
    existing[:type] ||= source[:type] # should never happen
    next unless existing[:type] == 'RECORD' && existing[:fields] && source[:fields]

    # Recursive merge fields of `RECORD` type
    reverse_merge!(source[:fields], existing[:fields])
  end
  target_columns
end
.shallow_normalize_column(column) ⇒ Object
209 210 211 |
# File 'lib/bigquery_migration/schema.rb', line 209
# Non-destructive variant of shallow_normalize_column!: normalizes a shallow
# copy and leaves +column+ itself untouched.
#
# @param column [Hash]
# @return [Hash] the normalized copy
def shallow_normalize_column(column)
  copy = column.dup
  shallow_normalize_column!(copy)
end
.shallow_normalize_column!(column) ⇒ Object
213 214 215 216 217 218 219 |
# File 'lib/bigquery_migration/schema.rb', line 213
# Normalizes one column hash in place: keys become symbols, :type is
# upcased when present, and :mode defaults to 'NULLABLE' and is upcased.
# Nested :fields are not touched.
#
# @param column [Hash] modified in place
# @return [Hash] the same +column+ object
def shallow_normalize_column!(column)
  symbolize_keys!(column)

  type = column[:type]
  column[:type] = type.upcase if type
  column[:mode] = (column[:mode] || 'NULLABLE').upcase
  column
end
.shallow_normalize_columns(columns) ⇒ Object
200 201 202 |
# File 'lib/bigquery_migration/schema.rb', line 200
# Returns a new list holding a shallow-normalized copy of each column.
#
# @param columns [Array<Hash>]
# @return [Array<Hash>]
def shallow_normalize_columns(columns)
  columns.map do |column|
    shallow_normalize_column(column)
  end
end
.shallow_normalize_columns!(columns) ⇒ Object
204 205 206 207 |
# File 'lib/bigquery_migration/schema.rb', line 204
# Shallow-normalizes every column of +columns+ in place.
#
# @param columns [Array<Hash>] each element is modified in place
# @return [Array<Hash>] the same +columns+ object
def shallow_normalize_columns!(columns)
  columns.tap do |list|
    list.each { |column| shallow_normalize_column!(column) }
  end
end
.symbolize_keys!(column) ⇒ Object
221 222 223 224 225 226 |
# File 'lib/bigquery_migration/schema.rb', line 221
# Converts every key of +column+ to a symbol, in place.
#
# @param column [Hash] modified in place
# @return [Hash] the same +column+ object
def symbolize_keys!(column)
  symbolized = column.each_with_object({}) do |(key, value), acc|
    acc[key.to_sym] = value
  end
  column.replace(symbolized)
end
.validate_columns!(columns) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/bigquery_migration/schema.rb', line 97
# Validates the name, type and (when present) mode of every column, and
# recurses into the :fields of columns whose type is 'RECORD'.
#
# @param columns [Array<Hash>]
# @raise [ConfigError] when a name, type or mode is rejected by its validator
def validate_columns!(columns)
  columns.each do |column|
    validate_name!(column[:name])
    validate_type!(column[:type])

    mode = column[:mode]
    validate_mode!(mode) if mode

    validate_columns!(column[:fields]) if column[:type] == 'RECORD'
  end
end
.validate_mode!(mode) ⇒ Object
91 92 93 94 95 |
# File 'lib/bigquery_migration/schema.rb', line 91
# Checks that +mode+, compared case-insensitively, is one of
# ALLOWED_FIELD_MODES.
#
# @param mode [String]
# @raise [ConfigError] when the mode is not allowed
def validate_mode!(mode)
  return if ALLOWED_FIELD_MODES.include?(mode.upcase)

  raise ConfigError, "Column mode `#{mode}` is not allowed mode"
end
.validate_name!(name) ⇒ Object
The name must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_), and must start with a letter or underscore. The maximum length is 128 characters.
76 77 78 79 80 81 82 83 |
# File 'lib/bigquery_migration/schema.rb', line 76
# Validates a column name.
#
# The name must contain only letters (a-z, A-Z), numbers (0-9), or
# underscores (_), and must start with a letter or underscore.
# The maximum length is 128 characters.
#
# @param name [String]
# @raise [ConfigError] when the format is invalid or the name is too long
def validate_name!(name)
  # \z anchors at the very end of the string; \Z would also accept a name
  # followed by a trailing newline.
  unless name =~ /\A[a-zA-Z_]+\w*\z/
    raise ConfigError, "Column name `#{name}` is invalid format"
  end
  # A name of exactly 128 characters is still within the documented maximum.
  unless name.length <= 128
    raise ConfigError, "Column name `#{name}` must be 128 characters or less"
  end
end
.validate_permitted_operations!(source_columns, target_columns) ⇒ Object
validates permitted changes from old schema to new schema
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/bigquery_migration/schema.rb', line 114
# Validates permitted changes from the old schema (+source_columns+) to the
# new schema (+target_columns+).
#
# Both schemas are normalized and flattened; every flattened column that
# exists on both sides is checked for a permitted type change and a
# permitted mode change. Columns that exist only in the target are skipped.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @raise [ConfigError] when a type or mode change is not permitted
def validate_permitted_operations!(source_columns, target_columns)
  sources = flattened_columns(normalize_columns(source_columns))
  targets = flattened_columns(normalize_columns(target_columns))

  targets.keys.each do |name|
    next unless sources.key?(name)

    source = sources[name]
    target = targets[name]
    validate_permitted_operations_for_type!(source, target)
    validate_permitted_operations_for_mode!(source, target)
  end
end
.validate_permitted_operations_for_mode!(source_column, target_column) ⇒ Object
Allowed conversion rule is as follows:
(new) => NULLABLE, REPEATED
NULLABLE => NULLABLE
REQUIRED => REQUIRED, NULLABLE
REPEATED => REPEATED
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/bigquery_migration/schema.rb', line 165
# Validates a mode change between two versions of one column.
#
# Allowed conversion rule is as follows:
#
#   (new)    => NULLABLE, REPEATED
#   NULLABLE => NULLABLE
#   REQUIRED => REQUIRED, NULLABLE
#   REPEATED => REPEATED
#
# Both columns are shallow-normalized copies, so the arguments are not
# modified.
#
# @param source_column [Hash] the existing column
# @param target_column [Hash] the desired column
# @raise [ConfigError] when the mode change is not allowed
def validate_permitted_operations_for_mode!(source_column, target_column)
  source_column = shallow_normalize_column(source_column)
  target_column = shallow_normalize_column(target_column)
  from = source_column[:mode]
  to   = target_column[:mode]
  return if from == to

  msg = "(#{source_column.to_h} => #{target_column.to_h})"
  if from.nil?
    raise ConfigError, "Newly adding a `REQUIRED` column is not allowed #{msg}" if to == 'REQUIRED'
  elsif from == 'NULLABLE'
    raise ConfigError, "`NULLABLE` column can not be changed #{msg}"
  elsif from == 'REQUIRED'
    raise ConfigError, "`REQUIRED` column can not be changed to `REPEATED` #{msg}" if to == 'REPEATED'
  elsif from == 'REPEATED'
    raise ConfigError, "`REPEATED` column can not be changed #{msg}"
  end
end
.validate_permitted_operations_for_type!(source_column, target_column) ⇒ Object
Disallowed conversion rule is as follows:
type: RECORD => type: others
mode: REPEATED => change type
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/bigquery_migration/schema.rb', line 139
# Validates a type change between two versions of one column.
#
# Disallowed conversion rule is as follows:
#
#   type: RECORD => type: others
#   mode: REPEATED => change type
#
# Both columns are shallow-normalized copies, so the arguments are not
# modified.
#
# @param source_column [Hash] the existing column
# @param target_column [Hash] the desired column
# @raise [ConfigError] when the type change is not allowed
def validate_permitted_operations_for_type!(source_column, target_column)
  source_column = shallow_normalize_column(source_column)
  target_column = shallow_normalize_column(target_column)
  msg = "(#{source_column.to_h} => #{target_column.to_h})"

  if source_column[:type] == 'RECORD' && target_column[:type] != 'RECORD'
    raise ConfigError, "`RECORD` can not be changed #{msg}"
  end

  if source_column[:mode] == 'REPEATED' && source_column[:type] != target_column[:type]
    raise ConfigError, "`REPEATED` mode column's type can not be changed #{msg}"
  end
end
.validate_type!(type) ⇒ Object
85 86 87 88 89 |
# File 'lib/bigquery_migration/schema.rb', line 85
# Checks that +type+, compared case-insensitively, is one of
# ALLOWED_FIELD_TYPES.
#
# @param type [String]
# @raise [ConfigError] when the type is not allowed
def validate_type!(type)
  return if ALLOWED_FIELD_TYPES.include?(type.upcase)

  raise ConfigError, "Column type `#{type}` is not allowed type"
end
Instance Method Details
#build_query_fields(source_columns) ⇒ Object
69 70 71 |
# File 'lib/bigquery_migration/schema.rb', line 69
# Delegates to the class-level build_query_fields, using this schema as the
# target columns.
#
# @param source_columns [Array<Hash>]
def build_query_fields(source_columns)
  schema_class = self.class
  schema_class.build_query_fields(source_columns, self)
end
#diff_columns(source_columns) ⇒ Object
self - source_columns
49 50 51 |
# File 'lib/bigquery_migration/schema.rb', line 49
# Computes self - source_columns by delegating to the class-level
# diff_columns with this schema as the target columns.
#
# @param source_columns [Array<Hash>]
def diff_columns(source_columns)
  schema_class = self.class
  schema_class.diff_columns(source_columns, self)
end
#diff_columns_by_name(source_columns) ⇒ Object
diff with only column names self - source_columns
55 56 57 |
# File 'lib/bigquery_migration/schema.rb', line 55
# Computes self - source_columns by column name only, delegating to the
# class-level diff_columns_by_name with this schema as the target columns.
#
# @param source_columns [Array<Hash>]
def diff_columns_by_name(source_columns)
  schema_class = self.class
  schema_class.diff_columns_by_name(source_columns, self)
end
#equals?(source_columns) ⇒ Boolean
44 45 46 |
# File 'lib/bigquery_migration/schema.rb', line 44
# Delegates to the class-level equals?, comparing +source_columns+ against
# this schema.
#
# @param source_columns [Array<Hash>]
def equals?(source_columns)
  schema_class = self.class
  schema_class.equals?(source_columns, self)
end
#find_column_by_name(name) ⇒ Object
16 17 18 |
# File 'lib/bigquery_migration/schema.rb', line 16
# Delegates to the class-level find_column_by_name, searching this schema's
# own columns.
#
# @param name [String]
def find_column_by_name(name)
  schema_class = self.class
  schema_class.find_column_by_name(self, name)
end
#flattened_columns ⇒ Object
40 41 42 |
# File 'lib/bigquery_migration/schema.rb', line 40
# Delegates to the class-level flattened_columns for this schema's columns.
def flattened_columns
  schema_class = self.class
  schema_class.flattened_columns(self)
end
#normalize_columns ⇒ Object
29 30 31 |
# File 'lib/bigquery_migration/schema.rb', line 29
# Delegates to the class-level normalize_columns for this schema's columns.
def normalize_columns
  schema_class = self.class
  schema_class.normalize_columns(self)
end
#reject_columns!(drop_columns) ⇒ Object
65 66 67 |
# File 'lib/bigquery_migration/schema.rb', line 65
# Delegates to the class-level reject_columns!, using this schema as the
# target columns.
#
# @param drop_columns [Array<Hash>] columns to remove
def reject_columns!(drop_columns)
  schema_class = self.class
  schema_class.reject_columns!(drop_columns, self)
end
#reverse_merge!(source_columns) ⇒ Object
A.merge!(B) => B overwrites A. A.reverse_merge!(B) => A overwrites B, but A is modified.
61 62 63 |
# File 'lib/bigquery_migration/schema.rb', line 61
# Delegates to the class-level reverse_merge!, using this schema as the
# target columns.
#
#   A.merge!(B)         => B overwrites A
#   A.reverse_merge!(B) => A overwrites B, but A is modified
#
# @param source_columns [Array<Hash>]
def reverse_merge!(source_columns)
  schema_class = self.class
  schema_class.reverse_merge!(source_columns, self)
end
#shallow_normalize_columns ⇒ Object
33 34 35 |
# File 'lib/bigquery_migration/schema.rb', line 33
# Delegates to the class-level shallow_normalize_columns for this schema's
# columns.
def shallow_normalize_columns
  schema_class = self.class
  schema_class.shallow_normalize_columns(self)
end
#shallow_normalize_columns! ⇒ Object
36 37 38 |
# File 'lib/bigquery_migration/schema.rb', line 36
# Shallow-normalizes every column of this schema in place by delegating to
# the class-level shallow_normalize_columns! (plural).
#
# The singular shallow_normalize_column! expects one column hash, so it must
# not be handed the whole schema array.
def shallow_normalize_columns!
  self.class.shallow_normalize_columns!(self)
end
#validate_columns! ⇒ Object
20 21 22 |
# File 'lib/bigquery_migration/schema.rb', line 20
# Delegates to the class-level validate_columns! for this schema's columns.
def validate_columns!
  schema_class = self.class
  schema_class.validate_columns!(self)
end
#validate_permitted_operations!(source_columns) ⇒ Object
24 25 26 27 |
# File 'lib/bigquery_migration/schema.rb', line 24
# Delegates to the class-level validate_permitted_operations!, treating
# +source_columns+ as the old schema and this schema as the new one.
#
# @param source_columns [Array<Hash>]
def validate_permitted_operations!(source_columns)
  self.class.validate_permitted_operations!(source_columns, self)
end