Class: Webhookdb::SyncTarget::DatabaseRoutine

Inherits:
Routine
  • Object
Defined in:
lib/webhookdb/sync_target.rb

Overview

  • Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.

  • Select rows created/updated since our last update in our ‘source’ database.

  • Write them to disk into a CSV file.

  • Pass this CSV file to the proper sync target adapter.

  • For example, the PG sync target will:

    • Create a temp table in the target database, using the schema from the sync target table.

    • Load the data into that temp table.

    • Insert into the target table the temp table rows that do not yet appear in the target table.

    • Update rows in the target table from the temp table rows that already appear in the target table.

  • The Snowflake sync target will:

    • PUT the CSV file into the stage for the table.

    • Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.

    • Purge the staged file.
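
The PG steps above can be sketched as a list of SQL statements. This is only an illustration under stated assumptions: `pg_merge_statements` and its parameters are hypothetical names, not part of WebhookDB, and the real adapter may generate different SQL.

```ruby
# Hypothetical helper (not WebhookDB's API): builds the statements a
# PG-style merge might run for a target table, a temp table, and a key column.
def pg_merge_statements(target:, temp:, key:, columns:)
  col_list = columns.join(", ")
  # Every column except the key is overwritten on update.
  assignments = (columns - [key]).map { |c| "#{c} = #{temp}.#{c}" }.join(", ")
  [
    # 1. Temp table with the same schema as the target table.
    "CREATE TEMP TABLE #{temp} (LIKE #{target})",
    # 2. Load the CSV into the temp table.
    "COPY #{temp} (#{col_list}) FROM STDIN WITH (FORMAT csv, HEADER true)",
    # 3. Insert temp rows that are missing from the target.
    "INSERT INTO #{target} (#{col_list}) SELECT #{col_list} FROM #{temp} " \
      "WHERE NOT EXISTS (SELECT 1 FROM #{target} WHERE #{target}.#{key} = #{temp}.#{key})",
    # 4. Update target rows that also appear in the temp table.
    "UPDATE #{target} SET #{assignments} FROM #{temp} WHERE #{target}.#{key} = #{temp}.#{key}",
  ]
end

stmts = pg_merge_statements(
  target: "orders", temp: "orders_tmp", key: "pk", columns: %w[pk remote_id data],
)
puts stmts
```

Loading into a temp table first keeps the bulk load separate from the insert and update steps, so the target table is only touched by set-based statements.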

Instance Attribute Summary

Attributes inherited from Routine

#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr

Instance Method Summary

Methods inherited from Routine

#dataset_to_sync, #record

Constructor Details

#initialize(now, sync_target) ⇒ DatabaseRoutine

Returns a new instance of DatabaseRoutine.



# File 'lib/webhookdb/sync_target.rb', line 413

def initialize(now, sync_target)
  super
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end

Instance Method Details

#run ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 420

def run
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)

  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  adapter_conn = adapter.connection(@connection_url)
  schema_expr = schema_lines.join(";\n") + ";"
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.sync_target.update(last_applied_schema: schema_expr)
  end
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column,
       @replicator.remote_key_column,] + @replicator.denormalized_columns + [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    tempfile.unlink
  end
end
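
Two patterns in `run` can be shown in isolation: the schema DDL is executed only when it differs from `last_applied_schema`, and the CSV tempfile is always removed in an `ensure` block. The sketch below assumes hypothetical stand-ins (`FakeTarget`, `apply_schema_if_changed`, `with_csv_tempfile`, and the `executed` array in place of a real connection); none of these names exist in WebhookDB.

```ruby
require "tempfile"

# Hypothetical stand-in for a sync target; it only remembers the last applied DDL.
FakeTarget = Struct.new(:last_applied_schema)

# Joins the DDL lines and "executes" them only when they differ from the
# previously applied schema. Returns true if the schema was applied.
def apply_schema_if_changed(target, schema_lines, executed)
  schema_expr = schema_lines.join(";\n") + ";"
  return false if schema_expr == target.last_applied_schema
  executed << schema_expr # stands in for adapter_conn.execute(schema_expr)
  target.last_applied_schema = schema_expr
  true
end

# Writes the rows to a tempfile, rewinds it, yields it, and always deletes it.
def with_csv_tempfile(rows)
  tempfile = Tempfile.new("sketch")
  begin
    rows.each { |row| tempfile.write(row) }
    tempfile.rewind
    yield tempfile
  ensure
    tempfile.close
    tempfile.unlink
  end
end

target = FakeTarget.new(nil)
executed = []
ddl = ["CREATE TABLE IF NOT EXISTS t (pk bigint)"]
first = apply_schema_if_changed(target, ddl, executed)
second = apply_schema_if_changed(target, ddl, executed)
csv = with_csv_tempfile(["a,b\n", "1,2\n"]) { |f| f.read }
```

Comparing against the stored DDL string avoids re-running the `IF NOT EXISTS` statements on every sync when nothing has changed.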