Class: Bricolage::PSQLTask

Inherits:
DataSourceTask show all
Includes:
VacuumLock
Defined in:
lib/bricolage/psqldatasource.rb

Overview

DynamoDB is still not supported.

Defined Under Namespace

Classes: SQLAction

Constant Summary collapse

GRANT_OPTS =
%w[privilege to]

Constants included from VacuumLock

VacuumLock::DEFAULT_VACUUM_LOCK_FILE, VacuumLock::DEFAULT_VACUUM_LOCK_TIMEOUT

Instance Attribute Summary

Attributes inherited from DataSourceTask

#ds

Instance Method Summary collapse

Methods included from VacuumLock

cleanup_vacuum_lock, create_lockfile_cmd, create_vacuum_lock_file, enable_vacuum_lock?, locking?, psql_serialize_vacuum_begin, psql_serialize_vacuum_end, serialize_vacuum, using, #using_vacuum_lock, vacuum_lock_parameters

Methods inherited from DataSourceTask

#bind, #initialize

Constructor Details

This class inherits a constructor from Bricolage::DataSourceTask

Instance Method Details

#analyze_if(enabled, target = '${dest_table}') ⇒ Object



293
294
295
# File 'lib/bricolage/psqldatasource.rb', line 293

# Passes an `analyze <target>;` statement to #exec when +enabled+ is truthy.
# Returns nil without doing anything otherwise.
def analyze_if(enabled, target = '${dest_table}')
  return unless enabled

  sql = "analyze #{target};"
  exec(SQLStatement.for_string(sql))
end

#copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/bricolage/psqldatasource.rb', line 327

# Builds the text of a `copy` statement that loads +dest_table+ from
# +src_path+ on +src_ds+.
#
# The statement is emitted one clause per line: target table, source URL,
# credentials, the format clause from #format_option, every entry of +opts+
# (converted with #to_s), and a terminating semicolon.
#
# Raises ParameterError when +src_ds.redshift_loader_source?+ is falsy.
def copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts)
  if !src_ds.redshift_loader_source?
    raise ParameterError, "input data source does not support redshift as bulk loading source: #{src_ds.name}"
  end

  sql = StringIO.new
  sql.puts("copy #{dest_table}")
  sql.puts("from '#{src_ds.url(src_path)}'")
  sql.puts("credentials '#{src_ds.credential_string}'")
  sql.puts(format_option(format, src_ds, jsonpath))
  opts.each { |option| sql.puts(option.to_s) }
  sql.puts(';')
  sql.string
end

#create_dummy_table(target) ⇒ Object



229
230
231
232
233
# File 'lib/bricolage/psqldatasource.rb', line 229

# Passes to #exec a statement that creates +target+ as a single-column
# table (x int) unless a table of that name already exists.
def create_dummy_table(target)
  ddl = "create table if not exists #{target} (x int);\n"
  exec(SQLStatement.for_string(ddl))
end

#drop(target_table) ⇒ Object



235
236
237
# File 'lib/bricolage/psqldatasource.rb', line 235

# Passes a `drop table <target_table> cascade;` statement to #exec.
# There is no `if exists` guard here; see #drop_force for that variant.
def drop(target_table)
  sql = "drop table #{target_table} cascade;"
  exec(SQLStatement.for_string(sql))
end

#drop_force(target_table) ⇒ Object



249
250
251
# File 'lib/bricolage/psqldatasource.rb', line 249

# Drops the table +target_table+ by delegating to #drop_obj_force with the
# object type 'table'.
def drop_force(target_table)
  object_type = 'table'
  drop_obj_force(object_type, target_table)
end

#drop_force_if(enabled) ⇒ Object



257
258
259
# File 'lib/bricolage/psqldatasource.rb', line 257

# Calls #drop_force on the '${dest_table}' placeholder when +enabled+ is
# truthy; returns nil otherwise.
def drop_force_if(enabled)
  return unless enabled

  drop_force('${dest_table}')
end

#drop_if(enabled) ⇒ Object



239
240
241
# File 'lib/bricolage/psqldatasource.rb', line 239

# Calls #drop on the '${dest_table}' placeholder when +enabled+ is truthy;
# returns nil otherwise.
def drop_if(enabled)
  return unless enabled

  drop('${dest_table}')
end

#drop_obj_force(type, name) ⇒ Object



243
244
245
246
247
# File 'lib/bricolage/psqldatasource.rb', line 243

# Passes to #exec a `drop <type> if exists <name> cascade;` statement.
#
# +type+ is the object kind keyword as it should appear in the SQL
# (this file calls it with 'table' and 'view'); +name+ is the object name.
def drop_obj_force(type, name)
  sql = "drop #{type} if exists #{name} cascade;\n"
  exec(SQLStatement.for_string(sql))
end

#drop_view_force(target_view) ⇒ Object



253
254
255
# File 'lib/bricolage/psqldatasource.rb', line 253

# Drops the view +target_view+ by delegating to #drop_obj_force with the
# object type 'view'.
def drop_view_force(target_view)
  object_type = 'view'
  drop_obj_force(object_type, target_view)
end

#drop_view_force_if(enabled) ⇒ Object



261
262
263
# File 'lib/bricolage/psqldatasource.rb', line 261

# Calls #drop_view_force on the '${dest_table}' placeholder when +enabled+
# is truthy; returns nil otherwise.
def drop_view_force_if(enabled)
  return unless enabled

  drop_view_force('${dest_table}')
end

#each_statement ⇒ Object



169
170
171
172
173
# File 'lib/bricolage/psqldatasource.rb', line 169

# Yields the +statement+ of every action that #each_action produces.
# Returns whatever #each_action returns.
def each_statement
  each_action { |act| yield act.statement }
end

#exec(stmt) ⇒ Object



165
166
167
# File 'lib/bricolage/psqldatasource.rb', line 165

# Wraps +stmt+ in a SQLAction and hands that action to #add.
# Returns the result of #add.
def exec(stmt)
  action = SQLAction.new(stmt)
  add(action)
end

#explain_source ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/bricolage/psqldatasource.rb', line 208

# Builds the script used by #run_explain.
#
# For each statement from #each_statement it writes a blank line, a
# `-- <location>` comment when the statement has a location, and then
# either `explain <sql>` (when #support_explain? accepts the statement's
# kind) or the SQL wrapped in a `/* ... */` comment so it is not executed.
#
# Returns the accumulated script as a String.
def explain_source
  out = StringIO.new
  each_statement do |stmt|
    out.puts
    location = stmt.location
    out.puts("-- #{location}") if location
    line =
      if support_explain?(stmt.kind)
        "explain #{stmt.stripped_source}"
      else
        "/* #{stmt.stripped_source} */"
      end
    out.puts(line)
  end
  out.string
end

#format_option(fmt, src_ds, jsonpath) ⇒ Object



343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/bricolage/psqldatasource.rb', line 343

# Returns the data-format clause of a `copy` statement.
#
# fmt::      'tsv', 'csv' or 'json'.
# src_ds::   source data source; only consulted for 'json' with a relative
#            +jsonpath+, where its #url turns the path into a full location.
# jsonpath:: nil, an absolute 's3://' URL, or a path relative to +src_ds+.
#
# Raises ParameterError for any other +fmt+.
def format_option(fmt, src_ds, jsonpath)
  case fmt
  when 'tsv'
    %q(delimiter '\t')
  when 'csv'
    %q(delimiter ',')
  when 'json'
    json_target =
      case jsonpath
      when nil, %r{\As3://}
        # These cases need no data source, so #json_param handles them.
        json_param(jsonpath)
      else
        # Resolve relative paths here: the one-argument #json_param has no
        # access to +src_ds+ and cannot build the URL itself.
        src_ds.url(jsonpath)
      end
    "json '#{json_target}'"
  else
    raise ParameterError, "unsupported format: #{fmt}"
  end
end

#format_query(query) ⇒ Object



396
397
398
# File 'lib/bricolage/psqldatasource.rb', line 396

# Flattens +query+ into a single line suitable for embedding inside a
# single-quoted SQL literal (as #unload_statement does).
#
# Steps, in order:
# 1. blank out lines that *begin* with `--` (comments after code on the
#    same line are left alone);
# 2. strip leading/trailing whitespace;
# 3. collapse each newline, with surrounding spaces/tabs, into one space;
# 4. prefix every single quote with a backslash.
def format_query(query)
  without_comment_lines = query.gsub(/^--.*/, '')
  single_line = without_comment_lines.strip.gsub(/[ \t]*\n[ \t]*/, ' ')
  # Block form: the block's result is inserted verbatim, so "\\'" yields
  # a literal backslash followed by the quote.
  single_line.gsub("'") { "\\'" }
end

#grant(privilege:, on:, to:) ⇒ Object



297
298
299
# File 'lib/bricolage/psqldatasource.rb', line 297

# Passes a `grant <privilege> on <on> to <to>;` statement to #exec.
# All three keyword arguments are required and are interpolated as given.
def grant(privilege:, on:, to:)
  sql = "grant #{privilege} on #{on} to #{to};"
  exec(SQLStatement.for_string(sql))
end

#grant_if(opts, target) ⇒ Object

Raises:

  • (ParameterError)

303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/bricolage/psqldatasource.rb', line 303

# Issues a grant on +target+ when grant options were supplied.
#
# opts::   nil/false or an empty collection means "no grant" and returns
#          nil. Otherwise its keys must be exactly the names listed in
#          GRANT_OPTS; the values are forwarded to #grant as keyword
#          arguments (keys converted with #intern), with +target+ as `on:`.
#
# Raises ParameterError when +opts+ has a key outside GRANT_OPTS, or lacks
# one of the GRANT_OPTS keys (unknown keys are reported first).
def grant_if(opts, target)
  return if !opts || opts.empty?

  given = opts.keys
  unknown_keys = given - GRANT_OPTS
  unless unknown_keys.empty?
    raise ParameterError, "unknown grant options: #{unknown_keys.inspect}"
  end

  missing_keys = GRANT_OPTS - given
  unless missing_keys.empty?
    raise ParameterError, "missing grant options: #{missing_keys.inspect}"
  end

  grant_args = opts.each_with_object({on: target}) do |(key, value), acc|
    acc[key.intern] = value
  end
  grant(**grant_args)
end

#json_param(jsonpath) ⇒ Object



356
357
358
359
360
361
362
363
364
365
# File 'lib/bricolage/psqldatasource.rb', line 356

# Returns the argument for the `json '...'` clause of a `copy` statement.
#
# jsonpath::    nil selects 'auto'; a string starting with 's3://' is
#               returned unchanged; anything else is treated as a path to
#               be resolved through a data source's #url.
# data_source:: optional data source used to resolve such a path.
def json_param(jsonpath, data_source = nil)
  case jsonpath
  when nil
    'auto'
  when %r{\As3://}
    jsonpath
  else
    # NOTE(review): the original body read `src_ds` here although it is not
    # a parameter and no `src_ds` method is visible in this class's API, so
    # a relative path presumably raised NameError. Callers should pass
    # +data_source+; the fallback keeps the one-argument behavior unchanged.
    (data_source || src_ds).url(jsonpath)
  end
end

#load(src_ds, src_path, dest_table, format, jsonpath, opts) ⇒ Object



323
324
325
# File 'lib/bricolage/psqldatasource.rb', line 323

# Builds a `copy` statement with #copy_statement (same arguments, same
# order) and passes it to #exec.
def load(src_ds, src_path, dest_table, format, jsonpath, opts)
  copy_sql = copy_statement(src_ds, src_path, dest_table, format, jsonpath, opts)
  exec(SQLStatement.for_string(copy_sql))
end

#rename_table(src, dest) ⇒ Object



269
270
271
# File 'lib/bricolage/psqldatasource.rb', line 269

# Passes an `alter table <src> rename to <dest>;` statement to #exec.
def rename_table(src, dest)
  sql = "alter table #{src} rename to #{dest};"
  exec(SQLStatement.for_string(sql))
end

#run ⇒ Object

override



198
199
200
201
202
# File 'lib/bricolage/psqldatasource.rb', line 198

# Override. Executes the script built by #source on the task's data source
# (@ds), wrapped in a VacuumLock.using block.
def run
  VacuumLock.using do
    @ds.execute(source)
  end
end

#run_explain ⇒ Object



204
205
206
# File 'lib/bricolage/psqldatasource.rb', line 204

# Executes the script built by #explain_source on the task's data source
# (@ds). Unlike #run, this is not wrapped in VacuumLock.using.
def run_explain
  @ds.execute(explain_source)
end

#serialize_vacuum ⇒ Object

override



287
288
289
290
291
# File 'lib/bricolage/psqldatasource.rb', line 287

# Override. Brackets the caller's block with the statements produced by
# #psql_serialize_vacuum_begin and #psql_serialize_vacuum_end: the "begin"
# statement is passed to #exec, the block is yielded to, then the "end"
# statement is passed to #exec. Returns the result of that last #exec.
def serialize_vacuum
  opening = SQLStatement.for_string(psql_serialize_vacuum_begin)
  exec(opening)
  yield
  closing = SQLStatement.for_string(psql_serialize_vacuum_end)
  exec(closing)
end

#source ⇒ Object

override



186
187
188
189
190
191
192
193
194
195
# File 'lib/bricolage/psqldatasource.rb', line 186

# Override. Builds the script that #run executes.
#
# The script starts with the psql meta-command `\timing on`; then, for each
# statement from #each_statement, a blank line, a `/* <location> */`
# comment when the statement has a location, and the statement's
# stripped source.
#
# Returns the script as a String.
def source
  out = StringIO.new
  out.puts('\timing on')
  each_statement do |stmt|
    out.puts
    location = stmt.location
    out.puts("/* #{location} */") if location
    out.puts(stmt.stripped_source)
  end
  out.string
end

#support_explain?(statement_kind) ⇒ Boolean

Returns:

  • (Boolean)


222
223
224
225
226
227
# File 'lib/bricolage/psqldatasource.rb', line 222

# Returns true when +statement_kind+ is one of 'select', 'insert',
# 'update' or 'delete' (the kinds #explain_source prefixes with `explain`),
# false for anything else.
def support_explain?(statement_kind)
  %w[select insert update delete].include?(statement_kind)
end

#transaction ⇒ Object



317
318
319
320
321
# File 'lib/bricolage/psqldatasource.rb', line 317

# Brackets the caller's block with transaction statements: passes
# `begin transaction;` to #exec, yields, then passes `commit;` to #exec.
# Returns the result of that last #exec.
def transaction
  opening = SQLStatement.for_string('begin transaction;')
  exec(opening)
  yield
  closing = SQLStatement.for_string('commit;')
  exec(closing)
end

#truncate_if(enabled, target = '${dest_table}') ⇒ Object



265
266
267
# File 'lib/bricolage/psqldatasource.rb', line 265

# Passes a `truncate <target>;` statement to #exec when +enabled+ is
# truthy. Returns nil without doing anything otherwise.
def truncate_if(enabled, target = '${dest_table}')
  return unless enabled

  sql = "truncate #{target};"
  exec(SQLStatement.for_string(sql))
end

#unload(stmt, dest_ds, dest_path, format, opts) ⇒ Object



367
368
369
# File 'lib/bricolage/psqldatasource.rb', line 367

# Builds an `unload` statement with #unload_statement (same arguments, same
# order) and passes it to #exec.
def unload(stmt, dest_ds, dest_path, format, opts)
  unload_stmt = unload_statement(stmt, dest_ds, dest_path, format, opts)
  exec(unload_stmt)
end

#unload_format_option(format, ds) ⇒ Object



385
386
387
388
389
390
391
392
393
394
# File 'lib/bricolage/psqldatasource.rb', line 385

# Returns the delimiter clause of an `unload` statement for +format+,
# which must be 'tsv' or 'csv'.
#
# +ds+ is accepted but not used by this method.
#
# Raises ParameterError for any other +format+.
def unload_format_option(format, ds)
  case format
  when 'tsv'
    %q(delimiter '\t')
  when 'csv'
    %q(delimiter ',')
  else
    # Interpolate the +format+ parameter: the earlier reference to an
    # undefined `fmt` made this branch raise NameError instead of the
    # intended ParameterError.
    raise ParameterError, "unsupported format: #{format}"
  end
end

#unload_statement(stmt, dest_ds, dest_path, format, opts) ⇒ Object



371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/bricolage/psqldatasource.rb', line 371

# Wraps the query in +stmt+ in an `unload` statement writing to +dest_path+
# on +dest_ds+.
#
# The SQL is emitted one clause per line: the query (flattened and
# quote-escaped by #format_query), destination URL, credentials, the clause
# from #unload_format_option, every entry of +opts+ (converted with #to_s),
# and a terminating semicolon.
#
# Returns a new SQLStatement built on a StringResource that carries the
# generated text with the location of +stmt+, plus the declarations of
# +stmt+.
def unload_statement(stmt, dest_ds, dest_path, format, opts)
  sql = StringIO.new
  sql.puts("unload ('#{format_query(stmt.stripped_raw_content)}')")
  sql.puts("to '#{dest_ds.url(dest_path)}'")
  sql.puts("credentials '#{dest_ds.credential_string}'")
  sql.puts(unload_format_option(format, dest_ds))
  opts.each { |option| sql.puts(option.to_s) }
  sql.puts(';')

  resource = StringResource.new(sql.string, stmt.location)
  SQLStatement.new(resource, stmt.declarations)
end

#vacuum_if(enable_vacuum, enable_vacuum_sort, target = '${dest_table}') ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
# File 'lib/bricolage/psqldatasource.rb', line 273

# Passes a vacuum statement for +target+ to #exec, inside #serialize_vacuum.
#
# A truthy +enable_vacuum+ selects a plain `vacuum` and takes precedence;
# otherwise a truthy +enable_vacuum_sort+ selects `vacuum sort only`.
# When both flags are falsy nothing happens and nil is returned.
def vacuum_if(enable_vacuum, enable_vacuum_sort, target = '${dest_table}')
  command =
    if enable_vacuum
      'vacuum'
    elsif enable_vacuum_sort
      'vacuum sort only'
    end
  return unless command

  serialize_vacuum do
    exec(SQLStatement.for_string("#{command} #{target};"))
  end
end