Method: Webhookdb::Replicator::Base#ensure_all_columns_modification

Defined in:
lib/webhookdb/replicator/base.rb

#ensure_all_columns_modification ⇒ Webhookdb::Replicator::SchemaModification

Build a SchemaModification that brings an existing replication table in line with the replicator's current schema: add any missing denormalized columns (and the enrichment column, if there is one), backfill values for those new columns in chunked UPDATEs, create missing indices concurrently, and ensure the backfill sequence where one is required. If the table does not exist at all, the full create-table modification is returned instead.



# File 'lib/webhookdb/replicator/base.rb', line 521

def ensure_all_columns_modification
  existing_cols, existing_indices = nil
  max_pk = 0
  sint = self.service_integration
  self.admin_dataset do |ds|
    return self.create_table_modification unless ds.db.table_exists?(self.qualified_table_sequel_identifier)
    existing_cols = ds.columns.to_set
    existing_indices = ds.db[:pg_indexes].where(
      schemaname: sint.organization.replication_schema,
      tablename: sint.table_name,
    ).select_map(:indexname).to_set
    max_pk = ds.max(:pk) || 0
  end
  adapter = Webhookdb::DBAdapter::PG.new
  table = self.dbadapter_table
  result = Webhookdb::Replicator::SchemaModification.new

  missing_columns = self._denormalized_columns.delete_if { |c| existing_cols.include?(c.name) }
  # Add missing columns
  missing_columns.each do |whcol|
    # Don't bother bulking the ADDs into a single ALTER TABLE, it won't really matter.
    result.transaction_statements << adapter.add_column_sql(table, whcol.to_dbadapter)
  end
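  # Each generated statement is a standalone ALTER TABLE, roughly like
  # (hypothetical schema, table, and column names; exact quoting and types
  # come from the adapter):
  #   ALTER TABLE my_schema.my_table ADD COLUMN my_new_column timestamptz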
  # Easier to handle this explicitly than to use storage_columns, but it is a duplicated concept, so be careful.
  if (enrich_col = self.enrichment_column) && !existing_cols.include?(enrich_col.name)
    result.transaction_statements << adapter.add_column_sql(table, enrich_col)
  end

  # Backfill values for new columns.
  if missing_columns.any?
    # We need to backfill values into the new columns, but we don't want to lock the entire table
    # while every row is updated, so we update in chunks of rows.
    # The chunk size should be large for speed (fewer queries to send), but small enough
    # that the lock wait stays tolerable if another query is updating the same rows.
    # Note that the delay only applies to writes to those rows; reads will not block,
    # so erring somewhat larger should be ok.
    #
    # Note that at the point these UPDATEs are running, we have the new column AND the new code inserting
    # into that new column. We could in theory skip all the PKs that were added after this modification
    # started to run. However, since the number of rows in this window will always be relatively low
    # (though not absolutely low), and the SQL backfill should yield the same result
    # as the Ruby operation, this doesn't seem too important.
    result.nontransaction_statements.concat(missing_columns.filter_map(&:backfill_statement))
    update_expr = missing_columns.to_h { |c| [c.name, c.backfill_expr || c.to_sql_expr] }
    self.admin_dataset do |ds|
      chunks = Webhookdb::Replicator::Base.chunked_row_update_bounds(max_pk)
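      # chunked_row_update_bounds yields closed [lower, upper] pairs plus a final
      # one-element [lower] for the open-ended tail. For example, with a hypothetical
      # chunk size of 100,000 and a max_pk of 250,000:
      #   [[0, 100_000], [100_000, 200_000], [200_000]]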
      chunks[...-1].each do |(lower, upper)|
        update_query = ds.where { pk > lower }.where { pk <= upper }.update_sql(update_expr)
        result.nontransaction_statements << update_query
      end
      final_update_query = ds.where { pk > chunks[-1][0] }.update_sql(update_expr)
      result.nontransaction_statements << final_update_query
    end
  end

  # Add missing indices. This should happen AFTER the UPDATE calls so the UPDATEs don't have to update indices.
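  # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, which is
  # also why these statements are nontransactional. Each one looks roughly like
  # (hypothetical names):
  #   CREATE INDEX CONCURRENTLY my_table_my_col_idx ON my_schema.my_table (my_col)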
  self.indices(table).each do |index|
    next if existing_indices.include?(index.name.to_s)
    result.nontransaction_statements << adapter.create_index_sql(index, concurrently: true)
  end

  result.application_database_statements << sint.ensure_sequence_sql if self.requires_sequence?
  return result
end
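
The chunk bounds consumed above come from Webhookdb::Replicator::Base.chunked_row_update_bounds, whose implementation is not shown on this page. The sketch below is illustrative only, assuming a fixed (hypothetical) chunk size; it shows the shape the loop above depends on: closed [lower, upper] pairs followed by a final one-element [lower], whose UPDATE is open-ended so that rows inserted while the backfill runs are still covered.

# Illustrative sketch only; the real method and its chunk size may differ.
def self.chunked_row_update_bounds(max_pk, chunk_size: 100_000)
  bounds = []
  lower = 0
  # Closed chunks: pk > lower AND pk <= upper.
  while lower + chunk_size < max_pk
    bounds << [lower, lower + chunk_size]
    lower += chunk_size
  end
  # Final open-ended chunk: pk > lower, with no upper bound.
  bounds << [lower]
  return bounds
end

A caller could apply the returned modification along these lines (hypothetical driver code; WebhookDB has its own machinery for executing a SchemaModification, but the three statement lists are the contract):

mod = replicator.ensure_all_columns_modification
replicator.admin_dataset do |ds|
  # Plain DDL is safe to run atomically...
  ds.db.transaction do
    mod.transaction_statements.each { |sql| ds.db << sql }
  end
  # ...while the chunked UPDATEs and CREATE INDEX CONCURRENTLY must run
  # outside a transaction block.
  mod.nontransaction_statements.each { |sql| ds.db << sql }
end
# mod.application_database_statements (the sequence DDL) target the
# application database rather than the organization's replication database.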