Class: Webhookdb::SyncTarget::DatabaseRoutine
Defined in: lib/webhookdb/sync_target.rb
Overview
- Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.
- Select rows created or updated since our last sync from our ‘source’ database.
- Write them to disk as a CSV file.
- Pass this CSV file to the proper sync target adapter.
- For example, the PG sync target will:
  - Create a temp table in the target database, using the schema of the sync target table.
  - Load the data into that temp table.
  - Insert into the target table the temp-table rows that do not already appear in it.
  - Update the target-table rows that also appear in the temp table, using the temp-table values.
- The Snowflake sync target will:
  - PUT the CSV file into the stage for the table.
  - Otherwise follow the same logic as PG: create a temp table and COPY INTO it from the staged CSV.
  - Purge the staged file.
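The overview's steps, including the PG-style "insert new rows, update existing rows" merge, can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not Webhookdb's adapter API: `InMemoryMergeAdapter`, `sync_once`, and the `pk`/`data`/`updated_at` fields are hypothetical, a Hash stands in for the target table, and an array of hashes stands in for the source database.

```ruby
require "csv"
require "tempfile"

# Hypothetical stand-in for a sync target adapter (not Webhookdb's API).
# A Hash keyed by primary key plays the role of the target table.
class InMemoryMergeAdapter
  attr_reader :target

  def initialize
    @target = {}
  end

  # Mirrors the PG steps: load the CSV into a "temp table", insert rows whose
  # key is absent from the target, and update rows whose key is present.
  def merge_from_csv(io, key)
    temp_table = CSV.new(io, headers: true).map(&:to_h)
    counts = { inserted: 0, updated: 0 }
    temp_table.each do |row|
      k = row.fetch(key)
      if @target.key?(k)
        @target[k].merge!(row)
        counts[:updated] += 1
      else
        @target[k] = row
        counts[:inserted] += 1
      end
    end
    counts
  end
end

# Hypothetical driver: select rows changed since the last sync, write them to
# a CSV tempfile (with a header row), then hand the file to the adapter.
def sync_once(source_rows, last_synced_at, adapter)
  changed = source_rows.select { |r| r[:updated_at] > last_synced_at }
  tempfile = Tempfile.new("sync-sketch")
  begin
    tempfile.write(CSV.generate_line(%w[pk data]))
    changed.each { |r| tempfile.write(CSV.generate_line([r[:pk], r[:data]])) }
    tempfile.rewind
    adapter.merge_from_csv(tempfile, "pk")
  ensure
    tempfile.close
    tempfile.unlink
  end
end

adapter = InMemoryMergeAdapter.new
adapter.target["1"] = { "pk" => "1", "data" => "old" }
rows = [
  { pk: 1, data: "new", updated_at: Time.utc(2024, 1, 2) },
  { pk: 2, data: "b", updated_at: Time.utc(2024, 1, 3) },
  { pk: 3, data: "stale", updated_at: Time.utc(2023, 12, 1) },
]
# Row 1 is updated, row 2 is inserted, row 3 predates the last sync.
result = sync_once(rows, Time.utc(2024, 1, 1), adapter)
```

A plain Hash assignment would collapse insert and update into one upsert; the sketch keeps them as separate branches to match the two PG steps above. Note that CSV round-tripping turns the integer keys into strings.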
Instance Attribute Summary
Attributes inherited from Routine
#last_synced_at, #now, #replicator, #sync_target, #timestamp_expr
Instance Method Summary
- #initialize(now, sync_target) ⇒ DatabaseRoutine (constructor): A new instance of DatabaseRoutine.
- #run ⇒ Object
Methods inherited from Routine
Constructor Details
#initialize(now, sync_target) ⇒ DatabaseRoutine
Returns a new instance of DatabaseRoutine.
# File 'lib/webhookdb/sync_target.rb', line 413

def initialize(now, sync_target)
  super
  # Resolve the database adapter (e.g. PG or Snowflake) for the target's
  # connection URL, and open a connection with it.
  @connection_url = self.sync_target.connection_url
  @adapter = Webhookdb::DBAdapter.adapter(@connection_url)
  @adapter_connection = @adapter.connection(@connection_url)
end
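The constructor resolves an adapter once from `sync_target.connection_url`. This page does not show how `Webhookdb::DBAdapter.adapter` makes that choice; the sketch below assumes dispatch on the URL scheme. The class names, the `ADAPTERS_BY_SCHEME` mapping, and `adapter_for` are all hypothetical.

```ruby
require "uri"

# Hypothetical adapter classes; the real Webhookdb::DBAdapter classes and
# their constructors are not shown on this page.
class PostgresAdapterSketch
  attr_reader :url

  def initialize(url)
    @url = url
  end
end

class SnowflakeAdapterSketch
  attr_reader :url

  def initialize(url)
    @url = url
  end
end

# Assumed mapping from URL scheme to adapter class, for illustration only.
ADAPTERS_BY_SCHEME = {
  "postgres" => PostgresAdapterSketch,
  "postgresql" => PostgresAdapterSketch,
  "snowflake" => SnowflakeAdapterSketch,
}.freeze

# Pick an adapter from the connection URL's scheme; unknown schemes fail fast.
def adapter_for(connection_url)
  scheme = URI.parse(connection_url).scheme.to_s.downcase
  klass = ADAPTERS_BY_SCHEME.fetch(scheme) do
    raise ArgumentError, "no sync adapter for scheme #{scheme.inspect}"
  end
  klass.new(connection_url)
end

pg = adapter_for("postgres://user:pass@localhost:5432/warehouse")
```

Under this assumption, resolving the adapter in the constructor means an unsupported URL fails when the routine is built, before any DDL is generated in `#run`.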
Instance Method Details
#run ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 420

def run
  # Fall back to the target class's default schema and the service
  # integration's table name when none are configured.
  schema_name = @sync_target.schema.present? ? @sync_target.schema : @sync_target.class.default_schema
  table_name = @sync_target.table.present? ? @sync_target.table : @sync_target.service_integration.table_name
  adapter = @adapter
  schema = Webhookdb::DBAdapter::Schema.new(name: schema_name.to_sym)
  table = Webhookdb::DBAdapter::Table.new(name: table_name.to_sym, schema:)
  # Build idempotent DDL: the schema, the table (primary key and remote key
  # columns), then any missing denormalized and data columns.
  schema_lines = []
  schema_lines << adapter.create_schema_sql(table.schema, if_not_exists: true)
  schema_lines << adapter.create_table_sql(
    table,
    [@replicator.primary_key_column, @replicator.remote_key_column],
    if_not_exists: true,
  )
  (@replicator.denormalized_columns + [@replicator.data_column]).each do |col|
    schema_lines << adapter.add_column_sql(table, col, if_not_exists: true)
  end
  adapter_conn = adapter.connection(@connection_url)
  schema_expr = schema_lines.join(";\n") + ";"
  # Only execute the DDL when it differs from what was last applied.
  if schema_expr != self.sync_target.last_applied_schema
    adapter_conn.execute(schema_expr)
    self.sync_target.update(last_applied_schema: schema_expr)
  end
  tempfile = Tempfile.new("whdbsyncout-#{self.sync_target.id}")
  begin
    # Stream the rows to sync out of the source database as CSV (with a
    # header row) into the tempfile.
    self.dataset_to_sync do |ds|
      ds.db.copy_table(ds, options: "DELIMITER ',', HEADER true, FORMAT csv") do |row|
        tempfile.write(row)
      end
    end
    tempfile.rewind
    # Hand the CSV to the adapter to merge into the target table, passing the
    # primary key column and the full list of columns.
    adapter.merge_from_csv(
      adapter_conn,
      tempfile,
      table,
      @replicator.primary_key_column,
      [@replicator.primary_key_column, @replicator.remote_key_column,] +
        @replicator.denormalized_columns +
        [@replicator.data_column],
    )
    self.record(self.now)
  ensure
    tempfile.unlink
  end
end
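`#run` executes its DDL only when the joined schema statements differ from `last_applied_schema`, so repeated syncs skip redundant schema changes. A minimal sketch of that caching pattern, assuming PG-flavored SQL; `RecordingConnection`, `SyncTargetState`, `build_schema_expr`, and `apply_schema_if_changed` are hypothetical names, and the column types are illustrative.

```ruby
# Hypothetical stand-ins (not Webhookdb's API): a connection that records the
# SQL it is asked to execute, and a record holding the last applied DDL.
class RecordingConnection
  attr_reader :executed

  def initialize
    @executed = []
  end

  def execute(sql)
    @executed << sql
  end
end

SyncTargetState = Struct.new(:last_applied_schema)

# PG-flavored, idempotent DDL: create the schema and table if missing, then add
# any missing columns. Columns are [name, type] pairs. Per the overview, only
# the remote key column gets a unique constraint.
def build_schema_expr(schema, table, base_columns, extra_columns)
  qualified = %("#{schema}"."#{table}")
  col_defs = base_columns.map { |name, type| %("#{name}" #{type}) }.join(", ")
  lines = [%(CREATE SCHEMA IF NOT EXISTS "#{schema}")]
  lines << "CREATE TABLE IF NOT EXISTS #{qualified} (#{col_defs})"
  extra_columns.each do |name, type|
    lines << %(ALTER TABLE #{qualified} ADD COLUMN IF NOT EXISTS "#{name}" #{type})
  end
  lines.join(";\n") + ";"
end

# Execute the DDL only when it differs from what was last applied, then
# remember it. Returns whether anything was executed.
def apply_schema_if_changed(conn, state, schema_expr)
  return false if schema_expr == state.last_applied_schema

  conn.execute(schema_expr)
  state.last_applied_schema = schema_expr
  true
end

conn = RecordingConnection.new
state = SyncTargetState.new(nil)
expr = build_schema_expr(
  "public", "orders",
  [["pk", "bigint NOT NULL"], ["remote_id", "text UNIQUE NOT NULL"]],
  [["data", "jsonb"]],
)
first_run = apply_schema_if_changed(conn, state, expr)  # executes the DDL
second_run = apply_schema_if_changed(conn, state, expr) # skipped, unchanged
```

Because every statement uses IF NOT EXISTS, re-running the DDL would also be safe; the comparison simply avoids re-sending it when nothing changed. Adding a new denormalized column changes the string and triggers exactly one more pass.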