Class: Wal::RecordWatcher

Inherits:
Object
  • Object
show all
Includes:
Watcher
Defined in:
lib/wal/record_watcher.rb

Overview

Watcher that processes records at the end of a transaction, keeping only the final state of each record.

Example:

```ruby
class InventoryAvailabilityWatcher < Wal::RecordWatcher

  on_save Item, changed: %w[weight_unid_id] do |event|
    recalculate_inventory_availability(event.primary_key)
  end

  on_save SalesOrder, changed: %w[status] do |event|
    next unless event.attributes_changes(:status).one? "filled"

    OrderItem
      .where(sales_order_id: event.primary_key)
      .pluck(:item_id)
      .each { |item_id| recalculate_inventory_availability(item_id) }
  end

  on_save OrderItem, changed: %w[item_id weight_unit weight_unid_id] do |event|
    if (old_item_id, new_item_id = event.attributes_changes("item_id"))
      recalculate_inventory_availability(old_item_id)
      recalculate_inventory_availability(new_item_id)
    else
      recalculate_inventory_availability(event.attribute(:item_id))
    end
  end

  on_delete OrderItem do |event|
    recalculate_inventory_availability(event.attribute(:item_id))
  end

  def recalculate_inventory_availability(item_id)
    ...
  end

end
```

Defined Under Namespace

Classes: MemoryRecordWatcher, NoAggregationWatcher, TemporaryTableRecordWatcher

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Watcher

#valid_context_prefix?

Class Method Details

.inherited(subclass) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/wal/record_watcher.rb', line 42

# Hook invoked when a class inherits from this one. Gives the new subclass two
# class-level accessors, `change_callbacks` and `delete_callbacks`.
#
# Each accessor lazily creates, and then memoizes in an instance variable of
# the class it is called on, a Hash whose missing keys are initialised to (and
# stored as) a fresh empty Array.
#
# @param subclass [Class] the class that has just been defined
def self.inherited(subclass)
  super

  subclass.define_singleton_method(:change_callbacks) do
    @change_callbacks ||= Hash.new { |registry, table| registry[table] = [] }
  end

  subclass.define_singleton_method(:delete_callbacks) do
    @delete_callbacks ||= Hash.new { |registry, table| registry[table] = [] }
  end
end

.on_delete(table, &block) ⇒ Object



70
71
72
73
# File 'lib/wal/record_watcher.rb', line 70

# Registers +block+ as a delete callback for +table+.
#
# The entry is appended to `delete_callbacks` under the table name.
#
# @param table [String, #table_name] a table name, or an object whose
#   `table_name` is used as the name
# @param block [Proc] stored as the callback's `:block`
# @return [Array<Hash>] the callbacks registered so far for that table
def self.on_delete(table, &block)
  key = table
  key = table.table_name unless table.is_a?(String)
  delete_callbacks[key] << { block: block }
end

.on_insert(table, &block) ⇒ Object



55
56
57
58
# File 'lib/wal/record_watcher.rb', line 55

# Registers +block+ as a callback restricted to `:create` for +table+.
#
# The entry is appended to `change_callbacks` under the table name.
#
# @param table [String, #table_name] a table name, or an object whose
#   `table_name` is used as the name
# @param block [Proc] stored as the callback's `:block`
# @return [Array<Hash>] the callbacks registered so far for that table
def self.on_insert(table, &block)
  key = table
  key = table.table_name unless table.is_a?(String)
  change_callbacks[key] << { only: %i[create], block: block }
end

.on_save(table, changed: nil, &block) ⇒ Object



65
66
67
68
# File 'lib/wal/record_watcher.rb', line 65

# Registers +block+ as a callback for both `:create` and `:update` on +table+.
#
# The entry is appended to `change_callbacks` under the table name.
#
# @param table [String, #table_name] a table name, or an object whose
#   `table_name` is used as the name
# @param changed [Array<#to_s>, nil] attribute names stored (as strings) in the
#   callback's `:changed` entry; +nil+ is stored as is
# @param block [Proc] stored as the callback's `:block`
# @return [Array<Hash>] the callbacks registered so far for that table
def self.on_save(table, changed: nil, &block)
  key = table
  key = table.table_name unless table.is_a?(String)
  callbacks = change_callbacks[key]
  watched = changed.nil? ? nil : changed.map { |attribute| attribute.to_s }
  callbacks << { only: %i[create update], changed: watched, block: block }
end

.on_update(table, changed: nil, &block) ⇒ Object



60
61
62
63
# File 'lib/wal/record_watcher.rb', line 60

# Registers +block+ as a callback restricted to `:update` for +table+.
#
# The entry is appended to `change_callbacks` under the table name.
#
# @param table [String, #table_name] a table name, or an object whose
#   `table_name` is used as the name
# @param changed [Array<#to_s>, nil] attribute names stored (as strings) in the
#   callback's `:changed` entry; +nil+ is stored as is
# @param block [Proc] stored as the callback's `:block`
# @return [Array<Hash>] the callbacks registered so far for that table
def self.on_update(table, changed: nil, &block)
  key = table
  key = table.table_name unless table.is_a?(String)
  callbacks = change_callbacks[key]
  watched = changed.nil? ? nil : changed.map { |attribute| attribute.to_s }
  callbacks << { only: %i[update], changed: watched, block: block }
end

Instance Method Details

#aggregation_strategy(begin_transaction_event) ⇒ Object

`RecordWatcher` supports three processing strategies:

`:memory`: Stores and aggregates the records of a single transaction in memory. This performs better but uses more memory, since at least one event per record must be kept in memory until the end of the transaction.

`:temporary_table`: Offloads the record aggregation to a temporary table in the database. This is useful when processing very large transactions that can't fit in memory. The tradeoff is worse performance.

`:none`: Doesn't aggregate anything at all, so multiple updates to the same record within one transaction are each notified. Also, if a record is deleted in the same transaction in which it was created, both the `on_insert` and `on_delete` callbacks are triggered. This strategy should usually be avoided.

The strategy can be chosen per transaction. By default the memory strategy is used, falling back to the temporary table only when the estimated transaction size exceeds 2 gigabytes (2 × 1024³ bytes).



118
119
120
121
122
123
124
# File 'lib/wal/record_watcher.rb', line 118

# Picks the aggregation strategy for the transaction that is starting.
#
# @param begin_transaction_event [#estimated_size] event whose
#   `estimated_size` is compared against the threshold
# @return [Symbol] `:temporary_table` when the estimated size is strictly
#   greater than 2 * 1024**3, otherwise `:memory`
def aggregation_strategy(begin_transaction_event)
  memory_threshold = 2 * (1024**3)
  return :temporary_table if begin_transaction_event.estimated_size > memory_threshold

  :memory
end

#on_event(event) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/wal/record_watcher.rb', line 126

# Forwards +event+ to the record watcher of the current transaction.
#
# When +event+ is a BeginTransactionEvent, a new delegate is first built
# according to `aggregation_strategy(event)` and stored in
# `@current_record_watcher`; the begin event itself is then forwarded too.
#
# @param event [Object] the incoming event
# @return [Object] whatever the delegate's `on_event` returns
# @raise [RuntimeError] if `aggregation_strategy` returns anything other than
#   `:memory`, `:temporary_table` or `:none`
def on_event(event)
  if event.is_a?(BeginTransactionEvent)
    strategy = aggregation_strategy(event)
    watcher_class =
      case strategy
      when :memory then MemoryRecordWatcher
      when :temporary_table then TemporaryTableRecordWatcher
      when :none then NoAggregationWatcher
      else raise "Invalid aggregation strategy: #{strategy}"
      end
    @current_record_watcher = watcher_class.new(self)
  end

  @current_record_watcher.on_event(event)
end

#on_record_changed(event) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/wal/record_watcher.rb', line 75

# Runs the callbacks registered on this watcher's class for a record event.
#
# * InsertEvent: every change callback whose `:only` list includes `:create`.
# * UpdateEvent: every change callback whose `:only` list includes `:update`;
#   when the callback has a `:changed` list, it runs only if at least one of
#   those attributes is among the keys of `event.diff`.
# * DeleteEvent: every delete callback.
#
# Callbacks are looked up by `event.full_table_name` and executed with
# `instance_exec`, so `self` inside a callback is this watcher instance.
# Events of any other type are ignored.
#
# @param event [InsertEvent, UpdateEvent, DeleteEvent] the record event
# @return [Array<Hash>, nil] the callbacks considered, or +nil+ for other events
def on_record_changed(event)
  case event
  when InsertEvent
    # `fetch` instead of `[]`: the registries store an empty array on every
    # missing-key read, which would make an unregistered table look watched.
    self.class.change_callbacks
      .fetch(event.full_table_name) { [] }
      .select { |callback| callback[:only].include?(:create) }
      .each { |callback| instance_exec(event, &callback[:block]) }

  when UpdateEvent
    self.class.change_callbacks
      .fetch(event.full_table_name) { [] }
      .select { |callback| callback[:only].include?(:update) }
      .each do |callback|
        watched = callback[:changed]
        # A nil `:changed` list means "run on any update".
        next if watched && (event.diff.keys & watched).empty?

        instance_exec(event, &callback[:block])
      end

  when DeleteEvent
    self.class.delete_callbacks
      .fetch(event.full_table_name) { [] }
      .each { |callback| instance_exec(event, &callback[:block]) }
  end
end

#should_watch_table?(table) ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/wal/record_watcher.rb', line 100

# Tells whether this watcher's class has a registry entry for +table+, in
# either the change callbacks or the delete callbacks.
#
# Uses `Hash#key?` on each registry rather than building the union of both key
# lists on every call.
#
# @param table [String] table name, in the form used as registry key
# @return [Boolean]
def should_watch_table?(table)
  watcher_class = self.class
  watcher_class.change_callbacks.key?(table) || watcher_class.delete_callbacks.key?(table)
end