Class: ActiveRecordSpannerAdapter::Transaction
- Inherits:
-
Object
- Object
- ActiveRecordSpannerAdapter::Transaction
- Defined in:
- lib/activerecord_spanner_adapter/transaction.rb
Instance Attribute Summary collapse
-
#begin_transaction_selector ⇒ Object
readonly
Returns the value of attribute begin_transaction_selector.
-
#commit_options ⇒ Object
readonly
Returns the value of attribute commit_options.
-
#exclude_txn_from_change_streams ⇒ Object
Returns the value of attribute exclude_txn_from_change_streams.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Instance Method Summary collapse
- #_transaction_isolation_level_to_grpc(isolation) ⇒ Object
- #active? ⇒ Boolean
-
#begin ⇒ Object
Begins the transaction.
- #buffer(mutation) ⇒ Object
- #commit ⇒ Object
-
#force_begin_read_write ⇒ Object
Forces a BeginTransaction RPC for a read/write transaction.
-
#grpc_transaction=(grpc) ⇒ Object
Sets the underlying gRPC transaction to use for this Transaction.
- #grpc_transaction? ⇒ Boolean
-
#initialize(connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false) ⇒ Transaction
constructor
A new instance of Transaction.
- #isolation ⇒ Object
- #mark_aborted ⇒ Object
- #next_sequence_number ⇒ Object
- #rollback ⇒ Object
-
#set_commit_options(options) ⇒ Object
Sets the commit options for this transaction.
- #shoot_and_forget_rollback ⇒ Object
- #transaction_selector ⇒ Object
Constructor Details
#initialize(connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false) ⇒ Transaction
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 16
# Creates a transaction in the :INITIALIZED state. Nothing is sent to the
# backend here; the arguments are only stored.
#
# @param connection the connection this transaction will run on
# @param isolation [Symbol, Hash, nil] :read_only, :pdml, a Hash, or any
#   other value; only the first three make the transaction non-committable
# @param commit_options options stored for the commit call (may be nil)
# @param exclude_txn_from_change_streams [Boolean] flag stored as-is
def initialize connection, isolation, commit_options = nil, exclude_txn_from_change_streams: false
  @connection = connection
  @isolation = isolation
  # Read-only, PDML and Hash-configured transactions are never committed.
  @committable = ![:read_only, :pdml].include?(isolation) && !isolation.is_a?(Hash)
  @state = :INITIALIZED
  @sequence_number = 0
  @mutations = []
  @commit_options = commit_options
  @exclude_txn_from_change_streams = exclude_txn_from_change_streams
end
Instance Attribute Details
#begin_transaction_selector ⇒ Object (readonly)
Returns the value of attribute begin_transaction_selector.
11 12 13 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 11
# Reader for the value held in @begin_transaction_selector.
#
# @return [Object, nil] the stored selector, or nil when none has been set
def begin_transaction_selector
  @begin_transaction_selector
end
#commit_options ⇒ Object (readonly)
Returns the value of attribute commit_options.
10 11 12 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 10
# Reader for the value held in @commit_options.
#
# @return [Object, nil] the stored commit options, or nil when none are set
def commit_options
  @commit_options
end
#exclude_txn_from_change_streams ⇒ Object
Returns the value of attribute exclude_txn_from_change_streams.
12 13 14 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 12
# Reader for the value held in @exclude_txn_from_change_streams.
#
# @return [Object] the stored flag
def exclude_txn_from_change_streams
  @exclude_txn_from_change_streams
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
9 10 11 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 9
# Reader for the current lifecycle marker held in @state.
#
# @return [Symbol, nil] the stored state
def state
  @state
end
Instance Method Details
#_transaction_isolation_level_to_grpc(isolation) ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 87
# Translates an isolation symbol into the matching gRPC IsolationLevel constant.
#
# @param isolation [Symbol, Object] :serializable or :repeatable_read are
#   recognised; anything else is not mapped
# @return [Object, nil] the IsolationLevel constant, or nil for any other value
def _transaction_isolation_level_to_grpc isolation
  if :serializable == isolation
    Google::Cloud::Spanner::V1::TransactionOptions::IsolationLevel::SERIALIZABLE
  elsif :repeatable_read == isolation
    Google::Cloud::Spanner::V1::TransactionOptions::IsolationLevel::REPEATABLE_READ
  end
end
#active? ⇒ Boolean
27 28 29 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 27
# Tells whether this transaction is currently in the :STARTED state.
#
# @return [Boolean] true only while @state is :STARTED
def active?
  :STARTED == @state
end
#begin ⇒ Object
Begins the transaction.
Read-only and PDML transactions are started by executing a BeginTransaction RPC. Read/write transactions are not really started by this method, and instead a transaction selector is prepared that will be included with the first statement on the transaction.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 46
# Begins the transaction.
#
# Depending on @isolation this either asks the session for a snapshot or a
# PDML transaction, or (for every other isolation value) only prepares a
# TransactionSelector in @begin_transaction_selector without calling the
# session.
#
# When the session call fails with Google::Cloud::NotFoundError and the
# connection reports the session as missing, the connection is reset and the
# attempt is repeated. The number of resets is capped so that a persistently
# failing backend cannot keep this method looping forever.
#
# @return [Symbol] :STARTED on success
# @raise [RuntimeError] if the state is not :INITIALIZED, or if a Hash
#   isolation has none of :timestamp, :staleness or :strong set
# @raise [StandardError] any error raised while starting; @state is set to
#   :FAILED before it is re-raised
def begin
  raise "Nested transactions are not allowed" if @state != :INITIALIZED

  # Upper bound on "session not found" recoveries for a single begin call.
  max_session_resets = 3
  session_resets = 0
  begin
    case @isolation
    when Hash
      if @isolation[:timestamp]
        @grpc_transaction = @connection.session.create_snapshot timestamp: @isolation[:timestamp]
      elsif @isolation[:staleness]
        @grpc_transaction = @connection.session.create_snapshot staleness: @isolation[:staleness]
      elsif @isolation[:strong]
        @grpc_transaction = @connection.session.create_snapshot strong: true
      else
        raise "Invalid snapshot argument: #{@isolation}"
      end
    when :read_only
      @grpc_transaction = @connection.session.create_snapshot strong: true
    when :pdml
      @grpc_transaction = @connection.session.create_pdml
    else
      # No session call here: only the selector is built and stored.
      grpc_isolation = _transaction_isolation_level_to_grpc @isolation
      @begin_transaction_selector = Google::Cloud::Spanner::V1::TransactionSelector.new(
        begin: Google::Cloud::Spanner::V1::TransactionOptions.new(
          read_write: Google::Cloud::Spanner::V1::TransactionOptions::ReadWrite.new,
          isolation_level: grpc_isolation,
          exclude_txn_from_change_streams: @exclude_txn_from_change_streams
        )
      )
    end
    @state = :STARTED
  rescue Google::Cloud::NotFoundError => e
    if session_resets < max_session_resets && @connection.session_not_found?(e)
      session_resets += 1
      @connection.reset!
      retry
    end
    @state = :FAILED
    raise
  rescue StandardError
    @state = :FAILED
    raise
  end
end
#buffer(mutation) ⇒ Object
36 37 38 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 36
# Appends a mutation to the list kept in @mutations.
#
# @param mutation the mutation to queue
# @return [Array] the @mutations list, including the new entry
def buffer mutation
  @mutations.push mutation
end
#commit ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 112
# Commits the transaction.
#
# For a committable transaction that has an underlying gRPC transaction, the
# buffered mutations and the stored commit options are handed to the
# session's commit_transaction. A committable transaction that only holds
# mutations first obtains a gRPC transaction through force_begin_read_write.
# Non-committable transactions skip the session call entirely.
#
# @return [Symbol] :COMMITTED on success
# @raise [RuntimeError] if the transaction is not active
# @raise [StandardError] any error raised while committing; @state is set to
#   :FAILED before it is re-raised
def commit
  raise "This transaction is not active" unless active?

  begin
    # Start a transaction with an explicit BeginTransaction RPC if the transaction only contains mutations.
    force_begin_read_write if @committable && !@mutations.empty? && !@grpc_transaction
    if @committable && @grpc_transaction
      @connection.session.commit_transaction @grpc_transaction, @mutations, commit_options: commit_options
    end
    @state = :COMMITTED
  rescue Google::Cloud::NotFoundError => e
    if @connection.session_not_found? e
      # The session is gone: best-effort rollback, reset the connection and
      # let the connection signal an abort.
      # NOTE(review): raise_aborted_err presumably raises; if it returns, the
      # original error is re-raised below - confirm.
      shoot_and_forget_rollback
      @connection.reset!
      @connection.raise_aborted_err
    end
    @state = :FAILED
    raise
  rescue StandardError
    @state = :FAILED
    raise
  end
end
#force_begin_read_write ⇒ Object
Forces a BeginTransaction RPC for a read/write transaction. This is used by a connection if the first statement of a transaction failed.
98 99 100 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 98
# Asks the connection's session to create a transaction and stores the result
# as this object's underlying gRPC transaction.
#
# @return [Object] the transaction returned by the session
def force_begin_read_write
  session = @connection.session
  @grpc_transaction = session.create_transaction
end
#grpc_transaction=(grpc) ⇒ Object
Sets the underlying gRPC transaction to use for this Transaction. This is used for queries/DML statements that inlined the BeginTransaction option and returned a transaction in the metadata.
164 165 166 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 164
# Wraps a raw gRPC transaction, together with the connection's session, via
# Google::Cloud::Spanner::Transaction.from_grpc and stores the wrapper as the
# underlying transaction.
#
# @param grpc the raw gRPC transaction to wrap
def grpc_transaction= grpc
  session = @connection.session
  @grpc_transaction = Google::Cloud::Spanner::Transaction.from_grpc(grpc, session)
end
#grpc_transaction? ⇒ Boolean
168 169 170 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 168
# Reports whether an underlying gRPC transaction is present.
#
# Note that the result is the stored transaction itself (truthy) rather than
# a strict true/false value.
#
# @return [Object, nil] the stored transaction, or nil when there is none
def grpc_transaction?
  @grpc_transaction || nil
end
#isolation ⇒ Object
31 32 33 34 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 31
# Exposes the configured isolation, but only while the transaction is active.
#
# @return [Object, nil] @isolation when active?, otherwise nil
def isolation
  active? ? @isolation : nil
end
#mark_aborted ⇒ Object
157 158 159 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 157
# Flags this transaction as aborted by setting @state to :ABORTED.
#
# @return [Symbol] :ABORTED
def mark_aborted
  @state = :ABORTED
end
#next_sequence_number ⇒ Object
102 103 104 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 102
# Advances and returns the sequence counter for committable transactions.
#
# @return [Integer, nil] the incremented counter, or nil (counter untouched)
#   when the transaction is not committable
def next_sequence_number
  return unless @committable

  @sequence_number += 1
end
#rollback ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 138
# Rolls the transaction back and marks it :ROLLED_BACK.
#
# @return [Symbol] :ROLLED_BACK
# @raise [RuntimeError] unless the transaction is active, :FAILED or :ABORTED
def rollback
  # Rolling back is also permitted after an abort and/or a failed commit.
  rollback_allowed = active? || %i[FAILED ABORTED].include?(@state)
  raise "This transaction is not active" unless rollback_allowed

  # The rollback is shoot-and-forget: whatever triggered it may already have
  # invalidated the transaction (e.g. `Session not found`). A rollback that
  # fails for another reason is neither retried nor reported to the
  # application, because Cloud Spanner aborts the transaction by itself after
  # 10 seconds anyway.
  shoot_and_forget_rollback if active? && @grpc_transaction

  @state = :ROLLED_BACK
end
#set_commit_options(options) ⇒ Object
Sets the commit options for this transaction. This is used to set the options for the commit RPC, such as return_commit_stats and max_commit_delay.
108 109 110 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 108
# Stores the options to use when this transaction is committed.
#
# A shallow copy is kept, so replacing keys on the caller's object afterwards
# does not affect the stored options.
#
# @param options [Object, nil] the commit options; nil clears them
# @return [Object, nil] the stored copy
def set_commit_options options # rubocop:disable Naming/AccessorMethodName
  @commit_options = options&.dup
end
#shoot_and_forget_rollback ⇒ Object
151 152 153 154 155 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 151
# Best-effort rollback: asks the session to roll back the underlying
# transaction and deliberately swallows any StandardError raised on the way.
# Does nothing for non-committable transactions.
#
# @return [Object, nil] the session's rollback result, or nil when skipped or
#   when an error was swallowed
def shoot_and_forget_rollback
  return unless @committable

  begin
    session = @connection.session
    session.rollback @grpc_transaction.transaction_id
  rescue StandardError
    # Ignored on purpose.
    nil
  end
end
#transaction_selector ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/activerecord_spanner_adapter/transaction.rb', line 172
# Builds the selector that statements should use to run on this transaction.
#
# @return [Object, nil] nil when the transaction is not active; a selector
#   carrying the id of the underlying transaction when one exists; otherwise
#   the prepared @begin_transaction_selector
def transaction_selector
  if !active?
    nil
  elsif @grpc_transaction
    # A transaction started by a BeginTransaction RPC, or returned by an
    # earlier statement, takes precedence.
    Google::Cloud::Spanner::V1::TransactionSelector.new id: @grpc_transaction.transaction_id
  else
    # This selector tells the statement to start a transaction as well and to
    # return its id as a side effect.
    @begin_transaction_selector
  end
end