Class: MessageStore::Postgres::Write

Inherits:
Object
  • Object
show all
Includes:
Write
Defined in:
lib/message_store/postgres/write.rb

Instance Method Summary collapse

Instance Method Details

#configure(session: nil) ⇒ Object



8
9
10
# File 'lib/message_store/postgres/write.rb', line 8

def configure(session: nil)
  Put.configure(self, session: session)
end

#write(batch, stream_name, expected_version: nil) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/message_store/postgres/write.rb', line 12

def write(batch, stream_name, expected_version: nil)
  logger.trace(tag: :write) do
    message_types = batch.map {|message_data| message_data.type }.uniq.join(', ')
    "Writing batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})"
  end

  unless expected_version.nil?
    expected_version = ExpectedVersion.canonize(expected_version)
  end

  last_position = nil
  put.session.transaction do
    batch.each do |message_data|
      last_position = write_message_data(message_data, stream_name, expected_version: expected_version)

      unless expected_version.nil?
        expected_version += 1
      end
    end
  end

  logger.debug(tag: :write) do
    message_types = batch.map {|message_data| message_data.type }.uniq.join(', ')
    "Wrote batch (Stream Name: #{stream_name}, Types: #{message_types}, Number of Messages: #{batch.length}, Expected Version: #{expected_version.inspect})"
  end

  last_position
end

#write_message_data(message_data, stream_name, expected_version: nil) ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'lib/message_store/postgres/write.rb', line 41

def write_message_data(message_data, stream_name, expected_version: nil)
  logger.trace(tag: :write) { "Writing message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" }
  logger.trace(tags: [:data, :message_data]) { message_data.pretty_inspect }

  put.(message_data, stream_name, expected_version: expected_version).tap do
    logger.debug(tag: :write) { "Wrote message data (Stream Name: #{stream_name}, Type: #{message_data.type}, Expected Version: #{expected_version.inspect})" }
    logger.debug(tags: [:data, :message_data]) { message_data.pretty_inspect }
  end
end