Module: Webhookdb::Replicator::PartitionableMixin
- Included in:
- FakeHashPartition, FakeRangePartition, FakeStaleRowPartitioned, IcalendarEventV1Partitioned
- Defined in:
- lib/webhookdb/replicator/partitionable_mixin.rb
Overview
Mixin for replicators that support partitioning. Partitioning is currently in beta, with the following limitations/context:
- Partitioned replicators cannot be created from the CLI. Because the partitions must be created during the CREATE TABLE call, the partition_value must be set immediately on creation, or CREATE TABLE must be deferred.
  - CLI support would also require making sure this field isn't edited. This is an annoying change, so we're putting it off for now.
  - Instead, partitioned replicators must be created in the console.
- The number of HASH partitions cannot be changed; there is no good way to handle this in Postgres so we don't bother here.
- RANGE partitions are not supported yet. Supporting them requires creating the partition when the INSERT fails. Creating the partitioned table definition itself does work and uses shared behavior, at least.
- Existing replicators cannot be converted to partitioned. This is theoretically possible, but it seems easier to just start over with a new replicator. Instead:
  - If this is a 'child' replicator, create a new parent and this child, then copy over the parent data, either directly (for icalendar) or using HTTP requests (like with Plaid or Google) where more logic is required.
  - Otherwise, it'll depend on the replicator.
- To switch clients from the old replicator to the new replicator:
  - Turn off all workers.
  - Rename the new table to the old, and the old table to the new.
  - Update the service integrations, so the old one points to the new table name and opaque id, and the new one points to the old table name and opaque id.
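The table rename in the switch-over steps can be sketched as follows. The source does not say how the rename is performed; routing it through a temporary name is an assumption, and `swap_table_statements`, the temporary name, and the table names are all hypothetical. The sketch only builds SQL strings and does not touch a database.

```ruby
# Build the statements for swapping two table names via a temporary name.
# Hypothetical helper, not part of Webhookdb.
def swap_table_statements(old_name, new_name, tmp_name: "#{old_name}_swap_tmp")
  [
    "ALTER TABLE #{old_name} RENAME TO #{tmp_name}",
    "ALTER TABLE #{new_name} RENAME TO #{old_name}",
    "ALTER TABLE #{tmp_name} RENAME TO #{new_name}",
  ]
end

# Hypothetical table names, for illustration only.
puts swap_table_statements("icalendar_event_v1_aaaa", "icalendar_event_v1_bbbb")
```

Statements like these would presumably run inside a single transaction while the workers are stopped, so no writes land mid-swap.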
Constant Summary
- MAX_16BIT_INT = 2**31
Instance Method Summary
- #_prepare_for_insert(resource, event, request, enrichment) ⇒ Object
- #_str2inthash(s) ⇒ Object
  Convert the given string into a stable MD5-derived hash that can be stored in a (signed, 4 byte) INTEGER column.
- #_upsert_conflict_target ⇒ Object
- #existing_partitions(db) ⇒ Array<Webhookdb::DBAdapter::Partition>
  Return the partitions belonging to the table.
- #partition? ⇒ Boolean
- #partition_align_name ⇒ Object
- #partition_column_name ⇒ Symbol
  The partition column name.
- #partition_method ⇒ Object
  The partition method, like Webhookdb::DBAdapter::Partitioning::HASH.
- #partition_suffix(partname) ⇒ Object
- #partition_value(_resource) ⇒ Object
  The value for the denormalized column.
- #partitioning ⇒ Object
Instance Method Details
#_prepare_for_insert(resource, event, request, enrichment) ⇒ Object
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 51
    def _prepare_for_insert(resource, event, request, enrichment)
      h = super
      h[self.partition_column_name] = self.partition_value(resource)
      return h
    end
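A minimal sketch of how the `super` call composes with a replicator's own row preparation. `SketchBase`, `SketchPartitionable`, `SketchReplicator`, and the column and resource keys are hypothetical stand-ins, not the real Webhookdb classes. Only the shape of `_prepare_for_insert` is taken from the listing above.

```ruby
# Hypothetical stand-in for the replicator base class.
class SketchBase
  def _prepare_for_insert(resource, _event, _request, _enrichment)
    {remote_id: resource.fetch("id")}
  end
end

# Stand-in for the mixin: add the partition column to whatever super returns.
module SketchPartitionable
  def _prepare_for_insert(resource, event, request, enrichment)
    h = super
    h[partition_column_name] = partition_value(resource)
    h
  end
end

class SketchReplicator < SketchBase
  include SketchPartitionable

  # Hypothetical column name and value, for illustration only.
  def partition_column_name = :calendar_hash
  def partition_value(resource) = resource.fetch("calendar_id")
end

row = SketchReplicator.new._prepare_for_insert({"id" => "evt1", "calendar_id" => 10}, nil, nil, nil)
puts row.inspect
```

Because the module sits between the class and its superclass in the ancestor chain, `super` reaches the base implementation and the partition column is merged into its result.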
#_str2inthash(s) ⇒ Object
Convert the given string into a stable MD5-derived hash that can be stored in a (signed, 4 byte) INTEGER column.
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 63
    def _str2inthash(s)
      # MD5 is 128 bits/16 bytes/32 hex chars (2 chars per byte).
      # Integers are 32 bits/4 bytes/8 hex chars.
      # Note that [..8] is an inclusive range, so it grabs the first 9 hex chars (36 bits), not 8.
      unsigned_md5int = Digest::MD5.hexdigest(s)[..8].to_i(16)
      # AND it with a 32 bit bitmask to keep only the low 32 bits
      # (needed because 9 hex chars can exceed 32 bits).
      unsigned_int32 = unsigned_md5int & 0xFFFFFFFF
      # Convert it from unsigned (0 to 4.2B) to signed (-2.1B to 2.1B) by subtracting 2**31.
      # Despite its name, MAX_16BIT_INT is 2**31, one more than the max signed 4 byte integer.
      signed_md5int = unsigned_int32 - MAX_16BIT_INT
      return signed_md5int
    end
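The arithmetic can be checked outside the mixin with a standalone copy. This is a sketch: the top-level `str2inthash` name and the sample inputs are assumptions, and the literal `2**31` stands in for the MAX_16BIT_INT constant.

```ruby
require "digest"

# Standalone copy of the arithmetic in _str2inthash.
def str2inthash(s)
  # Inclusive range: 9 hex chars, up to 36 bits.
  unsigned = Digest::MD5.hexdigest(s)[..8].to_i(16)
  # Keep the low 32 bits, then shift into the signed 32 bit range.
  (unsigned & 0xFFFFFFFF) - (2**31)
end

# Hypothetical inputs, for illustration only.
["calendar-123", "calendar-456", ""].each do |s|
  puts "#{s.inspect} => #{str2inthash(s)}"
end
```

The masked value lies between 0 and 2**32 - 1, so after the subtraction the result always falls between -2**31 and 2**31 - 1, which is the range of a Postgres INTEGER.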
#_upsert_conflict_target ⇒ Object
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 57
    def _upsert_conflict_target
      return [self.partition_column_name, self._remote_key_column.name]
    end
#existing_partitions(db) ⇒ Array<Webhookdb::DBAdapter::Partition>
Return the partitions belonging to the table.
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 82
    def existing_partitions(db)
      # SELECT inhrelid::regclass AS child
      # FROM pg_catalog.pg_inherits
      # WHERE inhparent = 'my_schema.foo'::regclass;
      parent = self.schema_and_table_symbols.map(&:to_s).join(".")
      partnames = db[Sequel[:pg_catalog][:pg_inherits]].
        where(inhparent: Sequel[parent].cast(:regclass)).
        select_map(Sequel[:inhrelid].cast(:regclass))
      parent_table = self.dbadapter_table
      result = partnames.map do |part|
        suffix = self.partition_suffix(part)
        Webhookdb::DBAdapter::Partition.new(parent_table:, partition_name: part.to_sym, suffix:)
      end
      return result
    end
#partition? ⇒ Boolean
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 45
    def partition? = true
#partition_align_name ⇒ Object
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 102
    def partition_align_name
      tblname = self.service_integration.table_name
      self.service_integration.organization.admin_connection do |db|
        partitions = self.existing_partitions(db)
        db.transaction do
          partitions.each do |partition|
            next if partition.partition_name.to_s.start_with?(tblname)
            schema = partition.parent_table.schema.name
            new_partname = "#{tblname}#{partition.suffix}"
            db << "ALTER TABLE #{schema}.#{partition.partition_name} RENAME TO #{new_partname}"
          end
        end
      end
    end
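To see which partitions the loop above would rename, the filtering and statement-building can be reproduced without a database. This is a sketch under stated assumptions: `align_statements` is a hypothetical helper, plain hashes stand in for Webhookdb::DBAdapter::Partition, and the schema and table names are made up.

```ruby
# Build the ALTER TABLE statements the loop would issue, skipping
# partitions whose name already starts with the table name.
# Hypothetical helper; hashes stand in for Partition objects.
def align_statements(tblname, partitions)
  stale = partitions.reject { |part| part[:partition_name].to_s.start_with?(tblname) }
  stale.map do |part|
    "ALTER TABLE #{part[:schema]}.#{part[:partition_name]} RENAME TO #{tblname}#{part[:suffix]}"
  end
end

# Hypothetical state after a parent table was renamed from old_table to new_table.
parts = [
  {schema: "public", partition_name: :old_table_0, suffix: :_0},
  {schema: "public", partition_name: :new_table_1, suffix: :_1},
]
puts align_statements("new_table", parts)
```

Only the first partition is renamed here, since the second already carries the current table name as its prefix.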
#partition_column_name ⇒ Symbol
The partition column name. Must be present in _denormalized_columns.
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 39
    def partition_column_name = raise NotImplementedError
#partition_method ⇒ Object
The partition method, like Webhookdb::DBAdapter::Partitioning::HASH.
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 35
    def partition_method = raise NotImplementedError
#partition_suffix(partname) ⇒ Object
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 98
    def partition_suffix(partname)
      return partname[/_[a-zA-Z\d]+$/].to_sym
    end
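The regex takes the last underscore-delimited alphanumeric segment of the name, including the underscore. A standalone copy shows this on a few names; the partition names below are hypothetical.

```ruby
# Standalone copy of the regex used by partition_suffix.
def partition_suffix(partname)
  partname[/_[a-zA-Z\d]+$/].to_sym
end

# Hypothetical partition names, for illustration only.
puts partition_suffix("icalendar_event_v1_abcd_0").inspect
puts partition_suffix("my_schema.my_table_p12").inspect
```

A name with no such trailing segment makes the slice return nil, so `to_sym` raises NoMethodError. The method therefore assumes every partition name ends in a suffix of this form.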
#partition_value(_resource) ⇒ Object
The value for the denormalized column. For HASH partitioning this would be an integer, for RANGE partitioning this could be a timestamp, etc. Takes the resource and returns the value.
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 43
    def partition_value(_resource) = raise NotImplementedError
#partitioning ⇒ Object
    # File 'lib/webhookdb/replicator/partitionable_mixin.rb', line 47
    def partitioning
      return Webhookdb::DBAdapter::Partitioning.new(by: self.partition_method, column: self.partition_column_name)
    end