Class: Wal::StreamingWatcher

Inherits:
Object
  • Object
Includes:
Watcher
Defined in:
lib/wal/streaming_watcher.rb

Overview

A watcher that streams all the events of each WAL transaction on a separate thread.

Useful for improving throughput, since it lets you process events in parallel while more are still being fetched.

Example:

A watcher that persists all delete events as they arrive, using a single database transaction and without waiting for the full WAL transaction to finish.

```ruby
class RegisterDeletesWalWatcher < Wal::StreamingWatcher
  # Receives the events of one WAL transaction as a stream
  def on_transaction_events(events)
    DeletedApplicationRecord.transaction do
      events
        .lazy
        .filter { |event| event.is_a? DeleteEvent }
        .each { |event| DeletedApplicationRecord.create_from_event(event) }
    end
  end
end
```
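The `.lazy` call in the example is what keeps the processing streamed. Judging by the source of `#on_event`, `events` is an `Enumerator` fed from a queue, and in plain Ruby an eager `filter` on an `Enumerator` collects every element into an `Array` before `each` runs. The sketch below illustrates that difference with a hypothetical `make_source` helper standing in for the event stream; it does not use the Wal classes.

```ruby
# Hypothetical stand-in for the event stream: yields 0, 1, 2 and logs each one.
def make_source(log)
  Enumerator.new do |y|
    3.times do |i|
      log << "produced #{i}"
      y << i
    end
  end
end

# Eager: filter drains the whole source into an Array before each runs.
eager_log = []
make_source(eager_log).filter(&:even?).each { |i| eager_log << "handled #{i}" }

# Lazy: each element flows through filter and each before the next is produced.
lazy_log = []
make_source(lazy_log).lazy.filter(&:even?).each { |i| lazy_log << "handled #{i}" }

puts eager_log.inspect
puts lazy_log.inspect
```

In the eager run every "produced" entry comes before the first "handled" entry, while in the lazy run they interleave, which is the behavior a streaming watcher relies on.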

Instance Method Summary

Methods included from Watcher

#should_watch_table?, #valid_context_prefix?

Instance Method Details

#on_event(event) ⇒ Object



# File 'lib/wal/streaming_watcher.rb', line 32

def on_event(event)
  case event
  when BeginTransactionEvent
    @queue = SizedQueue.new(queue_size(event))

    event_stream = Enumerator.new do |y|
      while (item = @queue.pop)
        case item
        when CommitTransactionEvent
          y << item
          break
        else
          y << item
        end
      end
    end
    @worker = Thread.new { on_transaction_events(event_stream) }

    @queue << event

  when CommitTransactionEvent
    @queue << event
    @worker.join

    # We are cleaning this up to hint to Ruby GC that this can be freed before the next begin transaction arrives
    @queue.clear
    @queue = nil

  else
    @queue << event
  end
end
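The handoff in `#on_event` can be tried in isolation. The following is a minimal sketch of the same pattern (a `SizedQueue`, an `Enumerator` that drains it, and a worker thread), not the library's implementation. `BeginEvent`, `RowEvent`, `CommitEvent`, and `SketchStreamer` are hypothetical stand-ins defined only so the block runs on its own.

```ruby
# Hypothetical stand-ins for the Wal event classes.
BeginEvent  = Struct.new(:id)
RowEvent    = Struct.new(:payload)
CommitEvent = Struct.new(:id)

class SketchStreamer
  attr_reader :seen

  def initialize(capacity)
    @capacity = capacity
    @seen = []
  end

  def on_event(event)
    case event
    when BeginEvent
      # A fresh bounded queue per transaction; push blocks when it is full.
      queue = @queue = SizedQueue.new(@capacity)
      stream = Enumerator.new do |y|
        loop do
          item = queue.pop # blocks until the producer pushes something
          y << item
          break if item.is_a?(CommitEvent)
        end
      end
      # The worker consumes the stream while the caller keeps pushing events.
      @worker = Thread.new { stream.each { |item| @seen << item } }
      @queue << event
    when CommitEvent
      @queue << event
      @worker.join # wait until the worker has drained the transaction
      @queue = nil
    else
      @queue << event
    end
  end
end

streamer = SketchStreamer.new(2)
events = [BeginEvent.new(1), RowEvent.new("a"), RowEvent.new("b"),
          RowEvent.new("c"), CommitEvent.new(1)]
events.each { |event| streamer.on_event(event) }

puts streamer.seen.map { |event| event.class.name }.inspect
# => ["BeginEvent", "RowEvent", "RowEvent", "RowEvent", "CommitEvent"]
```

Even with a capacity of 2 and five events, nothing is lost: the producer simply waits on `<<` until the worker pops an item, and the `join` on commit guarantees the worker has seen the whole transaction before `on_event` returns.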

#on_transaction_events(events) ⇒ Object



# File 'lib/wal/streaming_watcher.rb', line 26

def on_transaction_events(events); end

#queue_size(event) ⇒ Object



# File 'lib/wal/streaming_watcher.rb', line 28

def queue_size(event)
  5_000
end
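The value returned by `#queue_size` is passed to `SizedQueue.new` in `#on_event`, so it caps how many events can be buffered ahead of the worker. The sketch below uses only Ruby's standard `SizedQueue`, with a deliberately tiny capacity chosen for illustration, to show what happens once that cap is reached.

```ruby
# A tiny capacity chosen for illustration; the watcher's default is 5_000.
queue = SizedQueue.new(2)
queue << :first
queue << :second

# A non-blocking push on a full queue raises ThreadError.
# A regular push (as used by on_event) would block here instead.
rejected = begin
  queue.push(:third, true)
  false
rescue ThreadError
  true
end

puts "capacity=#{queue.max} buffered=#{queue.size} rejected=#{rejected}"
```

Since `#queue_size` receives the begin-transaction event, a subclass could presumably override it to pick a different capacity, trading memory use against how far event fetching can run ahead of processing.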