Class: Embulk::Output::Bigquery::Helper

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/bigquery/helper.rb

Class Method Summary collapse

Class Method Details

.bq_type_from_embulk_type(embulk_type) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/embulk/output/bigquery/helper.rb', line 8

# Translates an Embulk column type into the BigQuery column type name.
#
# @param embulk_type [Symbol] one of :boolean, :long, :double, :string,
#   :timestamp or :json
# @return [String] the BigQuery type name
# @raise [ArgumentError] when +embulk_type+ is not one of the listed types
def self.bq_type_from_embulk_type(embulk_type)
  bq_types = {
    boolean:   'BOOLEAN',
    long:      'INTEGER',
    double:    'FLOAT',
    string:    'STRING',
    timestamp: 'TIMESTAMP',
    json:      'STRING', # NOTE: Default is not RECORD since it requires `fields`
  }

  bq_types.fetch(embulk_type) do
    raise ArgumentError, "embulk type #{embulk_type} is not supported"
  end
end

.column_options_map(column_options) ⇒ Hash

TODO: Should nested `fields` be mapped recursively as well?

Returns:

  • (Hash)

    A hash of the form name => column_option, keyed by each column option's name.



22
23
24
25
26
# File 'lib/embulk/output/bigquery/helper.rb', line 22

# Indexes the given column options by their 'name' entry.
#
# @param column_options [Array<Hash>, nil] column option hashes; a nil (or
#   false) value is treated as an empty list
# @return [Hash] name => column_option; when several options share a name,
#   the last one is kept
def self.column_options_map(column_options)
  options = column_options || []
  options.each_with_object({}) do |column_option, by_name|
    by_name[column_option['name']] = column_option
  end
end

.create_load_job_id(task, path, fields) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/embulk/output/bigquery/helper.rb', line 56

# Builds a load job id derived from the content of the file at +path+, the
# destination dataset/table, the +fields+ value and a set of load options
# read from +task+.
#
# Each component is converted with +to_s+ and the results are concatenated
# without a separator before being hashed with MD5.
#
# @param task [Hash] task configuration; the keys read are 'dataset',
#   'table', 'source_format', 'max_bad_records', 'field_delimiter',
#   'encoding', 'ignore_unknown_values' and 'allow_quoted_newlines'
# @param path [String] path of the file whose MD5 digest is included
# @param fields [Object] schema fields; its +to_s+ form is included
# @return [String] "embulk_load_job_" followed by an MD5 hex digest
def self.create_load_job_id(task, path, fields)
  load_option_keys = %w[
    source_format
    max_bad_records
    field_delimiter
    encoding
    ignore_unknown_values
    allow_quoted_newlines
  ]

  components = [Digest::MD5.file(path).hexdigest, task['dataset'], task['table'], fields]
  load_option_keys.each { |key| components << task[key] }

  fingerprint = Digest::MD5.hexdigest(components.map(&:to_s).join(''))
  "embulk_load_job_#{fingerprint}"
end

.deep_symbolize_keys(obj) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/embulk/output/bigquery/helper.rb', line 43

# Returns a copy of +obj+ in which the keys of every Hash, at any depth
# inside nested Hashes and Arrays, are converted with +to_sym+.
#
# A key is left unchanged when calling +to_sym+ on it raises a
# StandardError or returns a falsy value. Values that are neither a Hash
# nor an Array are returned as-is.
#
# @param obj [Object] the structure to convert
# @return [Object] a new Hash/Array for Hash/Array input, otherwise +obj+
def self.deep_symbolize_keys(obj)
  case obj
  when Hash
    obj.each_with_object({}) do |(key, value), symbolized|
      converted = begin
        key.to_sym
      rescue StandardError
        key
      end
      symbolized[converted || key] = deep_symbolize_keys(value)
    end
  when Array
    obj.map { |element| deep_symbolize_keys(element) }
  else
    obj
  end
end

.fields_from_embulk_schema(task, schema) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/embulk/output/bigquery/helper.rb', line 28

# Builds one field hash per schema column.
#
# Each field has :name and :type. The type is the column option's 'type'
# when present, otherwise the result of bq_type_from_embulk_type for the
# column's type; it is upcased in both cases. :mode and :fields are added
# only when the matching column option provides them, with 'fields' passed
# through deep_symbolize_keys.
#
# @param task [Hash] task configuration; 'column_options' is read
# @param schema [Enumerable] columns responding to +[:name]+ and +[:type]+
# @return [Array<Hash>] field hashes in schema order
def self.fields_from_embulk_schema(task, schema)
  options_by_name = self.column_options_map(task['column_options'])

  schema.map do |column|
    name        = column[:name]
    embulk_type = column[:type]
    option      = options_by_name[name] || {}
    bq_type     = option['type'] || bq_type_from_embulk_type(embulk_type)

    field = { name: name, type: bq_type.upcase }
    field[:mode]   = option['mode'] if option['mode']
    field[:fields] = deep_symbolize_keys(option['fields']) if option['fields']
    field
  end
end