Class: BigqueryMigration::Schema

Inherits:
Array
  • Object
show all
Defined in:
lib/bigquery_migration/schema.rb

Constant Summary collapse

# Column types accepted by validate_type! (compared after upcasing).
# Frozen so no caller can accidentally widen the allowed set at runtime.
ALLOWED_FIELD_TYPES = Set.new(%w[STRING INTEGER FLOAT BOOLEAN RECORD TIMESTAMP BYTES DATE TIME DATETIME]).freeze
# Column modes accepted by validate_mode! (compared after upcasing).
ALLOWED_FIELD_MODES = Set.new(%w[NULLABLE REQUIRED REPEATED]).freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(columns = []) ⇒ Schema

Returns a new instance of Schema.



10
11
12
13
14
# File 'lib/bigquery_migration/schema.rb', line 10

# Builds a schema from column definitions. Columns are first normalized
# (see Schema.normalize_columns), stored as this Array's elements, and then
# validated.
#
# @param columns [Array<Hash>] column definitions (String or Symbol keys)
# @raise [ConfigError] when a column name, type or mode is not allowed
def initialize(columns = [])
  super(self.class.normalize_columns(columns))
  validate_columns!
end

Class Method Details

.build_query_fields(source_columns, target_columns) ⇒ Object



367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/bigquery_migration/schema.rb', line 367

# Builds the SELECT expressions that read the source table into the target
# schema. Both schemas are flattened to dotted leaf names. A leaf that exists
# in the source is cast to its target type (e.g. "INTEGER(a.b) AS a.b"). A
# leaf missing from the source is emitted as its bare name.
#
# @param source_columns [Array<Hash>] current schema
# @param target_columns [Array<Hash>] desired schema
# @return [Array<String>] one expression per target leaf column
def build_query_fields(source_columns, target_columns)
  source_leaves = flattened_columns(source_columns)

  flattened_columns(target_columns).map do |name, target_leaf|
    # MEMO: a NULL cast like "TYPE(NULL) AS name" breaks RECORD columns, e.g.
    #   INTEGER(NULL) AS add_record.add_record.add_column1 => add_record_add_record_add_column1
    # so columns missing from the source must be added with patch_table beforehand.
    next name unless source_leaves.key?(name)

    "#{target_leaf[:type].upcase}(#{name}) AS #{name}"
  end
end

.diff_columns(source_columns, target_columns) ⇒ Object

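Returns the columns of target_columns that are missing from, or differ in, source_columns (target_columns - source_columns), diffing RECORD fields recursively.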



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/bigquery_migration/schema.rb', line 276

# Returns the target columns that are absent from, or differ in, the source
# (target - source). Columns are compared after shallow normalization. When a
# differing column is a RECORD in both schemas and both have fields, only its
# differing sub-fields are kept. If no sub-field differs, the column is dropped.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Array<Hash>] normalized copies of the differing target columns
def diff_columns(source_columns, target_columns)
  normalized_targets = shallow_normalize_columns(target_columns)
  normalized_sources = shallow_normalize_columns(source_columns)

  # Shallow diff first: whole-hash comparison of each normalized column.
  changed_columns = normalized_targets - normalized_sources

  changed_columns.each_with_object([]) do |column, result|
    counterpart = find_column_by_name(normalized_sources, column[:name])
    nested_records = counterpart &&
                     column[:type] == 'RECORD' && counterpart[:type] == 'RECORD' &&
                     column[:fields] && counterpart[:fields]
    unless nested_records
      result << column
      next
    end

    # Recursive diff for RECORD columns: keep only the sub-fields that differ.
    nested_diff = diff_columns(counterpart[:fields], column[:fields])
    next if nested_diff.empty?

    column[:fields] = nested_diff
    result << column
  end
end

.diff_columns_by_name(source_columns, target_columns) ⇒ Object

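Diffs by column name only: returns the columns of target_columns whose names do not appear in source_columns (target_columns - source_columns), recursing into RECORD fields.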



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/bigquery_migration/schema.rb', line 297

# Name-only variant of diff_columns. It returns target columns whose names do
# not exist in the source. For RECORD columns present in both schemas (with
# fields), it returns the column restricted to the sub-fields whose names are
# new. Columns that exist in both and differ only in type or mode are ignored.
#
# @param source_columns [Array<Hash>]
# @param target_columns [Array<Hash>]
# @return [Array<Hash>] normalized copies of the target columns with new names
def diff_columns_by_name(source_columns, target_columns)
  normalized_targets = shallow_normalize_columns(target_columns)
  normalized_sources = shallow_normalize_columns(source_columns)

  # Shallow diff first; same-named columns are then filtered out below.
  (normalized_targets - normalized_sources).each_with_object([]) do |column, result|
    counterpart = find_column_by_name(normalized_sources, column[:name])
    if counterpart.nil?
      result << column # this name exists only in the target
      next
    end

    # Same name on both sides: only RECORD pairs with fields can hide new names.
    next unless column[:type] == 'RECORD' && counterpart[:type] == 'RECORD'
    next unless column[:fields] && counterpart[:fields]

    # Recursive diff for RECORD columns
    nested_diff = diff_columns_by_name(counterpart[:fields], column[:fields])
    next if nested_diff.empty?

    column[:fields] = nested_diff
    result << column
  end
end

.equals?(source_columns, target_columns) ⇒ Boolean

Returns:

  • (Boolean)


270
271
272
273
# File 'lib/bigquery_migration/schema.rb', line 270

# True when neither column list contains a column the other lacks, i.e.
# diff_columns is empty in both directions.
#
# @return [Boolean]
def equals?(source_columns, target_columns)
  return false unless diff_columns(source_columns, target_columns).empty?

  diff_columns(target_columns, source_columns).empty?
end

.find_column_by_name(columns, name) ⇒ Object



109
110
111
# File 'lib/bigquery_migration/schema.rb', line 109

# Returns the first column whose :name equals +name+, or nil. A nil (or
# false) column list is treated as empty.
def find_column_by_name(columns, name)
  return nil unless columns

  columns.find { |column| column[:name] == name }
end

.flattened_columns(columns, parent_name: nil) ⇒ Object

Flattens nested RECORD columns into a Hash keyed by dotted column path. For example,

[{
  name: 'citiesLived',
  type: 'RECORD',
  fields: [
    {
      name: 'place', type: 'RECORD',
      fields: [
        { name: 'city', type: 'STRING' }, { name: 'postcode', type: 'STRING' }
      ]
    },
    { name: 'yearsLived', type: 'INTEGER' }
  ]
}]

is flattened to

{
  'citiesLived.place.city' => {
    type: 'STRING'
  },
  'citiesLived.place.postcode' => {
    type: 'STRING'
  },
  'citiesLived.yearsLived' => {
    type: 'INTEGER'
  }
}

Parameters:

  • columns (Array)

Returns:

  • Hash



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/bigquery_migration/schema.rb', line 254

# Flattens nested RECORD columns into a Hash keyed by dotted leaf path
# ("parent.child.leaf"). Each value holds the leaf's :type and, if present,
# its :mode. RECORD columns contribute only their leaves, never an entry of
# their own.
#
# @param columns [Array<Hash>]
# @param parent_name [String, nil] path prefix used during recursion
# @return [Hash{String => Hash}]
def flattened_columns(columns, parent_name: nil)
  columns.each_with_object({}) do |column, flattened|
    full_name = parent_name.nil? ? column[:name] : "#{parent_name}.#{column[:name]}"

    if column[:type].upcase == 'RECORD'
      flattened.merge!(flattened_columns(column[:fields], parent_name: full_name))
    else
      leaf = { type: column[:type] }
      leaf[:mode] = column[:mode] if column[:mode]
      flattened[full_name] = leaf
    end
  end
end

.make_nullable!(columns) ⇒ Object



386
387
388
389
390
391
392
393
394
395
# File 'lib/bigquery_migration/schema.rb', line 386

# Sets mode 'NULLABLE' on every leaf column in place, recursing into any column
# that has :fields. Columns with :fields keep their own mode. Leaves are set to
# NULLABLE whatever their previous mode (REPEATED included).
#
# @param columns [Array<Hash>]
# @return [Array<Hash>] the same +columns+ object
def make_nullable!(columns)
  columns.each do |column|
    next make_nullable!(column[:fields]) if column[:fields]

    column[:mode] = 'NULLABLE'
  end
end

.normalize_columns(columns) ⇒ Object



190
191
192
193
194
195
196
197
198
# File 'lib/bigquery_migration/schema.rb', line 190

# Deep counterpart of shallow_normalize_columns. It returns normalized copies
# (symbol keys, upper-cased type/mode, default mode NULLABLE) and recursively
# normalizes the :fields of RECORD columns. The input is not mutated.
#
# @param columns [Array<Hash>]
# @return [Array<Hash>]
def normalize_columns(columns)
  shallow_normalize_columns(columns).each do |column|
    next unless column[:type] == 'RECORD' && column[:fields]

    column[:fields] = normalize_columns(column[:fields])
  end
end

.reject_columns!(drop_columns, target_columns) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/bigquery_migration/schema.rb', line 342

# Removes from +target_columns+, in place, every leaf column listed in
# +drop_columns+. Both lists may contain nested RECORD columns. drop_columns
# is flattened to dotted leaf paths (e.g. "citiesLived.place.city"), and each
# path is resolved against target_columns one level at a time.
#
# A path whose parent chain does not exist in the target is skipped. This
# includes a missing parent and a parent without :fields. Previously the walk
# stayed at the wrong level in that case and could delete an unrelated
# same-named column. It also skipped intermediate segments that shared the
# leaf's name (e.g. "a.b.a").
#
# @param drop_columns [Array<Hash>] columns to remove
# @param target_columns [Array<Hash>] columns to modify in place
# @return [Array<Hash>] the same +target_columns+ object
def reject_columns!(drop_columns, target_columns)
  flattened_drop_columns = flattened_columns(drop_columns)

  flattened_drop_columns.keys.each do |flattened_name|
    # paths like %w(citiesLived place city child1)
    *parent_paths, leaf_name = flattened_name.split('.')

    # Walk down to the fields array that directly contains the leaf. The nested
    # arrays are shared with target_columns, so deleting from them mutates it.
    fields = parent_paths.reduce(target_columns) do |current_fields, path|
      parent = current_fields && current_fields.find { |f| f[:name] == path }
      parent && parent[:fields]
    end
    next if fields.nil? # parent chain not present in the target: nothing to drop

    fields.delete_if { |f| f[:name] == leaf_name }
  end
  target_columns
end

.reverse_merge!(source_columns, target_columns) ⇒ Object

  1. target_column[:mode] ||= source_column[:mode] || 'NULLABLE' (an existing mode is not overwritten; it is set only when missing)

  2. A copy of each source column whose name does not exist in target_columns is appended to target_columns

Parameters:

  • source_columns (Array)
  • target_columns (Array)


321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/bigquery_migration/schema.rb', line 321

# Merges +source_columns+ into +target_columns+ without overwriting target
# values. Both lists are shallow-normalized in place first.
#   * same-named column: fill a missing :mode/:type from the source, and
#     recurse into :fields when both sides are RECORDs with fields;
#   * column only in the source: append a copy of it to the target.
#
# @param source_columns [Array<Hash>] mutated by normalization only
# @param target_columns [Array<Hash>] mutated and returned
# @return [Array<Hash>] the same +target_columns+ object
def reverse_merge!(source_columns, target_columns)
  shallow_normalize_columns!(source_columns)
  shallow_normalize_columns!(target_columns)

  source_columns.each do |source_column|
    target_column = find_column_by_name(target_columns, source_column[:name])

    if target_column.nil?
      # Column only exists in the source: append a copy of it.
      addition = source_column.dup
      addition[:mode] ||= 'NULLABLE'
      target_columns << addition
      next
    end

    # NOTE(review): shallow_normalize_columns! above already defaults every
    # target :mode to 'NULLABLE', so this ||= never copies the source mode —
    # confirm that is intended.
    target_column[:mode] ||= source_column[:mode] || 'NULLABLE'
    target_column[:type] ||= source_column[:type] # should never happen

    # Recursively merge fields of `RECORD` columns
    next unless target_column[:type] == 'RECORD' && target_column[:fields] && source_column[:fields]

    reverse_merge!(source_column[:fields], target_column[:fields])
  end
  target_columns
end

.shallow_normalize_column(column) ⇒ Object



209
210
211
# File 'lib/bigquery_migration/schema.rb', line 209

# Non-mutating variant of shallow_normalize_column!. It normalizes a shallow
# copy of +column+ and returns that copy.
def shallow_normalize_column(column)
  copy = column.dup
  shallow_normalize_column!(copy)
end

.shallow_normalize_column!(column) ⇒ Object



213
214
215
216
217
218
219
# File 'lib/bigquery_migration/schema.rb', line 213

# Normalizes a single column Hash in place and returns it:
#   * keys become Symbols;
#   * :type is upcased when present;
#   * :mode defaults to 'NULLABLE' and is upcased.
# Nested :fields are left untouched.
def shallow_normalize_column!(column)
  symbolize_keys!(column)
  type = column[:type]
  column[:type] = type.upcase if type
  column[:mode] = (column[:mode] || 'NULLABLE').upcase
  column
end

.shallow_normalize_columns(columns) ⇒ Object



200
201
202
# File 'lib/bigquery_migration/schema.rb', line 200

# Returns a new Array holding a shallow-normalized copy of each column; the
# input columns are not mutated.
def shallow_normalize_columns(columns)
  columns.map(&method(:shallow_normalize_column))
end

.shallow_normalize_columns!(columns) ⇒ Object



204
205
206
207
# File 'lib/bigquery_migration/schema.rb', line 204

# In-place variant: shallow-normalizes every column Hash of +columns+ and
# returns the same Array.
def shallow_normalize_columns!(columns)
  columns.each { |column| shallow_normalize_column!(column) }
end

.symbolize_keys!(column) ⇒ Object



221
222
223
224
225
226
# File 'lib/bigquery_migration/schema.rb', line 221

# Converts every key of +column+ to a Symbol in place (via #to_sym) and
# returns +column+. If two keys collide after conversion, the later one wins.
def symbolize_keys!(column)
  symbolized = {}
  column.each { |key, val| symbolized[key.to_sym] = val }
  column.replace(symbolized)
end

.validate_columns!(columns) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/bigquery_migration/schema.rb', line 97

# Validates every column and, for RECORD columns, every nested field: the name
# format/length, the type and (when present) the mode.
#
# The RECORD check is case-insensitive, like validate_type!, so a lower-case
# `record` column still gets its fields validated. A RECORD column without
# :fields is reported as a ConfigError instead of crashing on nil.
#
# @param columns [Array<Hash>] columns with symbol keys
# @return [Array<Hash>] +columns+
# @raise [ConfigError] on an invalid name, type or mode, or a RECORD without fields
def validate_columns!(columns)
  columns.each do |column|
    validate_name!(column[:name])
    validate_type!(column[:type])
    validate_mode!(column[:mode]) if column[:mode]

    next unless column[:type].to_s.upcase == 'RECORD'

    unless column[:fields]
      raise ConfigError, "Column `#{column[:name]}` of type `RECORD` must have fields"
    end
    validate_columns!(column[:fields])
  end
end

.validate_mode!(mode) ⇒ Object



91
92
93
94
95
# File 'lib/bigquery_migration/schema.rb', line 91

# Raises ConfigError unless +mode+, upcased, is one of ALLOWED_FIELD_MODES.
#
# @param mode [String] any case
# @return [nil]
# @raise [ConfigError]
def validate_mode!(mode)
  return if ALLOWED_FIELD_MODES.include?(mode.upcase)

  raise ConfigError, "Column mode `#{mode}` is not allowed mode"
end

.validate_name!(name) ⇒ Object

The name must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_), and must start with a letter or underscore. The maximum length is 128 characters.



76
77
78
79
80
81
82
83
# File 'lib/bigquery_migration/schema.rb', line 76

# Validates a column name: letters, digits and underscores only, starting with
# a letter or underscore, and at most 128 characters long.
#
# `\z` anchors the true end of string; the previous `\Z` let a trailing
# newline through. The length check previously rejected names of exactly 128
# characters, although 128 is the documented maximum.
#
# @param name [String]
# @raise [ConfigError] when the name is malformed or too long
def validate_name!(name)
  unless name =~ /\A[a-zA-Z_]\w*\z/
    raise ConfigError, "Column name `#{name}` is invalid format"
  end
  unless name.length <= 128
    raise ConfigError, "Column name `#{name}` must be 128 characters or less"
  end
end

.validate_permitted_operations!(source_columns, target_columns) ⇒ Object

Validates that the changes from the old schema (source_columns) to the new schema (target_columns) are permitted.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/bigquery_migration/schema.rb', line 114

# Checks that every leaf column present in both schemas is changed only in
# permitted ways (type rules first, then mode rules). Both schemas are
# normalized and flattened to dotted leaf names first. Leaves that exist only
# in the target are not checked here.
#
# @param source_columns [Array<Hash>] old schema
# @param target_columns [Array<Hash>] new schema
# @raise [ConfigError] on a disallowed type or mode change
def validate_permitted_operations!(source_columns, target_columns)
  source_leaves = flattened_columns(normalize_columns(source_columns))
  target_leaves = flattened_columns(normalize_columns(target_columns))

  target_leaves.keys.each do |flattened_name|
    next unless source_leaves.key?(flattened_name)

    source_leaf = source_leaves[flattened_name]
    target_leaf = target_leaves[flattened_name]
    validate_permitted_operations_for_type!(source_leaf, target_leaf)
    validate_permitted_operations_for_mode!(source_leaf, target_leaf)
  end
end

.validate_permitted_operations_for_mode!(source_column, target_column) ⇒ Object

Allowed mode conversions (source mode => permitted target modes):

(new)    => NULLABLE, REPEATED
NULLABLE => NULLABLE
REQUIRED => REQUIRED, NULLABLE
REPEATED => REPEATED

Parameters:

  • source_column (Hash)
  • target_column (Hash)


165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/bigquery_migration/schema.rb', line 165

# Enforces the allowed mode transitions (after shallow normalization):
#   NULLABLE => NULLABLE
#   REQUIRED => REQUIRED, NULLABLE
#   REPEATED => REPEATED
#
# @param source_column [Hash]
# @param target_column [Hash]
# @return [nil]
# @raise [ConfigError] on a disallowed transition
def validate_permitted_operations_for_mode!(source_column, target_column)
  source_column = shallow_normalize_column(source_column)
  target_column = shallow_normalize_column(target_column)
  from = source_column[:mode]
  to   = target_column[:mode]
  return if from == to

  msg = "(#{source_column.to_h} => #{target_column.to_h})"
  error =
    if from.nil?
      # NOTE(review): unreachable — shallow_normalize_column defaults :mode to
      # 'NULLABLE', so `from` is never nil here. Kept to mirror the documented rule.
      "Newly adding a `REQUIRED` column is not allowed #{msg}" if to == 'REQUIRED'
    elsif from == 'NULLABLE'
      "`NULLABLE` column can not be changed #{msg}"
    elsif from == 'REQUIRED'
      "`REQUIRED` column can not be changed to `REPEATED` #{msg}" if to == 'REPEATED'
    elsif from == 'REPEATED'
      "`REPEATED` column can not be changed #{msg}"
    end
  raise ConfigError, error if error
end

.validate_permitted_operations_for_type!(source_column, target_column) ⇒ Object

Disallowed conversions:

type: RECORD => any non-RECORD type
mode: REPEATED => any change of type

Parameters:

  • source_column (Hash)
  • target_column (Hash)


139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/bigquery_migration/schema.rb', line 139

# Enforces the disallowed type conversions (after shallow normalization):
#   * a RECORD column may not become a non-RECORD type;
#   * a REPEATED column may not change its type at all.
#
# @param source_column [Hash]
# @param target_column [Hash]
# @return [nil]
# @raise [ConfigError] on a disallowed conversion
def validate_permitted_operations_for_type!(source_column, target_column)
  source_column = shallow_normalize_column(source_column)
  target_column = shallow_normalize_column(target_column)
  msg = "(#{source_column.to_h} => #{target_column.to_h})"

  if source_column[:type] == 'RECORD' && target_column[:type] != 'RECORD'
    raise ConfigError, "`RECORD` can not be changed #{msg}"
  end

  return unless source_column[:mode] == 'REPEATED'
  return if source_column[:type] == target_column[:type]

  raise ConfigError, "`REPEATED` mode column's type can not be changed #{msg}"
end

.validate_type!(type) ⇒ Object



85
86
87
88
89
# File 'lib/bigquery_migration/schema.rb', line 85

# Raises ConfigError unless +type+, upcased, is one of ALLOWED_FIELD_TYPES.
#
# A missing (nil) or non-string type used to escape as a NoMethodError from
# #upcase. It is now reported as a ConfigError, like any other bad type.
#
# @param type [String] any case
# @return [nil]
# @raise [ConfigError]
def validate_type!(type)
  return if type.respond_to?(:upcase) && ALLOWED_FIELD_TYPES.include?(type.upcase)

  raise ConfigError, "Column type `#{type}` is not allowed type"
end

Instance Method Details

#build_query_fields(source_columns) ⇒ Object



69
70
71
# File 'lib/bigquery_migration/schema.rb', line 69

# Instance shortcut for Schema.build_query_fields, with this schema as the
# target.
#
# @param source_columns [Array<Hash>] current (source) columns
def build_query_fields(source_columns)
  klass = self.class
  klass.build_query_fields(source_columns, self)
end

#diff_columns(source_columns) ⇒ Object

Returns the columns of this schema that are missing from, or differ in, source_columns (self - source_columns).



49
50
51
# File 'lib/bigquery_migration/schema.rb', line 49

# Columns of this schema that are absent from, or differ in, +source_columns+
# (self - source_columns); delegates to Schema.diff_columns.
def diff_columns(source_columns)
  target_columns = self
  self.class.diff_columns(source_columns, target_columns)
end

#diff_columns_by_name(source_columns) ⇒ Object

Diffs by column name only: returns the columns of this schema whose names do not appear in source_columns (self - source_columns).



55
56
57
# File 'lib/bigquery_migration/schema.rb', line 55

# Name-only diff of this schema against +source_columns+; delegates to
# Schema.diff_columns_by_name.
def diff_columns_by_name(source_columns)
  target_columns = self
  self.class.diff_columns_by_name(source_columns, target_columns)
end

#equals?(source_columns) ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/bigquery_migration/schema.rb', line 44

# Whether this schema and +source_columns+ have no column differences in
# either direction; delegates to Schema.equals?.
#
# @return [Boolean]
def equals?(source_columns)
  klass = self.class
  klass.equals?(source_columns, self)
end

#find_column_by_name(name) ⇒ Object



16
17
18
# File 'lib/bigquery_migration/schema.rb', line 16

# Returns this schema's first column named +name+, or nil.
def find_column_by_name(name)
  columns = self
  columns.class.find_column_by_name(columns, name)
end

#flattened_columnsObject



40
41
42
# File 'lib/bigquery_migration/schema.rb', line 40

# This schema flattened to a Hash keyed by dotted leaf path; delegates to
# Schema.flattened_columns.
def flattened_columns
  columns = self
  columns.class.flattened_columns(columns)
end

#normalize_columnsObject



29
30
31
# File 'lib/bigquery_migration/schema.rb', line 29

# Deeply normalized copies of this schema's columns; delegates to
# Schema.normalize_columns.
def normalize_columns
  columns = self
  columns.class.normalize_columns(columns)
end

#reject_columns!(drop_columns) ⇒ Object



65
66
67
# File 'lib/bigquery_migration/schema.rb', line 65

# Removes the leaf columns listed in +drop_columns+ from this schema in place;
# delegates to Schema.reject_columns!.
def reject_columns!(drop_columns)
  target_columns = self
  self.class.reject_columns!(drop_columns, target_columns)
end

#reverse_merge!(source_columns) ⇒ Object

With A.merge!(B), B's values overwrite A's. With A.reverse_merge!(B), A's existing values win over B's, but A is still the object that is modified.



61
62
63
# File 'lib/bigquery_migration/schema.rb', line 61

# Merges +source_columns+ into this schema in place, keeping this schema's
# existing values; delegates to Schema.reverse_merge!.
def reverse_merge!(source_columns)
  target_columns = self
  self.class.reverse_merge!(source_columns, target_columns)
end

#shallow_normalize_columnsObject



33
34
35
# File 'lib/bigquery_migration/schema.rb', line 33

# Shallow-normalized copies of this schema's columns; delegates to
# Schema.shallow_normalize_columns.
def shallow_normalize_columns
  columns = self
  columns.class.shallow_normalize_columns(columns)
end

#shallow_normalize_columns!Object



36
37
38
# File 'lib/bigquery_migration/schema.rb', line 36

# Normalizes this schema's columns in place (symbol keys, upper-cased type and
# mode, mode defaulting to NULLABLE) and returns self.
#
# Delegates to the plural Schema.shallow_normalize_columns!. The previous code
# called the singular shallow_normalize_column!, which treats its argument as
# a single column Hash and fails on an Array of columns.
def shallow_normalize_columns!
  self.class.shallow_normalize_columns!(self)
end

#validate_columns!Object



20
21
22
# File 'lib/bigquery_migration/schema.rb', line 20

# Validates this schema's columns (name, type, mode, nested RECORD fields);
# delegates to Schema.validate_columns!.
def validate_columns!
  columns = self
  columns.class.validate_columns!(columns)
end

#validate_permitted_operations!(source_columns) ⇒ Object



24
25
26
27
# File 'lib/bigquery_migration/schema.rb', line 24

# Checks that changing +source_columns+ (old schema) into this schema (new
# schema) only involves permitted type and mode changes; delegates to
# Schema.validate_permitted_operations!.
def validate_permitted_operations!(source_columns)
  self.class.validate_permitted_operations!(source_columns, self)
end