Class: Webhookdb::SyncTarget::DatabaseRoutine
- Defined in: lib/webhookdb/sync_target.rb
Overview
- Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.
- Select rows created/updated since our last update in our ‘source’ database.
- Write them to disk into a CSV file.
- Pass this CSV file to the proper sync target adapter.
  For example, the PG sync target will:
  - Create a temp table in the target database, using the schema from the sync target table.
  - Load the data into that temp table.
  - Insert into the target table the temp table rows that do not yet appear in the target table.
  - Update rows in the target table from the temp table rows that already appear in the target table.
  The Snowflake sync target will:
  - PUT the CSV file into the stage for the table.
  - Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.
  - Purge the staged file.
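The PG steps above can be sketched as a sequence of SQL statements. This is a minimal illustration only: `pg_merge_statements`, the table names, and the column names are hypothetical and not part of Webhookdb, and the real adapter's `merge_from_csv` may generate different SQL.

```ruby
# Hypothetical helper (not Webhookdb API): builds the SQL a Postgres-style
# merge could run. Identifiers are assumed to be trusted, so no quoting is done.
def pg_merge_statements(target:, temp:, key:, columns:)
  col_list = columns.join(", ")
  # Every non-key column is overwritten with the value from the temp table.
  assignments = (columns - [key]).map { |c| "#{c} = #{temp}.#{c}" }.join(", ")
  [
    # 1. Temp table with the same columns as the target table.
    "CREATE TEMP TABLE #{temp} (LIKE #{target})",
    # 2. Bulk-load the CSV; the file contents are streamed over the connection.
    "COPY #{temp} (#{col_list}) FROM STDIN WITH (FORMAT csv, HEADER true)",
    # 3. Insert temp rows whose key is not yet present in the target.
    "INSERT INTO #{target} (#{col_list}) SELECT #{col_list} FROM #{temp} " \
      "WHERE NOT EXISTS (SELECT 1 FROM #{target} WHERE #{target}.#{key} = #{temp}.#{key})",
    # 4. Update target rows whose key matches a temp row.
    "UPDATE #{target} SET #{assignments} FROM #{temp} WHERE #{target}.#{key} = #{temp}.#{key}",
  ]
end

statements = pg_merge_statements(
  target: "public.my_table",
  temp: "my_table_tmp",
  key: "pk",
  columns: ["pk", "remote_id", "data"],
)
puts statements
```

Because the insert runs before the update in this sketch, the update also touches the freshly inserted rows; that is redundant but harmless, since it rewrites them with the same values.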
Instance Attribute Summary
Attributes inherited from Routine
#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr
Instance Method Summary
- #initialize(now, sync_target) ⇒ DatabaseRoutine (constructor): A new instance of DatabaseRoutine.
- #run ⇒ Object
Methods inherited from Routine
#dataset_to_sync, #perform_db_op, #record, #to_ms, #with_stat
Constructor Details
#initialize(now, sync_target) ⇒ DatabaseRoutine
Returns a new instance of DatabaseRoutine.
# File 'lib/webhookdb/sync_target.rb', line 586
def initialize(now, sync_target)
  super
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end
Instance Method Details
#run ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 593
def run
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)
  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  adapter_conn = adapter.connection(@connection_url)
  schema_expr = schema_lines.join(";\n") + ";"
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.perform_db_op do
      self.sync_target.update(last_applied_schema: schema_expr)
    end
  end
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column,
       @replicator.remote_key_column,] + @replicator.denormalized_columns + [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    tempfile.unlink
  end
end
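The schema step in #run joins the generated statements into a single script and executes it only when it differs from `last_applied_schema`. The following is a minimal sketch of that comparison in isolation, assuming a stand-in connection; `FakeConnection`, `apply_schema_if_changed`, and the SQL strings are hypothetical and exist only for illustration.

```ruby
# Stand-in for a database connection (hypothetical): records the SQL it is
# asked to execute instead of running it.
class FakeConnection
  attr_reader :executed

  def initialize
    @executed = []
  end

  def execute(sql)
    @executed << sql
  end
end

# Hypothetical helper mirroring the comparison in #run: join the statements
# into one script and execute it only when it differs from the last applied one.
# Returns the script, which the caller would persist as last_applied_schema.
def apply_schema_if_changed(conn, schema_lines, last_applied_schema)
  schema_expr = schema_lines.join(";\n") + ";"
  conn.execute(schema_expr) if schema_expr != last_applied_schema
  schema_expr
end

lines = [
  "CREATE SCHEMA IF NOT EXISTS public",
  "CREATE TABLE IF NOT EXISTS public.my_table (pk bigint PRIMARY KEY, remote_id text UNIQUE)",
  "ALTER TABLE public.my_table ADD COLUMN IF NOT EXISTS data jsonb",
]
conn = FakeConnection.new
applied = apply_schema_if_changed(conn, lines, nil) # first sync: script is executed
apply_schema_if_changed(conn, lines, applied)       # unchanged script: skipped
puts conn.executed.length # prints 1
```

Comparing the whole script as a string keeps the check cheap: the statements all use IF NOT EXISTS, so re-running them would be safe, but skipping an unchanged script avoids a round trip to the target database on every sync.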