Class: ActiveRecord::ConnectionAdapters::ClickhouseAdapter

Inherits:
AbstractAdapter
  • Object
show all
Includes:
ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements
Defined in:
lib/active_record/connection_adapters/clickhouse_adapter.rb

Constant Summary collapse

ADAPTER_NAME =
'Clickhouse'.freeze
NATIVE_DATABASE_TYPES =
{
  string: { name: 'String' },
  integer: { name: 'UInt32' },
  big_integer: { name: 'UInt64' },
  float: { name: 'Float32' },
  decimal: { name: 'Decimal' },
  datetime: { name: 'DateTime' },
  datetime64: { name: 'DateTime64' },
  date: { name: 'Date' },
  boolean: { name: 'Bool' },
  uuid: { name: 'UUID' },

  enum8: { name: 'Enum8' },
  enum16: { name: 'Enum16' },

  int8:  { name: 'Int8' },
  int16: { name: 'Int16' },
  int32: { name: 'Int32' },
  int64:  { name: 'Int64' },
  int128: { name: 'Int128' },
  int256: { name: 'Int256' },

  uint8: { name: 'UInt8' },
  uint16: { name: 'UInt16' },
  uint32: { name: 'UInt32' },
  uint64: { name: 'UInt64' },
  # uint128: { name: 'UInt128' }, not yet implemented in clickhouse
  uint256: { name: 'UInt256' },
}.freeze

Instance Method Summary collapse

Methods included from ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements

#assume_migrated_upto_version, #data_sources, #do_execute, #do_system_execute, #exec_delete, #exec_insert, #exec_insert_all, #exec_query, #exec_update, #execute, #indexes, #table_options, #tables

Constructor Details

#initialize(logger, connection_parameters, config, full_config) ⇒ ClickhouseAdapter

Initializes and connects a Clickhouse adapter.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 121

def initialize(logger, connection_parameters, config, full_config)
  super(nil, logger)
  @connection_parameters = connection_parameters
  @config = config
  @debug = full_config[:debug] || false
  @full_config = full_config

  @prepared_statements = false
  if ActiveRecord::version == Gem::Version.new('6.0.0')
    @prepared_statement_status = Concurrent::ThreadLocalVar.new(false)
  end

  connect
end

Instance Method Details

#_quote(value) ⇒ Object



217
218
219
220
221
222
223
224
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 217

def _quote(value)
  case value
  when Array
    '[' + value.map { |v| _quote(v) }.join(', ') + ']'
  else
    super
  end
end

#add_column(table_name, column_name, type, **options) ⇒ Object



344
345
346
347
348
349
350
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 344

def add_column(table_name, column_name, type, **options)
  return if options[:if_not_exists] == true && column_exists?(table_name, column_name, type)

  at = create_alter_table table_name
  at.add_column(column_name, type, **options)
  execute(schema_creation.accept(at), nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1})
end

#apply_cluster(sql) ⇒ Object



399
400
401
402
403
404
405
406
407
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 399

def apply_cluster(sql)
  if cluster
    normalized_cluster_name = cluster.start_with?('{') ? "'#{cluster}'" : cluster

    "#{sql} ON CLUSTER #{normalized_cluster_name}"
  else
    sql
  end
end

#arel_visitorObject

:nodoc:



149
150
151
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 149

def arel_visitor # :nodoc:
  Arel::Visitors::Clickhouse.new(self)
end

#build_insert_sql(insert) ⇒ Object

:nodoc:



417
418
419
420
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 417

def build_insert_sql(insert) # :nodoc:
  sql = +"INSERT #{insert.into} #{insert.values_list}"
  sql
end

#change_column(table_name, column_name, type, options = {}) ⇒ Object



358
359
360
361
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 358

def change_column(table_name, column_name, type, options = {})
  result = do_execute("ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1})
  raise "Error parse json response: #{result}" if result.presence && !result.is_a?(Hash)
end

#change_column_default(table_name, column_name, default) ⇒ Object



369
370
371
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 369

def change_column_default(table_name, column_name, default)
  change_column table_name, column_name, nil, {default: default}.compact
end

#change_column_null(table_name, column_name, null, default = nil) ⇒ Object



363
364
365
366
367
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 363

def change_column_null(table_name, column_name, null, default = nil)
  structure = table_structure(table_name).select{|v| v[0] == column_name.to_s}.first
  raise "Column #{column_name} not found in table #{table_name}" if structure.nil?
  change_column table_name, column_name, structure[1].gsub(/(Nullable\()?(.*?)\)?/, '\2'), {null: null, default: default}.compact
end

#clusterObject



373
374
375
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 373

def cluster
  @full_config[:cluster_name]
end

#column_name_for_operation(operation, node) ⇒ Object

:nodoc:



247
248
249
250
251
252
253
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 247

def column_name_for_operation(operation, node) # :nodoc:
  if ActiveRecord::version >= Gem::Version.new('6')
    visitor.compile(node)
  else
    column_name_from_arel_node(node)
  end
end

#create_database(name) ⇒ Object

Create a new ClickHouse database.



278
279
280
281
282
283
284
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 278

def create_database(name)
  sql = apply_cluster "CREATE DATABASE #{quote_table_name(name)}"
  log_with_debug(sql, adapter_name) do
    res = @connection.post("/?#{@config.except(:database).to_param}", sql)
    process_response(res)
  end
end

#create_schema_dumper(options) ⇒ Object

:nodoc:



267
268
269
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 267

def create_schema_dumper(options) # :nodoc:
  ClickhouseActiverecord::SchemaDumper.create(self, options)
end

#create_table(table_name, **options, &block) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 299

def create_table(table_name, **options, &block)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), **options)
  block.call td if block_given?
  td.column(:id, options[:id], null: false) if options[:id].present? && td[:id].blank?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  do_execute(schema_creation.accept(td), format: nil)

  if options[:with_distributed]
    distributed_table_name = options.delete(:with_distributed)
    sharding_key = options.delete(:sharding_key) || 'rand()'
    raise 'Set a cluster' unless cluster

    distributed_options =
      "Distributed(#{cluster}, #{@config[:database]}, #{table_name}, #{sharding_key})"
    create_table(distributed_table_name, **options.merge(options: distributed_options), &block)
  end
end

#create_view(table_name, **options) {|td| ... } ⇒ Object

Yields:

  • (td)


286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 286

def create_view(table_name, **options)
  options.merge!(view: true)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), **options)
  yield td if block_given?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  do_execute(schema_creation.accept(td), format: nil)
end

#database_engine_atomic?Boolean

Returns:

  • (Boolean)


393
394
395
396
397
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 393

def database_engine_atomic?
  current_database_engine = "select engine from system.databases where name = '#{@config[:database]}'"
  res = select_one(current_database_engine)
  res['engine'] == 'Atomic' if res
end

#drop_database(name) ⇒ Object

Drops a ClickHouse database.



323
324
325
326
327
328
329
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 323

def drop_database(name) #:nodoc:
  sql = apply_cluster "DROP DATABASE IF EXISTS #{quote_table_name(name)}"
  log_with_debug(sql, adapter_name) do
    res = @connection.post("/?#{@config.except(:database).to_param}", sql)
    process_response(res)
  end
end

#drop_table(table_name, options = {}) ⇒ Object

:nodoc:



335
336
337
338
339
340
341
342
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 335

def drop_table(table_name, options = {}) # :nodoc:
  do_execute apply_cluster "DROP TABLE#{' IF EXISTS' if options[:if_exists]} #{quote_table_name(table_name)}"

  if options[:with_distributed]
    distributed_table_name = options.delete(:with_distributed)
    drop_table(distributed_table_name, **options)
  end
end

#extract_limit(sql_type) ⇒ Object

:nodoc:



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 161

def extract_limit(sql_type) # :nodoc:
  case sql_type
    when /(Nullable)?\(?String\)?/
      super('String')
    when /(Nullable)?\(?U?Int8\)?/
      1
    when /(Nullable)?\(?U?Int16\)?/
      2
    when /(Nullable)?\(?U?Int32\)?/
      nil
    when /(Nullable)?\(?U?Int64\)?/
      8
    else
      super
  end
end

#extract_precision(sql_type) ⇒ Object



188
189
190
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 188

def extract_precision(sql_type)
  $1.to_i if sql_type =~ /\((\d+)(,\s?\d+)?\)/
end

#extract_scale(sql_type) ⇒ Object

‘extract_scale` and `extract_precision` are the same as in the Rails abstract base class, except this permits a space after the comma



181
182
183
184
185
186
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 181

def extract_scale(sql_type)
  case sql_type
  when /\((\d+)\)/ then 0
  when /\((\d+)(,\s?(\d+))\)/ then $3.to_i
  end
end

#initialize_type_map(m) ⇒ Object

:nodoc:



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 192

def initialize_type_map(m) # :nodoc:
  super
  register_class_with_limit m, %r(String), Type::String
  register_class_with_limit m, 'Date',  Clickhouse::OID::Date
  register_class_with_precision m, %r(datetime)i,  Clickhouse::OID::DateTime

  register_class_with_limit m, %r(Int8), Type::Integer
  register_class_with_limit m, %r(Int16), Type::Integer
  register_class_with_limit m, %r(Int32), Type::Integer
  register_class_with_limit m, %r(Int64), Type::Integer
  register_class_with_limit m, %r(Int128), Type::Integer
  register_class_with_limit m, %r(Int256), Type::Integer

  register_class_with_limit m, %r(UInt8), Type::UnsignedInteger
  register_class_with_limit m, %r(UInt16), Type::UnsignedInteger
  register_class_with_limit m, %r(UInt32), Type::UnsignedInteger
  register_class_with_limit m, %r(UInt64), Type::UnsignedInteger
  #register_class_with_limit m, %r(UInt128), Type::UnsignedInteger #not implemnted in clickhouse
  register_class_with_limit m, %r(UInt256), Type::UnsignedInteger
  # register_class_with_limit m, %r(Array), Clickhouse::OID::Array
  m.register_type(%r(Array)) do |sql_type|
    Clickhouse::OID::Array.new(sql_type)
  end
end

#migration_contextObject

:nodoc:



145
146
147
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 145

def migration_context # :nodoc:
  ClickhouseActiverecord::MigrationContext.new(migrations_paths, schema_migration)
end

#migrations_pathsObject



141
142
143
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 141

def migrations_paths
  @full_config[:migrations_paths] || 'db/migrate_clickhouse'
end

#native_database_typesObject

:nodoc:



153
154
155
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 153

def native_database_types #:nodoc:
  NATIVE_DATABASE_TYPES
end

#primary_key(table_name) ⇒ Object

SCHEMA STATEMENTS ========================================



261
262
263
264
265
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 261

def primary_key(table_name) #:nodoc:
  pk = table_structure(table_name).first
  return 'id' if pk.present? && pk[0] == 'id'
  false
end

#quoted_date(value) ⇒ Object

Quoting time without microseconds



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 227

def quoted_date(value)
  if value.acts_like?(:time)
    if ActiveRecord::version >= Gem::Version.new('7')
      zone_conversion_method = ActiveRecord.default_timezone == :utc ? :getutc : :getlocal
    else
      zone_conversion_method = ActiveRecord::Base.default_timezone == :utc ? :getutc : :getlocal
    end

    if value.respond_to?(zone_conversion_method)
      value = value.send(zone_conversion_method)
    end
  end

  if ActiveRecord::version >= Gem::Version.new('7')
    value.to_fs(:db)
  else
    value.to_s(:db)
  end
end

#remove_column(table_name, column_name, type = nil, **options) ⇒ Object



352
353
354
355
356
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 352

def remove_column(table_name, column_name, type = nil, **options)
  return if options[:if_exists] == true && !column_exists?(table_name, column_name)

  execute("ALTER TABLE #{quote_table_name(table_name)} #{remove_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1})
end

#rename_table(table_name, new_name) ⇒ Object



331
332
333
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 331

def rename_table(table_name, new_name)
  do_execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}"
end

#replicaObject



377
378
379
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 377

def replica
  @full_config[:replica_name]
end

#replica_path(table) ⇒ Object



389
390
391
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 389

def replica_path(table)
  "/clickhouse/tables/#{cluster}/#{@config[:database]}.#{table}"
end

#schema_migrationObject

Support SchemaMigration from v5.2.2 to v6+



137
138
139
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 137

def schema_migration # :nodoc:
  ClickhouseActiverecord::SchemaMigration
end

#show_create_table(table) ⇒ String

Parameters:

  • table (String)

Returns:

  • (String)


273
274
275
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 273

def show_create_table(table)
  do_system_execute("SHOW CREATE TABLE `#{table}`")['data'].try(:first).try(:first).gsub(/[\n\s]+/m, ' ')
end

#supports_insert_on_duplicate_skip?Boolean

Returns:

  • (Boolean)


409
410
411
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 409

def supports_insert_on_duplicate_skip?
  true
end

#supports_insert_on_duplicate_update?Boolean

Returns:

  • (Boolean)


413
414
415
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 413

def supports_insert_on_duplicate_update?
  true
end

#use_default_replicated_merge_tree_params?Boolean

Returns:

  • (Boolean)


381
382
383
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 381

def use_default_replicated_merge_tree_params?
  database_engine_atomic? && @full_config[:use_default_replicated_merge_tree_params]
end

#use_replica?Boolean

Returns:

  • (Boolean)


385
386
387
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 385

def use_replica?
  (replica || use_default_replicated_merge_tree_params?) && cluster
end

#valid_type?(type) ⇒ Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 157

def valid_type?(type)
  !native_database_types[type].nil?
end