Class: Wal::RecordWatcher
- Inherits:
-
Object
- Object
- Wal::RecordWatcher
- Includes:
- Watcher
- Defined in:
- lib/wal/record_watcher.rb
Overview
Watcher that process records at the end of a transaction, keeping only its final state.
Example:
“‘ruby class InventoryAvailabilityWatcher < Wal::RecordWatcher
on_save Item, changed: %w[weight_unid_id] do |event|
recalculate_inventory_availability(event.primary_key)
end
on_save SalesOrder, changed: %w[status] do |event|
next unless event.attributes_changes(:status).one? "filled"
OrderItem
.where(sales_order_id: event.primary_key)
.pluck(:item_id)
.each { |item_id| recalculate_inventory_availability(item_id) }
end
on_save OrderItem, changed: %w[item_id weight_unit weight_unid_id] do |event|
if (old_item_id, new_item_id = event.attributes_changes("item_id"))
recalculate_inventory_availability(old_item_id)
recalculate_inventory_availability(new_item_id)
else
recalculate_inventory_availability(event.attribute(:item_id))
end
end
on_delete OrderItem do |event|
recalculate_inventory_availability(event.attribute(:item_id))
end
def recalculate_inventory_availability(item_id)
...
end
end “‘
Defined Under Namespace
Classes: MemoryRecordWatcher, NoAggregationWatcher, TemporaryTableRecordWatcher
Class Method Summary collapse
- .inherited(subclass) ⇒ Object
- .on_delete(table, &block) ⇒ Object
- .on_insert(table, &block) ⇒ Object
- .on_save(table, changed: nil, &block) ⇒ Object
- .on_update(table, changed: nil, &block) ⇒ Object
Instance Method Summary collapse
-
#aggregation_strategy(begin_transaction_event) ⇒ Object
‘RecordWatcher` supports three processing strategies:.
- #on_event(event) ⇒ Object
- #on_record_changed(event) ⇒ Object
- #should_watch_table?(table) ⇒ Boolean
Methods included from Watcher
Class Method Details
.inherited(subclass) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/wal/record_watcher.rb', line 42 def self.inherited(subclass) super subclass.class_eval do def self.change_callbacks @change_callbacks ||= Hash.new { |hash, key| hash[key] = [] } end def self.delete_callbacks @delete_callbacks ||= Hash.new { |hash, key| hash[key] = [] } end end end |
.on_delete(table, &block) ⇒ Object
70 71 72 73 |
# File 'lib/wal/record_watcher.rb', line 70 def self.on_delete(table, &block) table = table.is_a?(String) ? table : table.table_name delete_callbacks[table].push(block: block) end |
.on_insert(table, &block) ⇒ Object
55 56 57 58 |
# File 'lib/wal/record_watcher.rb', line 55 def self.on_insert(table, &block) table = table.is_a?(String) ? table : table.table_name change_callbacks[table].push(only: [:create], block: block) end |
.on_save(table, changed: nil, &block) ⇒ Object
65 66 67 68 |
# File 'lib/wal/record_watcher.rb', line 65 def self.on_save(table, changed: nil, &block) table = table.is_a?(String) ? table : table.table_name change_callbacks[table].push(only: [:create, :update], changed: changed&.map(&:to_s), block: block) end |
.on_update(table, changed: nil, &block) ⇒ Object
60 61 62 63 |
# File 'lib/wal/record_watcher.rb', line 60 def self.on_update(table, changed: nil, &block) table = table.is_a?(String) ? table : table.table_name change_callbacks[table].push(only: [:update], changed: changed&.map(&:to_s), block: block) end |
Instance Method Details
#aggregation_strategy(begin_transaction_event) ⇒ Object
‘RecordWatcher` supports three processing strategies:
‘:memory`: Stores and aggregates records from a single transaction in memory. This has better performance but uses
more memory, as at least one event for each record must be stored in memory until the end of a transaction
‘:temporary_table`: Offloads the record aggregation to a temporary table on the database. This is useful when you are processing very large transactions that can’t fit in memory. The tradeoff is obviously a worse performance.
‘:none`: Doesn’t aggregate anything at all, so multiple updates on the same record on the same transaction would be notified. Also, if the same record is deleted on the same transaction it was created, this would end up triggering both ‘on_insert` and `on_delete` callbacks. This strategy should usually be avoided.
These strategies can be defined per transaction, and by default it will uses the memory one, and only fallback to the temporary table if the transaction size is roughly 2 gigabytes or more.
118 119 120 121 122 123 124 |
# File 'lib/wal/record_watcher.rb', line 118 def aggregation_strategy(begin_transaction_event) if begin_transaction_event.estimated_size > 1024.pow(3) * 2 :temporary_table else :memory end end |
#on_event(event) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/wal/record_watcher.rb', line 126 def on_event(event) if event.is_a? BeginTransactionEvent @current_record_watcher = case (strategy = aggregation_strategy(event)) when :memory MemoryRecordWatcher.new(self) when :temporary_table TemporaryTableRecordWatcher.new(self) when :none NoAggregationWatcher.new(self) else raise "Invalid aggregation strategy: #{strategy}" end end @current_record_watcher.on_event(event) end |
#on_record_changed(event) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/wal/record_watcher.rb', line 75 def on_record_changed(event) case event when InsertEvent self.class.change_callbacks[event.full_table_name] .filter { |callback| callback[:only].include? :create } .each { |callback| instance_exec(event, &callback[:block]) } when UpdateEvent self.class.change_callbacks[event.full_table_name] .filter { |callback| callback[:only].include? :update } .each do |callback| if (attributes = callback[:changed]) instance_exec(event, &callback[:block]) unless (event.diff.keys & attributes).empty? else instance_exec(event, &callback[:block]) end end when DeleteEvent self.class.delete_callbacks[event.full_table_name].each do |callback| instance_exec(event, &callback[:block]) end end end |
#should_watch_table?(table) ⇒ Boolean
100 101 102 |
# File 'lib/wal/record_watcher.rb', line 100 def should_watch_table?(table) (self.class.change_callbacks.keys | self.class.delete_callbacks.keys).include? table end |