Module: ActiveRecord::ConnectionAdapters::Spanner::DatabaseStatements

Included in:
ActiveRecord::ConnectionAdapters::SpannerAdapter
Defined in:
lib/active_record/connection_adapters/spanner/database_statements.rb

Constant Summary collapse

VERSION_7_1_0 =
Gem::Version.create "7.1.0"
RequestOptions =
Google::Cloud::Spanner::V1::RequestOptions
TransactionMutationLimitExceededError =
Google::Cloud::Spanner::Errors::TransactionMutationLimitExceededError
COMMENT_REGEX =
ActiveRecord::ConnectionAdapters::AbstractAdapter::COMMENT_REGEX

Instance Method Summary collapse

Instance Method Details

#_has_pk_binding(pk, binds) ⇒ Object



182
183
184
185
186
187
188
189
190
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 182

def _has_pk_binding pk, binds
  if pk.respond_to? :each
    has_value = true
    pk.each { |col| has_value &&= binds.any? { |bind| bind.name == col } }
    has_value
  else
    binds.any? { |bind| bind.name == pk }
  end
end

#append_request_tag_from_query_logs(sql, binds) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 99

def append_request_tag_from_query_logs sql, binds
  possible_prefixes = [
    "/*request_tag:true,",
    "/*_request_tag='true',",
    "/*_request_tag:true,",
    "/*_request_tag='true',"
  ]
  possible_prefixes.each do |prefix|
    if sql.start_with? prefix
      append_request_tag_from_query_logs_with_format sql, binds, prefix
    end
  end
end

#append_request_tag_from_query_logs_with_format(sql, binds, prefix) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 113

def append_request_tag_from_query_logs_with_format sql, binds, prefix
  end_of_comment = sql.index "*/", prefix.length
  return unless end_of_comment

  request_tag = sql[prefix.length, end_of_comment - prefix.length]
  options = binds.find { |bind| bind.is_a? RequestOptions } || RequestOptions.new
  if options.request_tag == ""
    options.request_tag = request_tag
  else
    options.request_tag += ",#{request_tag}"
  end

  binds.append options
end

#begin_db_transactionObject



313
314
315
316
317
318
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 313

def begin_db_transaction
  log "BEGIN" do
    opts = @_spanner_begin_transaction_options || {}
    @connection.begin_transaction nil, **opts
  end
end

#begin_isolated_db_transaction(isolation) ⇒ Object

Begins a transaction on the database with the specified isolation level. Cloud Spanner only supports isolation level :serializable, but also defines three additional 'isolation levels' that can be used to start specific types of Spanner transactions:

  • :read_only: Starts a read-only snapshot transaction using a strong timestamp bound.
  • :buffered_mutations: Starts a read/write transaction that will use mutations instead of DML for single-row inserts/updates/deletes. Mutations are buffered locally until the transaction is committed, and any changes during a transaction cannot be read by the application.
  • :pdml: Starts a Partitioned DML transaction. Executing multiple DML statements in one PDML transaction block is NOT supported A PDML transaction is not guaranteed to be atomic. See https://cloud.google.com/spanner/docs/dml-partitioned for more information.

In addition to the above, a Hash containing read-only snapshot options may be used to start a specific read-only snapshot:

  • { timestamp: Time } Starts a read-only snapshot at the given timestamp.
  • { staleness: Integer } Starts a read-only snapshot with the given staleness in seconds.
  • { strong: } Starts a read-only snapshot with strong timestamp bound (this is the same as :read_only)


338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 338

def begin_isolated_db_transaction isolation
  opts = @_spanner_begin_transaction_options || {}
  # If isolation level is specified in the options, use that instead of the default isolation level.
  isolation_option = opts[:isolation] || isolation
  if isolation_option.is_a? Hash
    raise "Unsupported isolation level: #{isolation_option}" unless
      isolation_option[:timestamp] || isolation_option[:staleness] || isolation_option[:strong]
    raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`." \
      if isolation_option.count != 1
  else
    raise "Unsupported isolation level: #{isolation_option}" unless
      [:serializable, :repeatable_read, :read_only, :buffered_mutations, :pdml,
       :fallback_to_pdml].include? isolation_option
  end

  log "BEGIN #{isolation_option}" do
    @connection.begin_transaction isolation_option, **opts.except(:isolation)
  end
end

#commit_db_transactionObject



358
359
360
361
362
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 358

def commit_db_transaction
  log "COMMIT" do
    @connection.commit_transaction
  end
end

#exec_mutation(mutation) ⇒ Object



192
193
194
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 192

def exec_mutation mutation
  @connection.current_transaction.buffer mutation
end

#exec_query(sql, name = "SQL", binds = [], prepare: false) ⇒ Object

rubocop:disable Lint/UnusedMethodArgument



157
158
159
160
161
162
163
164
165
166
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 157

def exec_query sql, name = "SQL", binds = [], prepare: false # rubocop:disable Lint/UnusedMethodArgument
  result = execute sql, name, binds
  if result.respond_to? :fields
    ActiveRecord::Result.new(
      result.fields.keys.map(&:to_s), result.rows.map(&:values)
    )
  else
    ActiveRecord::Result.new [], []
  end
end

#exec_update(sql, name = "SQL", binds = []) ⇒ Object Also known as: exec_delete

Raises:

  • (ActiveRecord::StatementInvalid)


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 210

def exec_update sql, name = "SQL", binds = []
  # Check if a DML batch is active on the connection.
  if @connection.dml_batch?
    # This call buffers the SQL.
    execute sql, name, binds
    return
  end
  result = execute sql, name, binds
  # Make sure that we consume the entire result stream before trying to get the stats.
  # This is required because the ExecuteStreamingSql RPC is also used for (Partitioned) DML,
  # and this RPC can return multiple partial result sets for DML as well. Only the last partial
  # result set will contain the statistics. Although there will never be any rows, this makes
  # sure that the stream is fully consumed.
  result.rows.each { |_| } # rubocop:disable Lint/EmptyBlock
  return result.row_count if result.row_count

  raise ActiveRecord::StatementInvalid.new(
    "DML statement is invalid.", sql: sql
  )
end

#execute(sql, name = nil, binds = []) ⇒ Object

DDL, DML and DQL Statements



22
23
24
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 22

def execute sql, name = nil, binds = []
  internal_execute sql, name, binds
end

#execute_ddl(statements) ⇒ Object



244
245
246
247
248
249
250
251
252
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 244

def execute_ddl statements
  log "MIGRATION", "SCHEMA" do
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      @connection.execute_ddl statements
    end
  end
rescue Google::Cloud::Error => error
  raise ActiveRecord::StatementInvalid, error
end

#execute_query_or_dml(statement_type, sql, name, binds) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 57

def execute_query_or_dml statement_type, sql, name, binds
  transaction_required = statement_type == :dml
  materialize_transactions

  # First process and remove any hints in the binds that indicate that
  # a different read staleness should be used than the default.
  staleness_hint = binds.find { |b| b.is_a? Arel::Visitors::StalenessHint }
  if staleness_hint
    selector = Google::Cloud::Spanner::Session.single_use_transaction staleness_hint.value
    binds.delete staleness_hint
  end
  request_options = binds.find { |b| b.is_a? RequestOptions }
  if request_options
    binds.delete request_options
  end

  log_args = [sql, name]
  log_args.push binds, type_casted_binds(binds) if log_statement_binds

  log(*log_args) do
    types, params = to_types_and_params binds
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      if transaction_required
        transaction do
          @connection.execute_query sql,
                                    params: params,
                                    types: types,
                                    request_options: request_options,
                                    statement_type: statement_type
        end
      else
        @connection.execute_query sql,
                                  params: params,
                                  types: types,
                                  single_use_selector: selector,
                                  request_options: request_options,
                                  statement_type: statement_type
      end
    end
  end
end

#internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) ⇒ Object



26
27
28
29
30
31
32
33
34
35
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 26

def internal_exec_query sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false
  result = internal_execute sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry
  if result
    ActiveRecord::Result.new(
      result.fields.keys.map(&:to_s), result.rows.map(&:values)
    )
  else
    ActiveRecord::Result.new [], []
  end
end

#internal_execute(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 37

def internal_execute sql, name = "SQL", binds = [],
                     prepare: false, async: false, allow_retry: false # rubocop:disable Lint/UnusedMethodArgument
  statement_type = sql_statement_type sql
  # Call `transform` to invoke any query transformers that might have been registered.
  sql = transform sql
  append_request_tag_from_query_logs sql, binds

  if preventing_writes? && [:dml, :ddl].include?(statement_type)
    raise ActiveRecord::ReadOnlyError(
      "Write query attempted while in readonly mode: #{sql}"
    )
  end

  if statement_type == :ddl
    execute_ddl sql
  else
    execute_query_or_dml statement_type, sql, name, binds
  end
end

#query(sql, name = nil) ⇒ Object

ActiveRecord.gem_version < VERSION_7_1_0



153
154
155
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 153

def query sql, name = nil
  exec_query sql, name
end

#rollback_db_transactionObject



364
365
366
367
368
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 364

def rollback_db_transaction
  log "ROLLBACK" do
    @connection.rollback_transaction
  end
end

#sql_for_insert(sql, pk, binds) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 131

def sql_for_insert sql, pk, binds, returning
  if pk && !_has_pk_binding(pk, binds)
    # Add the primary key to the columns that should be returned if there is no value specified for it.
    returning ||= []
    returning |= if pk.respond_to? :each
                   pk
                 else
                   [pk]
                 end
  end
  if returning&.any?
    returning_columns_statement = returning.map { |c| quote_column_name c }.join(", ")
    sql = "#{sql} THEN RETURN #{returning_columns_statement}"
  end

  [sql, binds]
end

#transaction(requires_new: nil, isolation: nil, joinable: true, **kwargs, &block) ⇒ Object

Transaction



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 256

def transaction requires_new: nil, isolation: nil, joinable: true, **kwargs, &block # rubocop:disable Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity
  commit_options = kwargs.delete :commit_options
  exclude_from_streams = kwargs.delete :exclude_txn_from_change_streams
  @_spanner_begin_transaction_options = {
    exclude_txn_from_change_streams: exclude_from_streams
  }
  if !requires_new && current_transaction.joinable?
    return super
  end

  backoff = 0.2
  begin
    super do
      # Once the transaction has been started by `super`, apply your custom options
      # to the Spanner transaction object.
      if commit_options && @connection.current_transaction
        @connection.current_transaction.set_commit_options commit_options
      end

      yield
    end
  rescue ActiveRecord::StatementInvalid => err
    if err.cause.is_a? Google::Cloud::AbortedError
      sleep(delay_from_aborted(err) || (backoff *= 1.3))
      retry
    elsif TransactionMutationLimitExceededError.is_mutation_limit_error? err.cause
      is_fallback_enabled = isolation == :fallback_to_pdml
      raise unless is_fallback_enabled
      @_spanner_begin_transaction_options[:isolation] = :pdml
      retry
    else
      raise
    end
  rescue Google::Cloud::AbortedError => err
    sleep(delay_from_aborted(err) || backoff *= 1.3)
    retry
  ensure
    # Clean up the instance variable to avoid leaking options.
    @_spanner_begin_transaction_options = nil
  end
end

#transaction_isolation_levelsObject



298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 298

def transaction_isolation_levels
  {
    read_uncommitted:   "READ UNCOMMITTED",
    read_committed:     "READ COMMITTED",
    repeatable_read:    "REPEATABLE READ",
    serializable:       "SERIALIZABLE",

    # These are not really isolation levels, but it is the only (best) way to pass in additional
    # transaction options to the connection.
    read_only:          "READ_ONLY",
    buffered_mutations: "BUFFERED_MUTATIONS",
    fallback_to_pdml: "FALLBACK_TO_PDML"
  }
end

#truncate(table_name, name = nil) ⇒ Object



232
233
234
235
236
237
238
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 232

def truncate table_name, name = nil
  Array(table_name).each do |t|
    log "TRUNCATE #{t}", name do
      @connection.truncate t
    end
  end
end

#update(arel, name = nil, binds = []) ⇒ Object Also known as: delete



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 196

def update arel, name = nil, binds = []
  # Add a `WHERE TRUE` if it is an update_all or delete_all call that uses DML.
  if !should_use_mutation(arel) && arel.respond_to?(:ast) && arel.ast.wheres.empty?
    arel.ast.wheres << Arel::Nodes::SqlLiteral.new("TRUE")
  end
  return super unless should_use_mutation arel

  raise "Unsupported update for use with mutations: #{arel}" unless arel.is_a? Arel::DeleteManager

  exec_mutation create_delete_all_mutation arel if arel.is_a? Arel::DeleteManager
  0 # Affected rows (unknown)
end

#write_query?(sql) ⇒ Boolean

Returns:

  • (Boolean)


240
241
242
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 240

def write_query? sql
  sql_statement_type(sql) == :dml
end