Class: ActiveRecordSpannerAdapter::Transaction

Inherits:
Object
  • Object
show all
Defined in:
lib/activerecord_spanner_adapter/transaction.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false) ⇒ Transaction



16
17
18
19
20
21
22
23
24
25
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 16

# Builds a transaction wrapper in the :INITIALIZED state. Nothing is started until #begin is called.
#
# @param connection the connection whose `session` is later used to create, commit and roll back
#   the underlying transaction
# @param isolation [Symbol, Hash, nil] :read_only, :pdml or any Hash make the transaction
#   non-committable; every other value makes it committable
# @param commit_options options kept for the commit call (see #commit)
# @param exclude_txn_from_change_streams [Boolean] kept and copied into the transaction options
#   that #begin prepares
def initialize(connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false)
  # Values supplied by the caller.
  @connection = connection
  @isolation = isolation
  @commit_options = commit_options
  @exclude_txn_from_change_streams = exclude_txn_from_change_streams

  # Fresh bookkeeping for a transaction that has not begun yet.
  @state = :INITIALIZED
  @sequence_number = 0
  @mutations = []

  # Snapshot (Hash), read-only and PDML transactions are never committed.
  never_committed = %i[read_only pdml].include?(isolation) || isolation.is_a?(Hash)
  @committable = !never_committed
end

Instance Attribute Details

#begin_transaction_selector ⇒ Object (readonly)

Returns the value of attribute begin_transaction_selector.



11
12
13
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 11

# Returns the transaction selector that #begin prepares for transactions that are neither
# snapshot (Hash isolation), :read_only nor :pdml. It is nil until #begin has built it.
def begin_transaction_selector
  @begin_transaction_selector
end

#commit_options ⇒ Object (readonly)

Returns the value of attribute commit_options.



10
11
12
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 10

# Returns the commit options given to the constructor or last stored through
# #set_commit_options. #commit passes this value on as the `commit_options:` argument.
def commit_options
  @commit_options
end

#exclude_txn_from_change_streams ⇒ Object

Returns the value of attribute exclude_txn_from_change_streams.



12
13
14
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 12

# Returns the flag supplied to the constructor (false by default). #begin copies it into the
# transaction options of the selector it prepares.
def exclude_txn_from_change_streams
  @exclude_txn_from_change_streams
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



9
10
11
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 9

# Returns the current lifecycle state as a Symbol. The values assigned in this class are
# :INITIALIZED, :STARTED, :COMMITTED, :ROLLED_BACK, :ABORTED and :FAILED.
def state
  @state
end

Instance Method Details

#_transaction_isolation_level_to_grpc(isolation) ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 87

# Translates an isolation symbol into the matching gRPC IsolationLevel constant.
#
# @param isolation the isolation value of the transaction
# @return the SERIALIZABLE constant for :serializable, the REPEATABLE_READ constant for
#   :repeatable_read, and nil for every other value
def _transaction_isolation_level_to_grpc(isolation)
  # The constants are only resolved in the branch that needs them.
  if :serializable === isolation
    Google::Cloud::Spanner::V1::TransactionOptions::IsolationLevel::SERIALIZABLE
  elsif :repeatable_read === isolation
    Google::Cloud::Spanner::V1::TransactionOptions::IsolationLevel::REPEATABLE_READ
  end
end

#active? ⇒ Boolean



27
28
29
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 27

# Tells whether the transaction is currently in the :STARTED state, i.e. #begin has completed
# and no later step has moved the state elsewhere.
#
# @return [Boolean]
def active?
  @state == :STARTED
end

#begin ⇒ Object

Begins the transaction.

Read-only and PDML transactions are started by executing a BeginTransaction RPC. Read/write transactions are not really started by this method, and instead a transaction selector is prepared that will be included with the first statement on the transaction.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 46

# Begins the transaction and moves it to the :STARTED state.
#
# Snapshot (Hash isolation), :read_only and :pdml transactions are created right away through the
# connection's session. For every other isolation value no transaction is created here; a
# transaction selector carrying the `begin` options is stored in @begin_transaction_selector
# instead.
#
# When creating the transaction fails because the connection reports the session as not found,
# the connection is reset and the attempt is repeated. The number of such resets is bounded so
# that a connection that keeps handing out invalid sessions cannot make this method loop forever;
# once the bound is reached the error is re-raised like any other failure.
#
# @raise [RuntimeError] if the transaction is not in the :INITIALIZED state, or if a Hash
#   isolation contains none of :timestamp, :staleness or :strong
# @raise [Google::Cloud::NotFoundError] if the error is not a session-not-found error, or if it
#   still occurs after the allowed number of connection resets
# @raise [StandardError] any other error raised while starting; the state is :FAILED afterwards
def begin
  raise "Nested transactions are not allowed" if @state != :INITIALIZED

  max_session_resets = 3
  session_resets = 0
  begin
    case @isolation
    when Hash
      if @isolation[:timestamp]
        @grpc_transaction = @connection.session.create_snapshot timestamp: @isolation[:timestamp]
      elsif @isolation[:staleness]
        @grpc_transaction = @connection.session.create_snapshot staleness: @isolation[:staleness]
      elsif @isolation[:strong]
        @grpc_transaction = @connection.session.create_snapshot strong: true
      else
        raise "Invalid snapshot argument: #{@isolation}"
      end
    when :read_only
      @grpc_transaction = @connection.session.create_snapshot strong: true
    when :pdml
      @grpc_transaction = @connection.session.create_pdml
    else
      grpc_isolation = _transaction_isolation_level_to_grpc @isolation
      @begin_transaction_selector = Google::Cloud::Spanner::V1::TransactionSelector.new \
        begin: Google::Cloud::Spanner::V1::TransactionOptions.new(
          read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new,
          isolation_level: grpc_isolation,
          exclude_txn_from_change_streams: @exclude_txn_from_change_streams
        )
    end
    @state = :STARTED
  rescue Google::Cloud::NotFoundError => e
    # `retry` re-runs the whole begin block, so the counter lives outside of it.
    if session_resets < max_session_resets && @connection.session_not_found?(e)
      session_resets += 1
      @connection.reset!
      retry
    end
    @state = :FAILED
    raise
  rescue StandardError
    @state = :FAILED
    raise
  end
end

#buffer(mutation) ⇒ Object



36
37
38
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 36

# Queues a mutation so that it is handed to the commit call when #commit runs.
#
# @param mutation the mutation to append to the buffered list
# @return [Array] the list of buffered mutations, including the new one
def buffer(mutation)
  @mutations.push mutation
end

#commit ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 112

# Commits the transaction and moves it to the :COMMITTED state.
#
# Nothing is sent for a non-committable transaction, nor for a committable one that has neither
# an underlying transaction nor buffered mutations; the state still becomes :COMMITTED.
#
# Error handling: a NotFoundError that the connection reports as session-not-found triggers a
# best-effort rollback and a connection reset, after which the connection's `raise_aborted_err`
# is invoked. Any other error sets the state to :FAILED and is re-raised.
#
# @raise [RuntimeError] if the transaction is not active
def commit
  raise "This transaction is not active" unless active?

  begin
    if @committable
      # A transaction that only buffered mutations has no transaction id yet, so one is
      # started explicitly before committing.
      force_begin_read_write unless @grpc_transaction || @mutations.empty?

      if @grpc_transaction
        @connection.session.commit_transaction(@grpc_transaction, @mutations, commit_options: commit_options)
      end
    end
    @state = :COMMITTED
  rescue Google::Cloud::NotFoundError => error
    if @connection.session_not_found?(error)
      shoot_and_forget_rollback
      @connection.reset!
      @connection.raise_aborted_err
    end
    @state = :FAILED
    raise
  rescue StandardError
    @state = :FAILED
    raise
  end
end

#force_begin_read_write ⇒ Object

Forces a BeginTransaction RPC for a read/write transaction. This is used by a connection if the first statement of a transaction failed, and by #commit when the transaction only contains buffered mutations.



98
99
100
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 98

# Creates a transaction on the connection's session right away and stores it as the underlying
# transaction, instead of waiting for a statement to start one.
#
# @return the transaction created by the session
def force_begin_read_write
  session = @connection.session
  @grpc_transaction = session.create_transaction
end

#grpc_transaction=(grpc) ⇒ Object

Sets the underlying gRPC transaction to use for this Transaction. This is used for queries/DML statements that inlined the BeginTransaction option and returned a transaction in the metadata.



164
165
166
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 164

# Stores the underlying transaction for this Transaction. The raw gRPC object is first wrapped
# via Google::Cloud::Spanner::Transaction.from_grpc together with the connection's session.
#
# @param grpc the gRPC transaction to wrap
def grpc_transaction=(grpc)
  wrapped = Google::Cloud::Spanner::Transaction.from_grpc(grpc, @connection.session)
  @grpc_transaction = wrapped
end

#grpc_transaction? ⇒ Boolean



168
169
170
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 168

# Tells whether an underlying transaction has been set.
#
# @return the underlying transaction itself when one is present (truthy), otherwise nil; the
#   value is not coerced to a strict true/false
def grpc_transaction?
  @grpc_transaction || nil
end

#isolation ⇒ Object



31
32
33
34
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 31

# Returns the isolation value this transaction was created with, but only while the transaction
# is active; nil is returned in every other state.
def isolation
  active? ? @isolation : nil
end

#mark_aborted ⇒ Object



157
158
159
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 157

# Flags this transaction as aborted by setting its state to :ABORTED. The transaction is no
# longer active afterwards, but #rollback still accepts it in this state.
def mark_aborted
  @state = :ABORTED
end

#next_sequence_number ⇒ Object



102
103
104
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 102

# Advances the statement sequence counter of a committable transaction.
#
# @return [Integer, nil] the incremented sequence number, or nil (with the counter untouched)
#   when the transaction is not committable
def next_sequence_number
  return unless @committable

  @sequence_number = @sequence_number + 1
end

#rollback ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 138

# Rolls the transaction back and moves it to the :ROLLED_BACK state.
#
# Rolling back is accepted while the transaction is active and also after an abort or a failed
# commit (:ABORTED / :FAILED).
#
# @raise [RuntimeError] if the transaction is in any other state
def rollback
  rollback_allowed = active? || @state == :FAILED || @state == :ABORTED
  raise "This transaction is not active" unless rollback_allowed

  # The rollback is deliberately best-effort: the error that led here may already have
  # invalidated the transaction (e.g. `Session not found`). A failing rollback is neither retried
  # nor reported to the application, because Cloud Spanner aborts the transaction by itself after
  # 10 seconds anyway.
  shoot_and_forget_rollback if active? && @grpc_transaction

  @state = :ROLLED_BACK
end

#set_commit_options(options) ⇒ Object

Sets the commit options for this transaction. This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay.



108
109
110
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 108

# Replaces the commit options of this transaction. A copy of the given object is stored, so
# later changes to the caller's object do not affect the transaction. Passing nil clears the
# options.
#
# @param options the options to use for the commit call, or nil
def set_commit_options(options) # rubocop:disable Naming/AccessorMethodName
  @commit_options = options.nil? ? nil : options.dup
end

#shoot_and_forget_rollback ⇒ Object



151
152
153
154
155
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 151

# Sends a best-effort rollback for the underlying transaction of a committable transaction.
# Every StandardError raised along the way is swallowed on purpose, including the one caused by
# a missing underlying transaction.
#
# @return the result of the session's rollback call, or nil when nothing was sent or an error
#   was swallowed
def shoot_and_forget_rollback
  return unless @committable

  session = @connection.session
  session.rollback @grpc_transaction.transaction_id
rescue StandardError
  # Deliberately ignored: the rollback is fire-and-forget.
  nil
end

#transaction_selector ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 172

# Returns the transaction selector that statements on this transaction should use.
#
# @return nil when the transaction is not active; a selector carrying the id of the underlying
#   transaction when one exists; otherwise the selector prepared by #begin
def transaction_selector
  return unless active?

  # No transaction id is known yet: hand out the selector that asks the statement to start the
  # transaction and return its id as a side effect.
  return @begin_transaction_selector unless @grpc_transaction

  # A transaction was started by a BeginTransaction RPC or returned by an earlier statement.
  Google::Cloud::Spanner::V1::TransactionSelector.new id: @grpc_transaction.transaction_id
end