Class: Wal::RecordWatcher::TemporaryTableRecordWatcher

Inherits:
Object
Includes:
Watcher, Watcher::SeparatedEvents
Defined in:
lib/wal/record_watcher.rb

Instance Method Summary collapse

Methods included from Watcher::SeparatedEvents

#on_event

Methods included from Watcher

#on_event, #should_watch_table?, #valid_context_prefix?

Constructor Details

#initialize(watcher, batch_size: 5_000) ⇒ TemporaryTableRecordWatcher

Returns a new instance of TemporaryTableRecordWatcher.



233
234
235
236
# File 'lib/wal/record_watcher.rb', line 233

# Builds a watcher that buffers a transaction's record changes in a temporary
# table and forwards them to the wrapped watcher on commit.
#
# @param watcher [#on_record_changed] receiver of the buffered events
# @param batch_size [Integer] number of buffered rows loaded per batch when
#   the events are replayed on commit
def initialize(watcher, batch_size: 5_000)
  @watcher = watcher
  # Honor the caller-supplied value; it used to be overwritten with the default.
  @batch_size = batch_size
end

Instance Method Details

#on_begin(event) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/wal/record_watcher.rb', line 238

# Prepares the per-transaction buffer: creates a randomly named temporary
# table and stores an anonymous model class bound to it in @table.
#
# @param event [Object] the begin event (not used here)
# @return [Class] the anonymous model class assigned to @table
def on_begin(event)
  buffer_name = "temp_record_watcher_#{SecureRandom.alphanumeric(10).downcase}"
  conflict_columns = %i[table_name primary_key]

  base_class.connection.create_table(buffer_name, temporary: true) do |t|
    t.bigint :transaction_id, null: false
    t.bigint :lsn, null: false
    t.string :action, null: false
    t.string :table_name, null: false
    t.bigint :primary_key
    t.jsonb :old, null: false, default: {}
    t.jsonb :new, null: false, default: {}
    t.jsonb :context, null: false, default: {}
  end

  # At most one buffered row per (table_name, primary_key); the custom upsert
  # below targets this index.
  base_class.connection.add_index(buffer_name, conflict_columns, unique: true)

  @table = Class.new(base_class) do
    self.table_name = buffer_name

    # Per the original author's note, the ActiveRecord schema cache does not
    # work with temporary tables, so the unique index lookup is stubbed with a
    # hand-built definition instead of being resolved from the schema.
    index_definition = ::ActiveRecord::ConnectionAdapters::IndexDefinition.new(
      buffer_name, "unique_index", true, conflict_columns
    )

    upserter = Class.new(::ActiveRecord::InsertAll) do
      define_method(:find_unique_index_for) { |_| index_definition }
    end

    # Single-row upsert keyed on the unique index; `update_only` limits which
    # columns are overwritten on conflict.
    define_singleton_method(:upsert) do |attributes, update_only: nil|
      statement = upserter.new(
        none,
        connection,
        [attributes],
        on_duplicate: :update,
        unique_by: conflict_columns,
        update_only: update_only,
        returning: nil,
        record_timestamps: nil,
      )
      statement.execute
    end
  end
end

#on_commit(_event) ⇒ Object



286
287
288
289
290
291
292
293
294
295
# File 'lib/wal/record_watcher.rb', line 286

# Replays every buffered row to the wrapped watcher, then drops the temporary
# table created in on_begin.
#
# Rows are read in batches of @batch_size; rows for which `deserialize`
# returns a falsy value are skipped.
#
# @param _event [Object] the commit event (unused)
def on_commit(_event)
  buffered_rows = @table.in_batches(of: @batch_size).each_record

  buffered_rows.each do |persisted_event|
    event = deserialize(persisted_event)
    next unless event

    @watcher.on_record_changed(event)
  end

  base_class.connection.drop_table(@table.table_name)
end

#on_delete(event) ⇒ Object



305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/wal/record_watcher.rb', line 305

# Records a delete, folding it into whatever is already buffered for the same
# record in this transaction.
#
# @param event [Object] delete event; must respond to `full_table_name` and
#   `primary_key`
def on_delete(event)
  buffered = @table.where(table_name: event.full_table_name, primary_key: event.primary_key)
  action, old = buffered.pluck(:action, :old).first

  case action
  when "insert"
    # Inserted and deleted within the same transaction: drop the buffered row.
    buffered.delete_all
  when "update"
    # Store the delete but keep the `old` value already buffered by the update.
    @table.upsert(serialize(event).merge(old: old))
  when "delete"
    # We don't need to store another delete
  else
    # Nothing buffered yet for this record.
    @table.upsert(serialize(event))
  end
end

#on_insert(event) ⇒ Object



297
298
299
# File 'lib/wal/record_watcher.rb', line 297

# Buffers an insert event by upserting its serialized form into the
# temporary table.
#
# @param event [Object] the insert event, passed to `serialize`
def on_insert(event)
  row = serialize(event)
  @table.upsert(row)
end

#on_update(event) ⇒ Object



301
302
303
# File 'lib/wal/record_watcher.rb', line 301

# Buffers an update event. If a row for the same record is already buffered,
# only its `new` column is overwritten.
#
# @param event [Object] the update event, passed to `serialize`
def on_update(event)
  row = serialize(event)
  @table.upsert(row, update_only: %w[new])
end