Class: Webhookdb::Replicator::BaseStaleRowDeleter
Inherits: Object
Defined in: lib/webhookdb/replicator/base_stale_row_deleter.rb
Overview
Delete stale rows (like cancelled calendar events) not updated (via row_updated_at, or whatever updated_at_column names) between stale_at and stale_at + lookback_window ago. This avoids endlessly adding to a table where we expect rows to become stale over time.
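Subclasses implement the abstract methods stale_at, lookback_window, updated_at_column, and stale_condition. A minimal sketch, assuming a hypothetical replicator with a status column and the durations shown (none of these values come from a real replicator):

# Hypothetical subclass; the name, column, durations, and condition are illustrative only.
class MyCalendarEventV1StaleRowDeleter < Webhookdb::Replicator::BaseStaleRowDeleter
  def stale_at = 35.days # rows older than 35 days are considered stale
  def lookback_window = 7.days # look for stale rows 35 to 42 days old
  def updated_at_column = :row_updated_at # the indexed timestamp column
  def stale_condition = {status: "CANCELLED"} # only delete cancelled rows
end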
Direct Known Subclasses
FakeStaleRow::StaleRowDeleter, IcalendarEventV1::StaleRowDeleter
Instance Attribute Summary
- #replicator ⇒ Webhookdb::Replicator::Base (readonly)

Instance Method Summary
- #_run_delete(ds, stale_window_early:, stale_window_late:) ⇒ Object
- #chunk_size ⇒ Integer
  The row delete is done in chunks to avoid long locks.
- #incremental_lookback_size ⇒ Object
  How small should the incremental lookback window be? See #run for details.
- #initialize(replicator) ⇒ BaseStaleRowDeleter (constructor)
  A new instance of BaseStaleRowDeleter.
- #lookback_window ⇒ ActiveSupport::Duration
  How far from stale_at to “look back” for stale rows.
- #run(lookback_window: self.lookback_window) ⇒ Object
  Run the deleter.
- #run_initial ⇒ Object
  Run with lookback_window as nil, which does a full table scan.
- #set_autovacuum(db, on) ⇒ Object
- #stale_at ⇒ ActiveSupport::Duration
  When a row is considered ‘stale’.
- #stale_condition ⇒ Hash
  Additional ‘stale’ conditions, like ‘cancelled’.
- #updated_at_column ⇒ Symbol
  Name of the column, like :row_updated_at.
Constructor Details
#initialize(replicator) ⇒ BaseStaleRowDeleter
Returns a new instance of BaseStaleRowDeleter.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 10

def initialize(replicator)
  @replicator = replicator
end
Instance Attribute Details
#replicator ⇒ Webhookdb::Replicator::Base (readonly)
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 8

def replicator
  @replicator
end
Instance Method Details
#_run_delete(ds, stale_window_early:, stale_window_late:) ⇒ Object
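Delete stale rows from ds in chunk_size batches, walking incremental_lookback_size windows from stale_window_early to stale_window_late; each DELETE runs in its own transaction with seqscan disabled.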
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 124

def _run_delete(ds, stale_window_early:, stale_window_late:)
  base_ds = ds.where(self.stale_condition).limit(self.chunk_size).select(:pk)
  window_start = stale_window_early
  until window_start >= stale_window_late
    window_end = window_start + self.incremental_lookback_size
    inner_ds = base_ds.where(self.updated_at_column => window_start..window_end)
    loop do
      # Due to conflicts where a feed is being inserted while the delete is happening,
      # this may raise an error like:
      #   deadlock detected
      #   DETAIL: Process 18352 waits for ShareLock on transaction 435085606; blocked by process 24191.
      #   Process 24191 waits for ShareLock on transaction 435085589; blocked by process 18352.
      #   HINT: See server log for query details.
      #   CONTEXT: while deleting tuple (2119119,3) in relation "icalendar_event_v1_aaaa"
      # So we don't explicitly handle deadlocks, but could if it becomes an issue.
      delete_ds = ds.where(pk: inner_ds)
      # Disable seqscan for the delete. We can end up with seqscans if the planner decides
      # it's a better choice given the 'updated at' index, but for our purposes we know
      # we never want to use it (the impact is negligible on small tables,
      # and catastrophic on large tables).
      sql_lines = [
        "BEGIN",
        "SET LOCAL enable_seqscan='off'",
        delete_ds.delete_sql,
        "COMMIT",
      ]
      deleted = ds.db << sql_lines.join(";\n")
      break if deleted != self.chunk_size
    end
    window_start = window_end
  end
end
#chunk_size ⇒ Integer
The row delete is done in chunks to avoid long locks. The default seems safe, but it’s exposed here for tuning, and could be moved into configuration if needed at some point.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 54

def chunk_size = 10_000
#incremental_lookback_size ⇒ Object
How small should the incremental lookback window be? See #run for details. A size of 1 hour and a lookback window of 2 days would yield at least 48 delete queries.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 58

def incremental_lookback_size = 1.hour
#lookback_window ⇒ ActiveSupport::Duration
How far from stale_at to “look back” for stale rows. We cannot just use “row_updated_at < stale_at”, since this would scan ALL the rows every time we delete. Instead, we only want to scan rows where “row_updated_at < stale_at AND row_updated_at > (stale_at - lookback_window)”. For example, a stale_at of 20 days and a lookback_window of 7 days would look to delete rows 20 to 27 days old.
If the stale row deleter is run daily, a good lookback window is 2-3 days, since as long as the job keeps running, we shouldn’t find stale rows that haven’t been cleaned up.
Use #run_initial to do a full table scan instead, which may be necessary when running this feature against a table for the first time.
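To make the arithmetic concrete, a minimal sketch of the window bounds (illustrative only, assuming ActiveSupport durations; these locals mirror the ones computed in #run):

# Illustrative window bounds for stale_at = 20.days, lookback_window = 7.days.
stale_window_late = Time.now - 20.days # rows updated before this are stale
stale_window_early = stale_window_late - 7.days # but we only scan back this far
# Rows whose row_updated_at falls between these bounds (20 to 27 days old)
# are candidates for deletion.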
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 34

def lookback_window
  raise NotImplementedError
end
#run(lookback_window: self.lookback_window) ⇒ Object
Run the deleter.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 63

def run(lookback_window: self.lookback_window)
  # The algorithm to delete stale rows is complex for a couple of reasons.
  # The naive solution is "delete rows where updated_at > (stale_at - lookback_window) AND updated_at < stale_at"
  # However, this would cause a single massive query over the entire candidate row space,
  # which has problems:
  # - The query can be very slow
  # - Deadlocks can happen due to the slow query.
  # - If the query is interrupted (due to a worker restart), all progress is lost.
  # - Scanning the large 'updated at timestamp' index can cause the database to do a sequential scan.
  #
  # Instead, we need to issue a series of fast queries over small 'updated at' windows:
  #
  # - Break the lookback period into hour-long windows.
  #   If the lookback_window is 2 days, this would issue 48 queries.
  #   But each one would be very fast, since the column is indexed.
  # - For each small window, delete in chunks, like:
  #     DELETE from "public"."icalendar_event_v1_aaaa"
  #     WHERE pk IN (
  #       SELECT pk FROM "public"."icalendar_event_v1_aaaa"
  #       WHERE row_updated_at >= (hour start)
  #       AND row_updated_at < (hour end)
  #       LIMIT (chunk size)
  #     )
  # - Issue each DELETE within a transaction with seqscan disabled.
  #   This is crude, but we know for our usage case that we never want a seqscan.
  # - Using the chunked delete with the hour-long (small-sized) windows
  #   is important. Because each chunk requires scanning potentially the entire indexed row space,
  #   it would take longer and longer to find 10k rows to fill the chunk.
  #   This is, for example, the same performance problem that OFFSET/LIMIT pagination
  #   has at later pages (but not earlier pages).
  self.replicator.admin_dataset do |ds|
    stale_window_late = Time.now - self.stale_at
    stale_window_early = lookback_window.nil? ? ds.min(self.updated_at_column) : stale_window_late - lookback_window
    # If we are querying the whole table (no lookback window), and have no rows,
    # there's nothing to clean up.
    break if stale_window_early.nil?
    # We must disable vacuuming for this sort of cleanup.
    # Otherwise, it will take a LONG time since we use a series of short deletes.
    self.set_autovacuum(ds.db, false)
    if self.replicator.partition?
      # If the replicator is partitioned, we need to delete stale rows on each partition separately.
      # We DELETE with a LIMIT in chunks, but when we run this on the main table, it'll run the query
      # on every partition BEFORE applying the limit. You'll see this manifest as slowness,
      # but also the planner using a sequential scan for the delete, rather than hitting an index.
      # Instead, DELETE from each partition in chunks, which will use the indices, and apply the limit properly.
      self.replicator.existing_partitions(ds.db).each do |p|
        pdb = ds.db[self.replicator.qualified_table_sequel_identifier(table: p.partition_name)]
        self._run_delete(pdb, stale_window_early:, stale_window_late:)
      end
    else
      self._run_delete(ds, stale_window_early:, stale_window_late:)
    end
  end
ensure
  # Open a new connection in case the previous one is trashed for whatever reason.
  self.replicator.admin_dataset do |ds|
    self.set_autovacuum(ds.db, true)
  end
end
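A hypothetical invocation, using the IcalendarEventV1::StaleRowDeleter subclass listed above (the replicator variable and the explicit window are illustrative, not prescribed by this class):

# Hypothetical usage; `replicator` is assumed to be a Webhookdb::Replicator::Base.
deleter = Webhookdb::Replicator::IcalendarEventV1::StaleRowDeleter.new(replicator)
deleter.run # incremental run over the default lookback_window
deleter.run(lookback_window: 3.days) # or pass an explicit window
deleter.run_initial # full table scan, for first-time cleanup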
#run_initial ⇒ Object
Run with lookback_window as nil, which does a full table scan.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 164

def run_initial = self.run(lookback_window: nil)
#set_autovacuum(db, on) ⇒ Object
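Enable or disable autovacuum on the replicator’s table; a no-op for partitioned replicators.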
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 157

def set_autovacuum(db, on)
  return if self.replicator.partition?
  arg = on ? "on" : "off"
  db << "ALTER TABLE #{self.replicator.schema_and_table_symbols.join('.')} SET (autovacuum_enabled='#{arg}')"
end
#stale_at ⇒ ActiveSupport::Duration
When a row is considered ‘stale’. For example, a value of 35.days would treat any row older than 35 days as stale.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 17

def stale_at
  raise NotImplementedError
end
#stale_condition ⇒ Hash
Additional ‘stale’ conditions, like ‘cancelled’.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 46

def stale_condition
  raise NotImplementedError
end
#updated_at_column ⇒ Symbol
Name of the column, like :row_updated_at.
# File 'lib/webhookdb/replicator/base_stale_row_deleter.rb', line 40

def updated_at_column
  raise NotImplementedError
end