Class: Webhookdb::SyncTarget::DatabaseRoutine

Inherits:
Routine
  • Object
Defined in:
lib/webhookdb/sync_target.rb

Overview

  • Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.

  • Select rows in our ‘source’ database that were created or updated since the last sync.

  • Write them to disk into a CSV file.

  • Pass this CSV file to the proper sync target adapter.

  • For example, the PG sync target will:

    • Create a temp table in the target database, using the schema from the sync target table.

    • Load the data into that temp table.

    • Insert into the target table the temp table rows that do not yet appear in the target table.

    • Update rows in the target table from the temp table rows that already appear in the target table.

  • The Snowflake sync target will:

    • PUT the CSV file into the stage for the table.

    • Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.

    • Purge the staged file.
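
The steps above can be sketched in plain Ruby. This is only an illustration of the insert-or-update merge, not the adapter's actual implementation: `merge_from_csv_sketch`, the `remote_id` column, and the sample rows are hypothetical, the "target table" is modeled as a Hash keyed by the unique column, and the "temp table" is an array of rows loaded from the CSV.

```ruby
require "csv"
require "tempfile"

# Hypothetical stand-in for an adapter's merge step.
# Loads the staged CSV into an in-memory "temp table", then inserts rows whose
# key is missing from the target and updates rows whose key is already present.
def merge_from_csv_sketch(target, csv_file, key)
  staged = CSV.read(csv_file.path, headers: true).map(&:to_h)
  inserted = 0
  updated = 0
  staged.each do |row|
    if target.key?(row[key])
      target[row[key]] = target[row[key]].merge(row)
      updated += 1
    else
      target[row[key]] = row
      inserted += 1
    end
  end
  {inserted: inserted, updated: updated}
end

# Existing contents of the (simulated) target table.
target = {
  "a1" => {"remote_id" => "a1", "data" => "old"},
}
# Rows changed in the source since the last sync (sample data).
changed_rows = [
  {"remote_id" => "a1", "data" => "new"},
  {"remote_id" => "b2", "data" => "fresh"},
]

tempfile = Tempfile.new("sync-sketch")
begin
  # Write the changed rows to disk as CSV, header first.
  csv_text = CSV.generate do |csv|
    csv << changed_rows.first.keys
    changed_rows.each { |r| csv << r.values }
  end
  tempfile.write(csv_text)
  tempfile.flush
  tempfile.rewind
  result = merge_from_csv_sketch(target, tempfile, "remote_id")
ensure
  tempfile.close
  tempfile.unlink
end
```

In a real database target the same split is typically expressed as an INSERT for unmatched keys and an UPDATE for matched keys, both reading from the temp table; the unique constraint on the remote key column is what makes that key comparison reliable.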

Instance Attribute Summary

Attributes inherited from Routine

#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr

Instance Method Summary

Methods inherited from Routine

#dataset_to_sync, #perform_db_op, #record, #to_ms, #with_stat

Constructor Details

#initialize(now, sync_target) ⇒ DatabaseRoutine

Returns a new instance of DatabaseRoutine.



# File 'lib/webhookdb/sync_target.rb', line 586

def initialize(now, sync_target)
  super
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end

Instance Method Details

#run ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 593

def run
  # Fall back to the default schema and the integration's table name when the sync target does not set them.
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)

  # Build idempotent DDL: the schema, the table with its key columns, then every remaining column.
  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  adapter_conn = adapter.connection(@connection_url)
  schema_expr = schema_lines.join(";\n") + ";"
  # Only run the DDL when it differs from what was applied on a previous sync.
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.perform_db_op do
      self.sync_target.update(last_applied_schema: schema_expr)
    end
  end
  # Write the rows to sync into a CSV tempfile, then let the adapter merge that file into the target table.
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column,
       @replicator.remote_key_column,] + @replicator.denormalized_columns + [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    # Always delete the tempfile, even if the sync fails.
    tempfile.unlink
  end
end
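
The schema check in #run (join the DDL statements into one script, then execute it only when it differs from last_applied_schema) can be isolated as a small sketch. The helper name `apply_schema_if_changed` and the SQL strings are hypothetical and chosen for illustration; the block passed in stands in for executing the script against the target database.

```ruby
# Hypothetical helper: joins DDL statements into a single script and yields it
# for execution only when it differs from the script applied previously.
# Returns the script that is now considered applied.
def apply_schema_if_changed(schema_lines, last_applied)
  schema_expr = schema_lines.join(";\n") + ";"
  return last_applied if schema_expr == last_applied
  yield schema_expr
  schema_expr
end

executed = []
# Illustrative statements only; real ones come from the adapter.
lines = [
  "CREATE SCHEMA IF NOT EXISTS public",
  "CREATE TABLE IF NOT EXISTS public.items (pk bigint, remote_id text UNIQUE)",
]

# First sync: nothing has been applied yet, so the script is executed.
applied = apply_schema_if_changed(lines, nil) { |sql| executed << sql }
# Second sync: the script is unchanged, so the block is skipped.
applied = apply_schema_if_changed(lines, applied) { |sql| executed << sql }
```

Comparing the whole script as a string is a coarse but cheap check: any new column changes the script and triggers a re-run, and because every statement uses IF NOT EXISTS, re-running the full script is safe.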