Module: ActiveRecord::ConnectionAdapters::Spanner::DatabaseStatements

Included in:
ActiveRecord::ConnectionAdapters::SpannerAdapter
Defined in:
lib/active_record/connection_adapters/spanner/database_statements.rb

Constant Summary collapse

VERSION_7_1_0 =
Gem::Version.create "7.1.0"
RequestOptions =
Google::Cloud::Spanner::V1::RequestOptions
COMMENT_REGEX =
ActiveRecord::ConnectionAdapters::AbstractAdapter::COMMENT_REGEX

Instance Method Summary collapse

Instance Method Details

#_has_pk_binding(pk, binds) ⇒ Object



164
165
166
167
168
169
170
171
172
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 164

# Tells whether +binds+ carries a value for the primary key +pk+.
#
# A +pk+ that responds to +each+ is treated as a list of column names, and every one of them
# must match the +name+ of at least one bind. Any other +pk+ is compared against the bind
# names directly.
def _has_pk_binding pk, binds
  bound = ->(column) { binds.any? { |bind| bind.name == column } }
  return bound.call(pk) unless pk.respond_to? :each

  # Stop at the first key column without a bind; an empty key list counts as bound.
  pk.each { |column| return false unless bound.call(column) }
  true
end

#append_request_tag_from_query_logs(sql, binds) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 85

# Inspects the start of +sql+ for a query-log comment that carries a request tag and, when one
# is found, hands the statement to +append_request_tag_from_query_logs_with_format+ together
# with the prefix that matched.
#
# Four prefixes are recognised: the key with and without a leading underscore, each in the
# `key:value` and the `key='value'` notation. Every prefix is listed exactly once, so a matching
# statement is processed a single time.
#
# @param sql [String] the statement to inspect
# @param binds [Array] the bind list that is passed on to the format-specific helper
def append_request_tag_from_query_logs sql, binds
  possible_prefixes = [
    "/*request_tag:true,",
    "/*request_tag='true',",
    "/*_request_tag:true,",
    "/*_request_tag='true',"
  ]
  possible_prefixes.each do |prefix|
    next unless sql.start_with? prefix

    append_request_tag_from_query_logs_with_format sql, binds, prefix
  end
end

#append_request_tag_from_query_logs_with_format(sql, binds, prefix) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 99

# Extracts the request tag from the leading comment of +sql+ and stores it on a +RequestOptions+
# object inside +binds+.
#
# The tag is the text between +prefix+ and the first `*/` that follows it. If +binds+ already
# contains a +RequestOptions+ instance, that instance is updated in place (the new tag is appended
# after a comma when a tag is already set). Otherwise a new instance is created and appended to
# +binds+. An instance that is already in +binds+ is not appended a second time.
#
# @param sql [String] statement that starts with +prefix+
# @param binds [Array] bind list; may be mutated
# @param prefix [String] the comment prefix that precedes the tag
# @return [Array, nil] +binds+, or nil when the comment is not terminated by `*/`
def append_request_tag_from_query_logs_with_format sql, binds, prefix
  end_of_comment = sql.index "*/", prefix.length
  return unless end_of_comment

  request_tag = sql[prefix.length...end_of_comment]
  options = binds.find { |bind| bind.is_a? RequestOptions }
  unless options
    options = RequestOptions.new
    binds.append options
  end

  if options.request_tag.empty?
    options.request_tag = request_tag
  else
    # Reassign instead of mutating in place, as the original code did.
    options.request_tag += ",#{request_tag}"
  end

  binds
end

#begin_db_transaction ⇒ Object



263
264
265
266
267
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 263

# Begins a transaction on the underlying connection; the call is wrapped in a "BEGIN" log entry.
def begin_db_transaction
  log("BEGIN") { @connection.begin_transaction }
end

#begin_isolated_db_transaction(isolation) ⇒ Object

Begins a transaction on the database with the specified isolation level. Cloud Spanner only supports isolation level :serializable, but also defines three additional 'isolation levels' that can be used to start specific types of Spanner transactions:

  • :read_only: Starts a read-only snapshot transaction using a strong timestamp bound.
  • :buffered_mutations: Starts a read/write transaction that will use mutations instead of DML for single-row inserts/updates/deletes. Mutations are buffered locally until the transaction is committed, and any changes during a transaction cannot be read by the application.
  • :pdml: Starts a Partitioned DML transaction. Executing multiple DML statements in one PDML transaction block is NOT supported. A PDML transaction is not guaranteed to be atomic. See https://cloud.google.com/spanner/docs/dml-partitioned for more information.

In addition to the above, a Hash containing read-only snapshot options may be used to start a specific read-only snapshot:

  • { timestamp: Time } Starts a read-only snapshot at the given timestamp.
  • { staleness: Integer } Starts a read-only snapshot with the given staleness in seconds.
  • { strong: true } Starts a read-only snapshot with strong timestamp bound (this is the same as :read_only). Any truthy value is accepted.


287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 287

# Validates +isolation+ and then begins a transaction with it on the underlying connection,
# wrapped in a "BEGIN <isolation>" log entry.
#
# A Hash must contain a truthy +:timestamp+, +:staleness+ or +:strong+ entry and exactly one
# entry in total. Any other value must be one of the symbols listed below. A RuntimeError is
# raised when the validation fails.
def begin_isolated_db_transaction isolation
  if isolation.is_a? Hash
    snapshot_option = isolation[:timestamp] || isolation[:staleness] || isolation[:strong]
    raise "Unsupported isolation level: #{isolation}" unless snapshot_option
    unless isolation.count == 1
      raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`."
    end
  elsif !%i[serializable repeatable_read read_only buffered_mutations pdml].include?(isolation)
    raise "Unsupported isolation level: #{isolation}"
  end

  log("BEGIN #{isolation}") { @connection.begin_transaction isolation }
end

#commit_db_transaction ⇒ Object



303
304
305
306
307
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 303

# Commits the current transaction on the underlying connection; the call is wrapped in a
# "COMMIT" log entry.
def commit_db_transaction
  log("COMMIT") { @connection.commit_transaction }
end

#exec_mutation(mutation) ⇒ Object



174
175
176
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 174

# Hands +mutation+ to the +buffer+ method of the connection's current transaction.
def exec_mutation mutation
  active_transaction = @connection.current_transaction
  active_transaction.buffer mutation
end

#exec_query(sql, name = "SQL", binds = [], prepare: false) ⇒ Object

rubocop:disable Lint/UnusedMethodArgument



143
144
145
146
147
148
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 143

# Runs +sql+ through +execute+ and wraps the outcome in an ActiveRecord::Result.
#
# Column names are the stringified keys of the result's +fields+; each row contributes its
# +values+. The +prepare+ keyword is accepted but not used.
def exec_query sql, name = "SQL", binds = [], prepare: false # rubocop:disable Lint/UnusedMethodArgument
  result_set = execute sql, name, binds
  column_names = result_set.fields.keys.map { |key| key.to_s }
  row_values = result_set.rows.map { |row| row.values }
  ActiveRecord::Result.new column_names, row_values
end

#exec_update(sql, name = "SQL", binds = []) ⇒ Object Also known as: exec_delete

Raises:

  • (ActiveRecord::StatementInvalid)


192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 192

# Executes a DML statement through +execute+ and returns the row count reported by the result.
#
# Raises ActiveRecord::StatementInvalid when the result reports no row count.
def exec_update sql, name = "SQL", binds = []
  result = execute sql, name, binds
  # Drain the result stream before asking for the stats. The ExecuteStreamingSql RPC is also
  # used for (Partitioned) DML and may return several partial result sets; only the last one
  # carries the statistics. There are never any rows, but iterating guarantees that the stream
  # has been consumed completely.
  result.rows.each { |_row| } # rubocop:disable Lint/EmptyBlock

  affected_rows = result.row_count
  unless affected_rows
    raise ActiveRecord::StatementInvalid.new(
      "DML statement is invalid.", sql: sql
    )
  end

  affected_rows
end

#execute(sql, name = nil, binds = []) ⇒ Object

DDL, DML and DQL Statements



20
21
22
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 20

# Entry point for DDL, DML and query statements: forwards all three arguments unchanged to
# +internal_execute+.
def execute sql, name = nil, binds = []
  internal_execute(sql, name, binds)
end

#execute_ddl(statements) ⇒ Object



220
221
222
223
224
225
226
227
228
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 220

# Executes +statements+ as DDL on the underlying connection.
#
# The call is logged as "MIGRATION" with the name "SCHEMA" and runs inside
# +permit_concurrent_loads+ of the ActiveSupport dependency interlock. A Google::Cloud::Error is
# re-raised as ActiveRecord::StatementInvalid.
def execute_ddl statements
  log "MIGRATION", "SCHEMA" do
    interlock = ActiveSupport::Dependencies.interlock
    interlock.permit_concurrent_loads { @connection.execute_ddl statements }
  end
rescue Google::Cloud::Error => ddl_error
  raise ActiveRecord::StatementInvalid, ddl_error
end

#execute_query_or_dml(statement_type, sql, name, binds) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 51

# Executes a query or DML statement via +@connection.execute_query+ inside a +log+ block.
#
# Statements whose +statement_type+ is +:dml+ are executed inside a +transaction+ block; all
# other statements are executed directly and may use a single-use selector derived from a
# staleness hint.
#
# NOTE: +binds+ is modified in place: a StalenessHint and a RequestOptions entry are removed
# from it before the remaining binds are converted to types and params.
def execute_query_or_dml statement_type, sql, name, binds
  transaction_required = statement_type == :dml
  materialize_transactions

  # First process and remove any hints in the binds that indicate that
  # a different read staleness should be used than the default.
  staleness_hint = binds.find { |b| b.is_a? Arel::Visitors::StalenessHint }
  if staleness_hint
    # `selector` stays nil when no staleness hint is present.
    selector = Google::Cloud::Spanner::Session.single_use_transaction staleness_hint.value
    binds.delete staleness_hint
  end
  # Request options travel in the bind list as well; take them out so that they are not
  # treated as a statement parameter below.
  request_options = binds.find { |b| b.is_a? RequestOptions }
  if request_options
    binds.delete request_options
  end

  # The binds are only added to the log arguments when `log_statement_binds` is truthy.
  log_args = [sql, name]
  log_args.push binds, type_casted_binds(binds) if log_statement_binds

  log(*log_args) do
    types, params = to_types_and_params binds
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      if transaction_required
        # DML: the single-use selector is not passed on this path.
        transaction do
          @connection.execute_query sql, params: params, types: types, request_options: request_options
        end
      else
        @connection.execute_query sql, params: params, types: types, single_use_selector: selector,
                                  request_options: request_options
      end
    end
  end
end

#internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) ⇒ Object



24
25
26
27
28
29
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 24

# Runs +sql+ through +internal_execute+ (passing every keyword option along) and wraps the
# outcome in an ActiveRecord::Result.
#
# Column names are the stringified keys of the result's +fields+; each row contributes its
# +values+.
def internal_exec_query sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false
  result_set = internal_execute sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry
  column_names = result_set.fields.keys.map { |key| key.to_s }
  row_values = result_set.rows.map { |row| row.values }
  ActiveRecord::Result.new column_names, row_values
end

#internal_execute(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 31

# Central dispatch for every statement.
#
# Determines the statement type of the untransformed +sql+, applies the registered query
# transformers, picks up a request tag from a leading query-log comment, and then routes the
# statement to +execute_ddl+ (for +:ddl+) or +execute_query_or_dml+ (for everything else).
#
# The +prepare+, +async+ and +allow_retry+ keywords are accepted but not used.
#
# @raise [ActiveRecord::ReadOnlyError] when writes are being prevented and the statement is
#   DML or DDL
def internal_execute sql, name = "SQL", binds = [],
                     prepare: false, async: false, allow_retry: false # rubocop:disable Lint/UnusedMethodArgument
  statement_type = sql_statement_type sql
  # Call `transform` to invoke any query transformers that might have been registered.
  sql = transform sql
  append_request_tag_from_query_logs sql, binds

  if preventing_writes? && [:dml, :ddl].include?(statement_type)
    # `ActiveRecord::ReadOnlyError(...)` would be parsed as a method call on the ActiveRecord
    # module and fail with NoMethodError, so the class and message are passed to `raise`.
    raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}"
  end

  if statement_type == :ddl
    execute_ddl sql
  else
    execute_query_or_dml statement_type, sql, name, binds
  end
end

#query(sql, name = nil) ⇒ Object

ActiveRecord.gem_version < VERSION_7_1_0



139
140
141
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 139

# Runs +sql+ through +exec_query+, passing +name+ along as given.
def query sql, name = nil
  exec_query(sql, name)
end

#rollback_db_transaction ⇒ Object



309
310
311
312
313
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 309

# Rolls back the current transaction on the underlying connection; the call is wrapped in a
# "ROLLBACK" log entry.
def rollback_db_transaction
  log("ROLLBACK") { @connection.rollback_transaction }
end

#sql_for_insert(sql, pk, binds, returning) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 117

# Builds the final insert statement, adding a `THEN RETURN` clause when columns must be
# returned.
#
# If a primary key is given and +binds+ holds no value for it, the primary key column(s) are
# added to +returning+ (without duplicates). The clause is only appended when the resulting
# list has at least one entry.
#
# @return [Array] a two-element array of the (possibly extended) SQL string and +binds+
def sql_for_insert sql, pk, binds, returning
  if pk && !_has_pk_binding(pk, binds)
    # No value was supplied for the primary key, so ask for it to be returned.
    key_columns = pk.respond_to?(:each) ? pk : [pk]
    returning = (returning || []) | key_columns
  end
  return [sql, binds] unless returning&.any?

  quoted_columns = returning.map { |column| quote_column_name column }
  ["#{sql} THEN RETURN #{quoted_columns.join ', '}", binds]
end

#transaction(requires_new: nil, isolation: nil, joinable: true) ⇒ Object

Transaction



232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 232

# Runs the given block in a transaction, retrying it when the transaction is aborted.
#
# When no new transaction is requested and the current transaction is joinable, the call is
# passed straight to the parent implementation without any retry handling. Otherwise an
# ActiveRecord::StatementInvalid whose cause is a Google::Cloud::AbortedError triggers a retry
# after sleeping for the delay returned by +delay_from_aborted+ or, if that returns nothing,
# for a locally tracked backoff that grows by a factor of 1.3 each time it is used. There is
# no upper bound on the number of retries. Every other error is re-raised.
def transaction requires_new: nil, isolation: nil, joinable: true
  return super if !requires_new && current_transaction.joinable?

  backoff = 0.2
  begin
    super
  rescue ActiveRecord::StatementInvalid => err
    raise unless err.cause.is_a? Google::Cloud::AbortedError

    pause = delay_from_aborted(err) || (backoff *= 1.3)
    sleep pause
    retry
  end
end

#transaction_isolation_levels ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 249

# Returns a fresh Hash that maps the isolation level symbols to their string names.
def transaction_isolation_levels
  standard_levels = {
    read_uncommitted: "READ UNCOMMITTED",
    read_committed: "READ COMMITTED",
    repeatable_read: "REPEATABLE READ",
    serializable: "SERIALIZABLE"
  }
  # These are not really isolation levels, but it is the only (best) way to pass in additional
  # transaction options to the connection.
  transaction_options = {
    read_only: "READ_ONLY",
    buffered_mutations: "BUFFERED_MUTATIONS"
  }

  standard_levels.merge transaction_options
end

#truncate(table_name, name = nil) ⇒ Object



208
209
210
211
212
213
214
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 208

# Truncates one or more tables through the underlying connection.
#
# +table_name+ is coerced with +Array()+, so a single name and a list of names are both
# accepted. Each table is truncated inside its own "TRUNCATE <table>" log entry.
def truncate table_name, name = nil
  Array(table_name).each do |table|
    log("TRUNCATE #{table}", name) { @connection.truncate table }
  end
end

#update(arel, name = nil, binds = []) ⇒ Object Also known as: delete



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 178

# Executes an update or delete, either as DML through the parent implementation or as a
# buffered mutation.
#
# When +should_use_mutation+ is falsy for +arel+, the statement goes to the parent
# implementation; before that, a literal `TRUE` condition is added if +arel+ exposes an AST
# without any where clauses. When mutations are used, only Arel::DeleteManager is supported:
# a delete-all mutation is created and buffered, and 0 is returned because the number of
# affected rows is unknown. Any other +arel+ raises a RuntimeError.
def update arel, name = nil, binds = []
  # Add a `WHERE TRUE` if it is an update_all or delete_all call that uses DML.
  unless should_use_mutation arel
    missing_where = arel.respond_to?(:ast) && arel.ast.wheres.empty?
    arel.ast.wheres << Arel::Nodes::SqlLiteral.new("TRUE") if missing_where
  end
  return super unless should_use_mutation arel

  raise "Unsupported update for use with mutations: #{arel}" unless arel.is_a? Arel::DeleteManager

  exec_mutation create_delete_all_mutation(arel)
  0 # Affected rows (unknown)
end

#write_query?(sql) ⇒ Boolean

Returns:

  • (Boolean)


216
217
218
# File 'lib/active_record/connection_adapters/spanner/database_statements.rb', line 216

# Returns true when +sql_statement_type+ classifies +sql+ as +:dml+, false otherwise.
def write_query? sql
  statement_type = sql_statement_type sql
  statement_type == :dml
end