Class: Embulk::Input::InputBigquery

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input/big-query-async.rb

Defined Under Namespace

Classes: LocalFile

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.transaction(config) {|task, columns, 1| ... } ⇒ Object

Yields:

  • (task, columns, 1)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/embulk/input/big-query-async.rb', line 22

# Builds the task hash and the output column list from the plugin
# configuration, then yields them to the caller's block.
#
# The query text comes from config[:sql]; when that is absent it is rendered
# from the ERB template in config[:sql_erb]. Each config[:erb_params] value is
# evaluated as Ruby and stored in the local `params` hash, which the template
# can read because it is rendered with this method's binding.
#
# @param config [#[], #param] plugin configuration; read via `[]` and `param`
# @param control [Proc] block receiving the task and columns (invoked via yield)
# @yield [task, columns, 1] the task hash, the Column list and the literal 1
#   (presumably the task count -- confirm against the Embulk plugin API)
# @return [Hash] always an empty hash
def self.transaction(config, &control)
	sql = config[:sql]
	params = {}
	unless sql
		# The locals below (sql_erb, erb, erb_params, params, config) are all
		# reachable from the template through `binding`; keep their names stable.
		sql_erb = config[:sql_erb]
		erb = ERB.new(sql_erb)
		# erb_params is optional: a template that needs no parameters must still
		# render instead of failing on nil.each.
		erb_params = config[:erb_params] || {}
		erb_params.each do |k, v|
			# NOTE(review): eval executes arbitrary Ruby taken from the config.
			# Acceptable only while the config is operator-controlled -- confirm.
			params[k] = eval(v)
		end

		sql = erb.result(binding)
	end

	task = {
		project: config[:project],
		keyfile: config.param(:keyfile, LocalFile, nil),
		sql: sql,
		columns: config[:columns],
		params: params,
		synchronous_method: config[:synchronous_method],
		asynchronous_method: config[:asynchronous_method],
		dataset: config[:dataset],
		table: config[:table],
		option: {
			cache: config[:cache],
			standard_sql: config[:standard_sql],
			legacy_sql: config[:legacy_sql],
		}
	}

	# Synchronous mode is the default: it is used unless asynchronous_method is
	# set and synchronous_method is not.
	extra_options =
		if task[:synchronous_method] || !task[:asynchronous_method]
			{
				max: config[:max],
				timeout: config[:timeout],
				dryrun:  config[:dryrun],
			}
		else
			{
				# NOTE(review): large_results is fed from config[:legacy_sql], not
				# from a large_results key -- looks deliberate (coupled to legacy
				# SQL) but may be a copy/paste slip; confirm.
				large_results: config[:legacy_sql],
				write: config[:write],
			}
		end
	task[:option].merge!(extra_options)

	columns = config[:columns].each_with_index.map do |c, i|
		Column.new(i, c['name'], c['type'].to_sym)
	end

	yield(task, columns, 1)

	{}
end

Instance Method Details

#extract_record(row) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/embulk/input/big-query-async.rb', line 123

# Converts one result row into an array of values ordered like @task[:columns].
#
# For each column the value is looked up in the row by the column's 'name'.
# When the column defines an 'eval' string, that string is evaluated as Ruby
# and its result replaces the value.
#
# @param row [#[]] a single query result row, indexed by column name
# @return [Array] the values for the row, one per configured column
def extract_record(row)
	@task[:columns].each_with_object([]) do |c, columns|
		val = row[c['name']]
		# The snippet runs with this binding, so it can read `val`, `row`, `c`
		# and the partially built `columns`; those names must stay as they are.
		# NOTE(review): eval executes arbitrary Ruby taken from the task config --
		# confirm the config is always operator-controlled.
		val = eval(c['eval'], binding) if c['eval']
		columns << val
	end
end

#keys_to_sym(hash) ⇒ Object



142
143
144
145
146
147
148
# File 'lib/embulk/input/big-query-async.rb', line 142

# Returns a new hash with every key converted via #to_sym; values are kept
# as they are and the input is not modified.
#
# @param hash [#each] key/value pairs whose keys respond to #to_sym
# @return [Hash] a fresh hash keyed by symbols
def keys_to_sym(hash)
	hash.each_with_object({}) do |(name, item), symbolized|
		symbolized[name.to_sym] = item
	end
end

#run ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/embulk/input/big-query-async.rb', line 80

# Runs the configured query and pushes the resulting rows to the page builder.
#
# Creates a BigQuery client from the task's project and keyfile, converts the
# column names and the option keys to symbols, then dispatches to the
# synchronous or asynchronous query method. In asynchronous mode with a
# dataset configured, the table named by @task[:table] is passed along as
# option[:table] and is created when the lookup returns nothing.
#
# @return [Hash] always an empty hash
def run
	bq = Google::Cloud::Bigquery.new(project: @task[:project], keyfile: @task[:keyfile])
	@task[:columns] = values_to_sym(@task[:columns], 'name')
	option = keys_to_sym(@task[:option])

	# Must use the same predicate as .transaction
	# (`synchronous_method || !asynchronous_method`): the option hash was built
	# for that mode. Testing `.nil?` here sent `asynchronous_method: false` down
	# the asynchronous path with synchronous-only options.
	if @task[:synchronous_method] || !@task[:asynchronous_method]
		run_synchronous_query(bq, option)
	else
		if @task[:dataset]
			dataset = bq.dataset(@task[:dataset])
			# Reuse the table when it exists, otherwise create it.
			option[:table] = dataset.table(@task[:table]) || dataset.create_table(@task[:table])
		end
		run_asynchronous_query(bq, option)
	end

	@page_builder.finish
	{}
end

#run_asynchronous_query(bq, option) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/embulk/input/big-query-async.rb', line 109

# Runs @task[:sql] as a query job, waits for it to finish and adds every row
# of every result page to the page builder.
#
# @param bq [#query_job] BigQuery client used to start the job
# @param option [Hash] symbol-keyed options forwarded as keyword arguments
# @return [Hash, nil] an empty hash when the job failed, otherwise nil
def run_asynchronous_query(bq, option)
	job = bq.query_job(@task[:sql], **option)
	job.wait_until_done!
	# NOTE(review): a failed job is dropped silently here -- no rows are added
	# and no error is raised. Confirm this is intended.
	return {} if job.failed?

	page = job.query_results
	while page
		page.each { |row| @page_builder.add(extract_record(row)) }
		# Advance to the following page; the loop ends once `next` yields nothing.
		page = page.next
	end
end

#run_synchronous_query(bq, option) ⇒ Object



101
102
103
104
105
106
107
# File 'lib/embulk/input/big-query-async.rb', line 101

# Runs @task[:sql] through the client's blocking `query` call and adds each
# returned row, converted by extract_record, to the page builder.
#
# @param bq [#query] BigQuery client used to run the query
# @param option [Hash] symbol-keyed options forwarded as keyword arguments
# @return [Object] whatever `each` on the query result returns
def run_synchronous_query(bq, option)
	result_set = bq.query(@task[:sql], **option)
	result_set.each { |row| @page_builder.add(extract_record(row)) }
end

#values_to_sym(hashs, key) ⇒ Object



135
136
137
138
139
140
# File 'lib/embulk/input/big-query-async.rb', line 135

# Converts the value stored under `key` to a symbol in every given hash.
#
# The hashes are modified in place; the returned array is new but holds the
# same hash objects.
#
# @param hashs [Enumerable<Hash>] hashes that each hold a value under `key`
# @param key [Object] the key whose value is converted via #to_sym
# @return [Array<Hash>] the same hashes, after conversion
def values_to_sym(hashs, key)
	hashs.map do |entry|
		entry.tap { |e| e[key] = e[key].to_sym }
	end
end