Class: Wal::RecordWatcher::TemporaryTableRecordWatcher
- Inherits: Object
- Inheritance hierarchy: Object → Wal::RecordWatcher::TemporaryTableRecordWatcher
- Includes:
- Watcher, Watcher::SeparatedEvents
- Defined in:
- lib/wal/record_watcher.rb
Instance Method Summary
- #initialize(watcher, batch_size: 5_000) ⇒ TemporaryTableRecordWatcher (constructor): returns a new instance of TemporaryTableRecordWatcher.
- #on_begin(event) ⇒ Object
- #on_commit(_event) ⇒ Object
- #on_delete(event) ⇒ Object
- #on_insert(event) ⇒ Object
- #on_update(event) ⇒ Object
Methods included from Watcher::SeparatedEvents
Methods included from Watcher
#on_event, #should_watch_table?, #valid_context_prefix?
Constructor Details
#initialize(watcher, batch_size: 5_000) ⇒ TemporaryTableRecordWatcher
Returns a new instance of TemporaryTableRecordWatcher.
233 234 235 236 |
# File 'lib/wal/record_watcher.rb', line 233

# Wraps +watcher+ so that record changes can be staged in a temporary table
# and replayed to it later.
#
# @param watcher [#on_record_changed] object that receives each staged
#   change when the transaction commits (see #on_commit)
# @param batch_size [Integer] number of staged rows loaded per batch when
#   the temporary table is read back in #on_commit
def initialize(watcher, batch_size: 5_000)
  @watcher = watcher
  # Honour the caller-supplied value; it was previously hard-coded to 5_000,
  # which silently ignored the keyword argument.
  @batch_size = batch_size
end
Instance Method Details
#on_begin(event) ⇒ Object
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/wal/record_watcher.rb', line 238 def on_begin(event) @table = begin table_name = "temp_record_watcher_#{SecureRandom.alphanumeric(10).downcase}" base_class.connection.create_table(table_name, temporary: true) do |t| t.bigint :transaction_id, null: false t.bigint :lsn, null: false t.column :action, :string, null: false t.string :table_name, null: false t.bigint :primary_key t.jsonb :old, null: false, default: {} t.jsonb :new, null: false, default: {} t.jsonb :context, null: false, default: {} end unique_index = %i[table_name primary_key] base_class.connection.add_index table_name, unique_index, unique: true Class.new(base_class) do self.table_name = table_name # All this sh$#1t was necessary because AR schema cache doesn't work with temporary tables... insert_all_class = Class.new(::ActiveRecord::InsertAll) do unique_index_definition = ::ActiveRecord::ConnectionAdapters::IndexDefinition.new( table_name, "unique_index", true, unique_index ) define_method(:find_unique_index_for) { |_| unique_index_definition } end define_singleton_method(:upsert) do |attributes, update_only: nil| insert_all_class .new( none, connection, [attributes], on_duplicate: :update, unique_by: unique_index, update_only:, returning: nil, record_timestamps: nil, ) .execute end end end end |
#on_commit(_event) ⇒ Object
286 287 288 289 290 291 292 293 294 295 |
# File 'lib/wal/record_watcher.rb', line 286

# Replays every staged row to the wrapped watcher, then drops the staging
# table.
#
# Rows are read in batches of @batch_size. Each row is passed through
# +deserialize+; rows for which it returns a falsy value are skipped, the
# rest are handed to +@watcher.on_record_changed+ one at a time.
#
# @param _event [Object] the commit event (unused)
def on_commit(_event)
  @table.in_batches(of: @batch_size).each_record do |persisted_event|
    event = deserialize(persisted_event)
    next unless event

    @watcher.on_record_changed(event)
  end

  base_class.connection.drop_table @table.table_name
end
#on_delete(event) ⇒ Object
305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/wal/record_watcher.rb', line 305

# Stages a delete, folding it into whatever is already staged for the same
# (table, primary key) pair:
#
# * staged "insert" - the staged row is removed
# * staged "update" - the delete is upserted, keeping the staged +old+ value
# * staged "delete" - nothing to do
# * nothing staged  - the delete is upserted as-is
#
# @param event [Object] delete event; must respond to +full_table_name+ and
#   +primary_key+
def on_delete(event)
  staged = @table.where(table_name: event.full_table_name, primary_key: event.primary_key)
  action, old = staged.pluck(:action, :old).first

  case action
  when "insert"
    staged.delete_all
  when "update"
    @table.upsert(serialize(event).merge(old: old))
  when "delete"
    # We don't need to store another delete
    nil
  else
    @table.upsert(serialize(event))
  end
end
#on_insert(event) ⇒ Object
297 298 299 |
# File 'lib/wal/record_watcher.rb', line 297

# Stages an insert by upserting its serialized form into the staging table.
#
# @param event [Object] insert event accepted by +serialize+
def on_insert(event)
  row = serialize(event)
  @table.upsert(row)
end
#on_update(event) ⇒ Object
301 302 303 |
# File 'lib/wal/record_watcher.rb', line 301

# Stages an update by upserting its serialized form, passing
# +update_only: %w[new]+ so that a conflicting staged row only has its +new+
# column overwritten.
#
# @param event [Object] update event accepted by +serialize+
def on_update(event)
  row = serialize(event)
  @table.upsert(row, update_only: %w[new])
end