Class: Embulk::Input::InputBigquery

Inherits:
InputPlugin
  • Object
Defined in:
lib/embulk/input/big-query-async.rb

Class Method Summary

Instance Method Summary

Class Method Details

.transaction(config) {|task, columns, 1| ... } ⇒ Hash

Yields:

  • (task, columns, 1)


(source lines 10–66)
# File 'lib/embulk/input/big-query-async.rb', line 10

# Builds the Embulk task hash and the output schema from the plugin config,
# then yields them to the framework.
#
# The query text comes from +sql+. If that key is absent, the query is rendered
# from the +sql_erb+ ERB template. Each value of the optional +erb_params+ map
# is evaluated as Ruby code and stored under its key in the local +params+
# hash. The template is rendered with this method's binding, so it can refer
# to +params+.
#
# Query options depend on the execution mode. The synchronous mode (chosen
# when +synchronous_method+ is truthy or +asynchronous_method+ is not) gets
# max/timeout/dryrun. The asynchronous mode gets large_results/write.
#
# NOTE(review): erb_params values are passed to eval, so the configuration
# must come from a trusted source.
#
# @param config [#[]] plugin configuration, read with Symbol keys
# @yield [task, columns, 1] the task hash, the Column list built from
#   config[:columns] (index, 'name', 'type' as Symbol), and the literal 1
# @return [Hash] always an empty Hash
def self.transaction(config, &control)
  sql = config[:sql]
  # The local name `params` is part of the template contract (see binding below).
  params = {}
  unless sql
    sql_erb = config[:sql_erb]
    erb = ERB.new(sql_erb)
    # erb_params is optional. A template without parameters used to crash here
    # with NoMethodError on nil.
    erb_params = config[:erb_params] || {}
    erb_params.each do |k, v|
      params[k] = eval(v)
    end

    # Render with this method's binding so the template can read `params`.
    sql = erb.result(binding)
  end

  task = {
    project: config[:project],
    keyfile: config[:keyfile],
    sql: sql,
    columns: config[:columns],
    params: params,
    synchronous_method: config[:synchronous_method],
    asynchronous_method: config[:asynchronous_method],
    dataset: config[:dataset],
    table: config[:table],
    option: {
      cache: config[:cache],
      standard_sql: config[:standard_sql],
      legacy_sql: config[:legacy_sql],
    }
  }

  if task[:synchronous_method] || !task[:asynchronous_method]
    task[:option].merge!(
      {
        max: config[:max],
        timeout: config[:timeout],
        dryrun: config[:dryrun],
      }
    )
  else
    # NOTE(review): large_results mirrors legacy_sql instead of reading a
    # dedicated config key. The google-cloud-bigquery docs describe
    # large_results as legacy-SQL-only, so this may be intentional — confirm.
    task[:option].merge!(
      {
        large_results: config[:legacy_sql],
        write: config[:write],
      }
    )
  end

  columns = config[:columns].each_with_index.map do |c, i|
    Column.new(i, c['name'], c['type'].to_sym)
  end

  yield(task, columns, 1)

  {}
end

Instance Method Details

#extract_record(row) ⇒ Object



(source lines 111–121)
# File 'lib/embulk/input/big-query-async.rb', line 111

# Builds one output record (an Array of values) from a query result row.
#
# For each configured column, in @task[:columns] order, the value is read as
# row[c['name']]. (run converts every c['name'] to a Symbol before querying.)
# When the column config has an 'eval' entry, that string is evaluated as Ruby
# code and its result replaces the value.
#
# NOTE(review): eval runs arbitrary Ruby from the plugin configuration with
# this method's binding, so the expression can read the locals `row`, `c`,
# `val` and `columns`. Renaming or removing those locals would break existing
# 'eval' configs, and the configuration must be trusted.
#
# @param row [#[]] one result row (presumably a Hash keyed by column name — confirm)
# @return [Array] column values in configured order
def extract_record(row)
  columns = []
  @task[:columns].each do |c|
    val = row[c['name']]
    if c['eval']
      # `val` is visible to the eval'd expression through `binding`.
      val = eval(c['eval'], binding)
    end
    columns << val
  end
  return columns
end

#keys_to_sym(hash) ⇒ Object



(source lines 130–136)
# File 'lib/embulk/input/big-query-async.rb', line 130

# Returns a new Hash with the same values as +hash+ but with every key passed
# through +to_sym+. The input is not modified. If two keys map to the same
# Symbol, the one enumerated last wins.
def keys_to_sym(hash)
  hash.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#run ⇒ Object



(source lines 68–87)
# File 'lib/embulk/input/big-query-async.rb', line 68

# Executes the configured query and streams every result row into the page
# builder, then finishes the page builder.
#
# First it normalises the task: column names become Symbols (values_to_sym)
# and option keys become Symbols (keys_to_sym). It then dispatches to the
# synchronous or asynchronous runner. For the asynchronous runner with a
# configured dataset, the destination table is looked up and created if the
# lookup returns nil.
#
# @return [Hash] always an empty Hash
def run
  bq = Google::Cloud::Bigquery.new(project: @task[:project], keyfile: @task[:keyfile])
  @task[:columns] = values_to_sym(@task[:columns], 'name')
  option = keys_to_sym(@task[:option])
  # This predicate must match the one in .transaction, which decided whether
  # sync options (max/timeout/dryrun) or async options (large_results/write)
  # went into task[:option]. The previous `.nil?` check sent
  # `asynchronous_method: false` down the async path with sync-only options.
  if @task[:synchronous_method] || !@task[:asynchronous_method]
    run_synchronous_query(bq, option)
  else
    if @task[:dataset]
      dataset = bq.dataset(@task[:dataset])
      option[:table] = dataset.table(@task[:table]) || dataset.create_table(@task[:table])
    end
    run_asynchronous_query(bq, option)
  end
  @page_builder.finish
  {}
end

#run_asynchronous_query(bq, option) ⇒ Object



(source lines 97–109)
# File 'lib/embulk/input/big-query-async.rb', line 97

# Submits @task[:sql] as a BigQuery query job, waits for it to complete, and
# feeds every row of every result page (converted by extract_record) into the
# page builder.
#
# @param bq [Object] client responding to query_job
# @param option [Hash] Symbol-keyed options forwarded as keywords to query_job
# @raise [RuntimeError] if the job finished in a failed state. Previously a
#   failed job was silently treated as an empty result, so the load reported
#   success with zero rows.
def run_asynchronous_query(bq, option)
  job = bq.query_job(@task[:sql], **option)
  job.wait_until_done!
  raise "BigQuery query job failed: #{job.error.inspect}" if job.failed?

  results = job.query_results
  # Walk the result pages until `next` returns nil.
  while results
    results.each { |row| @page_builder.add(extract_record(row)) }
    results = results.next
  end
end

#run_synchronous_query(bq, option) ⇒ Object



(source lines 89–95)
# File 'lib/embulk/input/big-query-async.rb', line 89

# Runs @task[:sql] with the client's synchronous query call, forwarding
# +option+ as keyword arguments. Each returned row is converted by
# extract_record and appended to the page builder.
def run_synchronous_query(bq, option)
  bq.query(@task[:sql], **option).each { |result_row| @page_builder.add(extract_record(result_row)) }
end

#values_to_sym(hashs, key) ⇒ Object



(source lines 123–128)
# File 'lib/embulk/input/big-query-async.rb', line 123

# Replaces entry[key] with entry[key].to_sym in every hash of +hashs+. The
# hashes are mutated in place, and a new Array holding those same hash
# objects is returned.
def values_to_sym(hashs, key)
  hashs.map { |entry| entry.tap { |e| e[key] = e[key].to_sym } }
end