Class: ActiveRecord::ConnectionAdapters::ClickhouseAdapter

Inherits:
AbstractAdapter
  • Object
show all
Includes:
ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements
Defined in:
lib/active_record/connection_adapters/clickhouse_adapter.rb

Constant Summary collapse

# Adapter name string (frozen).
ADAPTER_NAME =
'Clickhouse'.freeze
# Frozen lookup from type symbol to a hash holding the ClickHouse type name.
# It is what #native_database_types returns and what #valid_type? consults.
# Note that :integer and :big_integer map to the unsigned UInt32 / UInt64 names.
NATIVE_DATABASE_TYPES =
{
  string: { name: 'String' },
  integer: { name: 'UInt32' },
  big_integer: { name: 'UInt64' },
  float: { name: 'Float32' },
  decimal: { name: 'Decimal' },
  datetime: { name: 'DateTime' },
  datetime64: { name: 'DateTime64' },
  date: { name: 'Date' },
  boolean: { name: 'Bool' },
  uuid: { name: 'UUID' },

  # Enumerations
  enum8: { name: 'Enum8' },
  enum16: { name: 'Enum16' },

  # Signed integers
  int8:  { name: 'Int8' },
  int16: { name: 'Int16' },
  int32: { name: 'Int32' },
  int64:  { name: 'Int64' },
  int128: { name: 'Int128' },
  int256: { name: 'Int256' },

  # Unsigned integers
  uint8: { name: 'UInt8' },
  uint16: { name: 'UInt16' },
  uint32: { name: 'UInt32' },
  uint64: { name: 'UInt64' },
  # uint128: { name: 'UInt128' }, not yet implemented in clickhouse
  uint256: { name: 'UInt256' },
}.freeze

Constants included from ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements

ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements::DEFAULT_RESPONSE_FORMAT

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements

#assume_migrated_upto_version, #data_sources, #do_execute, #do_system_execute, #exec_delete, #exec_insert, #exec_insert_all, #exec_update, #execute, #indexes, #internal_exec_query, #table_options, #tables, #with_yaml_fallback

Constructor Details

#initialize(logger, connection_parameters, config) ⇒ ClickhouseAdapter

Initializes and connects a Clickhouse adapter.



124
125
126
127
128
129
130
131
132
133
134
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 124

# Builds the adapter state from the given configuration and then calls #connect.
#
# @param logger [Object] forwarded to the parent initializer
# @param connection_parameters [Object] stored unchanged in @connection_parameters
# @param config [Hash] adapter settings; :username, :password, :database and
#   :debug are read here, and the whole hash is kept in @config
def initialize(logger, connection_parameters, config)
  super(nil, logger)

  credentials = {
    user: config[:username],
    password: config[:password],
    database: config[:database]
  }

  @config = config
  @debug = config[:debug] || false
  @connection_parameters = connection_parameters
  # Entries whose value is nil are dropped from the stored connection config.
  @connection_config = credentials.compact
  @prepared_statements = false

  connect
end

Class Method Details

.extract_limit(sql_type) ⇒ Object

:nodoc:



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 153

# Derives a limit from a ClickHouse type string.
#
# String types defer to the parent implementation with the literal 'String';
# 8/16/64-bit integer types (signed or unsigned) yield 1, 2 and 8; 32-bit
# integer types yield nil; anything else defers to the parent implementation.
# The patterns are unanchored, so they match anywhere inside +sql_type+.
#
# @param sql_type [String] type as reported by the database
# @return [Integer, nil]
def extract_limit(sql_type) # :nodoc:
  case sql_type
  when /(Nullable)?\(?String\)?/  then super('String')
  when /(Nullable)?\(?U?Int8\)?/  then 1
  when /(Nullable)?\(?U?Int16\)?/ then 2
  when /(Nullable)?\(?U?Int32\)?/ then nil
  when /(Nullable)?\(?U?Int64\)?/ then 8
  else super
  end
end

.extract_precision(sql_type) ⇒ Object



180
181
182
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 180

# Reads the precision from a parenthesised type suffix such as "(10)" or
# "(10, 2)"; a single optional whitespace character after the comma is accepted.
#
# @param sql_type [String] type as reported by the database
# @return [Integer, nil] the first number inside the parentheses, or nil when
#   the type carries no such suffix
def extract_precision(sql_type)
  matched = /\((\d+)(,\s?\d+)?\)/.match(sql_type)
  matched && matched[1].to_i
end

.extract_scale(sql_type) ⇒ Object

`extract_scale` and `extract_precision` are the same as in the Rails abstract base class, except that they permit a space after the comma.



173
174
175
176
177
178
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 173

# Reads the scale from a parenthesised type suffix.
#
# A single-number suffix such as "(10)" means scale 0; a two-number suffix
# such as "(10, 2)" yields the second number. A single optional whitespace
# character after the comma is accepted.
#
# @param sql_type [String] type as reported by the database
# @return [Integer, nil] nil when the type carries no matching suffix
def extract_scale(sql_type)
  return 0 if /\((\d+)\)/.match?(sql_type)

  with_scale = /\((\d+)(,\s?(\d+))\)/.match(sql_type)
  with_scale[3].to_i if with_scale
end

.initialize_type_map(m) ⇒ Object

:nodoc:



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 184

# Registers the ClickHouse column types on the given type map, on top of
# whatever the parent implementation registers.
#
# Registration order is kept deliberately: the signed patterns (e.g. Int8)
# also match the unsigned names (e.g. UInt8), and the unsigned family is
# registered after the signed one.
#
# @param m [Object] the type map to populate
def initialize_type_map(m) # :nodoc:
  super

  register_class_with_limit m, %r(String), Type::String
  register_class_with_limit m, 'Date', Clickhouse::OID::Date
  register_class_with_precision m, %r(datetime)i, Clickhouse::OID::DateTime

  %w[Int8 Int16 Int32 Int64 Int128 Int256].each do |signed_name|
    register_class_with_limit m, Regexp.new(signed_name), Type::Integer
  end

  # UInt128 is intentionally absent: noted as not implemented in ClickHouse.
  %w[UInt8 UInt16 UInt32 UInt64 UInt256].each do |unsigned_name|
    register_class_with_limit m, Regexp.new(unsigned_name), Type::UnsignedInteger
  end

  m.register_type %r(bool)i, ActiveModel::Type::Boolean.new
  m.register_type %r{uuid}i, Clickhouse::OID::Uuid.new
  # Array types get a fresh OID::Array built from the full SQL type string.
  m.register_type(%r(Array)) { |sql_type| Clickhouse::OID::Array.new(sql_type) }
end

Instance Method Details

#add_column(table_name, column_name, type, **options) ⇒ Object



339
340
341
342
343
344
345
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 339

# Adds a column through an ALTER TABLE statement.
#
# When +if_not_exists: true+ is given and the column (with that type) already
# exists, nothing is executed and nil is returned.
#
# @param table_name [String, Symbol]
# @param column_name [String, Symbol]
# @param type [Symbol, String] column type
# @param options [Hash] forwarded to the alter-table definition
def add_column(table_name, column_name, type, **options)
  already_present = options[:if_not_exists] == true && column_exists?(table_name, column_name, type)
  return if already_present

  alter_table = create_alter_table(table_name)
  alter_table.add_column(column_name, type, **options)
  alter_sql = schema_creation.accept(alter_table)
  execute(alter_sql, nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1})
end

#apply_cluster(sql) ⇒ Object



394
395
396
397
398
399
400
401
402
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 394

# Appends an "ON CLUSTER <name>" clause to +sql+ when a cluster is configured.
#
# A cluster name beginning with "{" is wrapped in single quotes; any other
# name is appended as-is. Without a configured cluster, +sql+ is returned
# untouched.
#
# @param sql [String]
# @return [String]
def apply_cluster(sql)
  return sql unless cluster

  cluster_name = cluster
  cluster_name = "'#{cluster_name}'" if cluster_name.start_with?('{')
  "#{sql} ON CLUSTER #{cluster_name}"
end

#arel_visitor ⇒ Object

:nodoc:



140
141
142
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 140

# Returns a new Arel::Visitors::Clickhouse bound to this adapter.
def arel_visitor # :nodoc:
  Arel::Visitors::Clickhouse.new(self)
end

#build_insert_sql(insert) ⇒ Object

:nodoc:



412
413
414
415
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 412

# Assembles the INSERT statement text from the builder's +into+ and
# +values_list+ fragments, separated by single spaces.
#
# @param insert [#into, #values_list] insert builder
# @return [String] an unfrozen SQL string
def build_insert_sql(insert) # :nodoc:
  into_clause = insert.into
  values_clause = insert.values_list
  +"INSERT #{into_clause} #{values_clause}"
end

#change_column(table_name, column_name, type, options = {}) ⇒ Object



353
354
355
356
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 353

# Alters an existing column through an ALTER TABLE statement.
#
# @param table_name [String, Symbol]
# @param column_name [String, Symbol]
# @param type [Symbol, String, nil] new column type
# @param options [Hash] passed through to change_column_for_alter
# @raise [RuntimeError] when the response is present but is not a Hash
def change_column(table_name, column_name, type, options = {})
  quoted_table = quote_table_name(table_name)
  alter_clause = change_column_for_alter(table_name, column_name, type, options)
  result = do_execute(
    "ALTER TABLE #{quoted_table} #{alter_clause}",
    nil,
    settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}
  )

  # A blank response is accepted; anything else must have been parsed into a Hash.
  unexpected_response = result.presence && !result.is_a?(Hash)
  raise "Error parse json response: #{result}" if unexpected_response
end

#change_column_default(table_name, column_name, default) ⇒ Object



364
365
366
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 364

# Changes a column's default by delegating to #change_column with a nil type.
#
# A nil +default+ is dropped from the options, so #change_column then
# receives an empty options hash.
#
# @param table_name [String, Symbol]
# @param column_name [String, Symbol]
# @param default [Object, nil] new default value
def change_column_default(table_name, column_name, default)
  default_options = {default: default}.compact
  change_column(table_name, column_name, nil, default_options)
end

#change_column_null(table_name, column_name, null, default = nil) ⇒ Object



358
359
360
361
362
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 358

# Changes the nullability (and optionally the default) of an existing column.
#
# The column's current type is looked up via table_structure, an outer
# Nullable(...) wrapper is removed, and the bare type is handed to
# #change_column together with the requested :null / :default options
# (nil-valued options are omitted).
#
# @param table_name [String, Symbol]
# @param column_name [String, Symbol]
# @param null [Boolean] whether the column should accept NULL
# @param default [Object, nil] optional new default value
# @raise [RuntimeError] when the column is not present in the table structure
def change_column_null(table_name, column_name, null, default = nil)
  wanted_name = column_name.to_s
  structure = table_structure(table_name).find { |column| column[0] == wanted_name }
  raise "Column #{column_name} not found in table #{table_name}" if structure.nil?

  # Strip only the outer Nullable( ... ) wrapper. The pattern is anchored so
  # that closing parentheses belonging to the type itself, e.g. Decimal(10, 2)
  # or DateTime64(3), stay intact.
  base_type = structure[1].sub(/\ANullable\((.*)\)\z/m, '\1')
  change_column table_name, column_name, base_type, {null: null, default: default}.compact
end

#cluster ⇒ Object



368
369
370
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 368

# Returns the :cluster_name entry of the adapter config (nil when unset).
def cluster
  @config[:cluster_name]
end

#column_name_for_operation(operation, node) ⇒ Object

:nodoc:



240
241
242
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 240

# Returns the node compiled by the adapter's visitor; the +operation+
# argument is not used here.
def column_name_for_operation(operation, node) # :nodoc:
  visitor.compile(node)
end

#create_database(name) ⇒ Object

Create a new ClickHouse database.



267
268
269
270
271
272
273
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 267

# Creates a database by POSTing a CREATE DATABASE statement (with the cluster
# clause applied when one is configured).
#
# The request's query string is built from the connection config without its
# :database entry.
#
# @param name [String, Symbol] database name
def create_database(name)
  statement = apply_cluster("CREATE DATABASE #{quote_table_name(name)}")
  log_with_debug(statement, adapter_name) do
    query_string = @connection_config.except(:database).to_param
    response = @connection.post("/?#{query_string}", statement)
    process_response(response, DEFAULT_RESPONSE_FORMAT)
  end
end

#create_schema_dumper(options) ⇒ Object

:nodoc:



256
257
258
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 256

# Builds the schema dumper for this connection via
# ClickhouseActiverecord::SchemaDumper.create.
def create_schema_dumper(options) # :nodoc:
  ClickhouseActiverecord::SchemaDumper.create(self, options)
end

#create_table(table_name, **options, &block) ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 288

# Creates a table and, when +:with_distributed+ is given, a second
# Distributed table on top of it.
#
# Options read here: +:id+ (type of an id column added when the block did not
# define one), +:force+ (drop the table first, if it exists),
# +:with_distributed+ (name of the Distributed table) and +:sharding_key+
# (defaults to 'rand()'). The remaining options go to the table definition.
#
# @param table_name [String, Symbol]
# @param options [Hash]
# @yield [td] the table definition, for declaring columns
# @raise [RuntimeError] when +:with_distributed+ is used without a configured cluster
def create_table(table_name, **options, &block)
  options = apply_replica(table_name, options)
  definition = create_table_definition(apply_cluster(table_name), **options)
  yield definition if block_given?

  needs_id_column = options[:id].present? && definition[:id].blank?
  definition.column(:id, options[:id], null: false) if needs_id_column

  drop_table(table_name, options.merge(if_exists: true)) if options[:force]

  do_execute(schema_creation.accept(definition), format: nil)

  return unless options[:with_distributed]

  # These keys are removed so the recursive call below does not see them again.
  distributed_table_name = options.delete(:with_distributed)
  sharding_key = options.delete(:sharding_key) || 'rand()'
  raise 'Set a cluster' unless cluster

  distributed_engine =
    "Distributed(#{cluster}, #{@connection_config[:database]}, #{table_name}, #{sharding_key})"
  create_table(distributed_table_name, **options.merge(options: distributed_engine), &block)
end

#create_view(table_name, **options) {|td| ... } ⇒ Object

Yields:

  • (td)


275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 275

# Creates a view: same flow as table creation, with +view: true+ forced into
# the options before the definition is built.
#
# With +:force+ set, the existing object is dropped first (if it exists).
#
# @param table_name [String, Symbol]
# @param options [Hash]
# @yield [td] the definition object
def create_view(table_name, **options)
  options[:view] = true
  options = apply_replica(table_name, options)
  view_definition = create_table_definition(apply_cluster(table_name), **options)
  yield view_definition if block_given?

  drop_table(table_name, options.merge(if_exists: true)) if options[:force]

  do_execute(schema_creation.accept(view_definition), format: nil)
end

#database_engine_atomic? ⇒ Boolean

Returns:

  • (Boolean)


388
389
390
391
392
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 388

# Checks system.databases for the engine of the configured database.
#
# @return [Boolean, nil] true when the engine is 'Atomic', false for any other
#   engine, nil when the lookup returns no row
def database_engine_atomic?
  engine_row = select_one(
    "select engine from system.databases where name = '#{@connection_config[:database]}'"
  )
  return unless engine_row

  engine_row['engine'] == 'Atomic'
end

#drop_database(name) ⇒ Object

Drops a ClickHouse database.



312
313
314
315
316
317
318
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 312

# Drops a database (if it exists) by POSTing a DROP DATABASE statement, with
# the cluster clause applied when one is configured.
#
# The request's query string is built from the connection config without its
# :database entry.
#
# @param name [String, Symbol] database name
def drop_database(name) #:nodoc:
  statement = apply_cluster("DROP DATABASE IF EXISTS #{quote_table_name(name)}")
  log_with_debug(statement, adapter_name) do
    query_string = @connection_config.except(:database).to_param
    response = @connection.post("/?#{query_string}", statement)
    process_response(response, DEFAULT_RESPONSE_FORMAT)
  end
end

#drop_table(table_name, options = {}) ⇒ Object

:nodoc:



324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 324

# Drops a table and, when +:with_distributed+ is given, the Distributed table
# of that name as well.
#
# Options read here: +:if_exists+, +:sync+ (appends SYNC) and
# +:with_distributed+. The latter is deleted from +options+ before recursing.
#
# @param table_name [String, Symbol]
# @param options [Hash]
def drop_table(table_name, options = {}) # :nodoc:
  statement = options[:if_exists] ? "DROP TABLE IF EXISTS " : "DROP TABLE"
  statement = apply_cluster("#{statement} #{quote_table_name(table_name)}")
  statement = "#{statement} SYNC" if options[:sync]

  do_execute(statement)

  return unless options[:with_distributed]

  distributed_table_name = options.delete(:with_distributed)
  drop_table(distributed_table_name, **options)
end

#migrations_paths ⇒ Object



136
137
138
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 136

# Returns the configured :migrations_paths, falling back to
# 'db/migrate_clickhouse' when the config has none.
def migrations_paths
  @config[:migrations_paths] || 'db/migrate_clickhouse'
end

#native_database_types ⇒ Object

:nodoc:



144
145
146
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 144

# Returns the NATIVE_DATABASE_TYPES constant.
def native_database_types #:nodoc:
  NATIVE_DATABASE_TYPES
end

#primary_key(table_name) ⇒ Object

SCHEMA STATEMENTS ========================================



250
251
252
253
254
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 250

# Reports 'id' as the primary key only when the first column of the table
# structure is named 'id'; otherwise returns false.
#
# @param table_name [String, Symbol]
# @return [String, false]
def primary_key(table_name) #:nodoc:
  first_column = table_structure(table_name).first
  id_comes_first = first_column.present? && first_column[0] == 'id'
  id_comes_first ? 'id' : false
end

#quote(value) ⇒ Object



218
219
220
221
222
223
224
225
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 218

# Quotes a value for use in SQL.
#
# Arrays are rendered as a bracketed, comma-separated list whose elements are
# quoted recursively; every other value goes to the parent implementation.
#
# @param value [Object]
# @return [String]
def quote(value)
  return super unless value.is_a?(Array)

  quoted_elements = value.map { |element| quote(element) }
  "[#{quoted_elements.join(', ')}]"
end

#quoted_date(value) ⇒ Object

Quoting time without microseconds



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 228

# Formats a date/time value with the :db format.
#
# Time-like values are first converted with getutc or getlocal, chosen by
# ActiveRecord.default_timezone, provided the value responds to that method.
#
# @param value [Object] a date or time-like object
# @return [String]
def quoted_date(value)
  return value.to_fs(:db) unless value.acts_like?(:time)

  zone_converter = ActiveRecord.default_timezone == :utc ? :getutc : :getlocal
  value = value.send(zone_converter) if value.respond_to?(zone_converter)
  value.to_fs(:db)
end

#remove_column(table_name, column_name, type = nil, **options) ⇒ Object



347
348
349
350
351
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 347

# Removes a column through an ALTER TABLE statement.
#
# When +if_exists: true+ is given and the column is absent, nothing is
# executed and nil is returned.
#
# @param table_name [String, Symbol]
# @param column_name [String, Symbol]
# @param type [Symbol, String, nil] forwarded to remove_column_for_alter
# @param options [Hash] forwarded to remove_column_for_alter
def remove_column(table_name, column_name, type = nil, **options)
  nothing_to_remove = options[:if_exists] == true && !column_exists?(table_name, column_name)
  return if nothing_to_remove

  quoted_table = quote_table_name(table_name)
  drop_clause = remove_column_for_alter(table_name, column_name, type, **options)
  execute(
    "ALTER TABLE #{quoted_table} #{drop_clause}",
    nil,
    settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}
  )
end

#rename_table(table_name, new_name) ⇒ Object



320
321
322
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 320

# Renames a table with a RENAME TABLE statement, with the cluster clause
# applied when one is configured.
#
# @param table_name [String, Symbol] current name
# @param new_name [String, Symbol] new name
def rename_table(table_name, new_name)
  quoted_from = quote_table_name(table_name)
  quoted_to = quote_table_name(new_name)
  do_execute(apply_cluster("RENAME TABLE #{quoted_from} TO #{quoted_to}"))
end

#replica ⇒ Object



372
373
374
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 372

# Returns the :replica_name entry of the adapter config (nil when unset).
def replica
  @config[:replica_name]
end

#replica_path(table) ⇒ Object



384
385
386
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 384

# Builds the path string "/clickhouse/tables/<cluster>/<database>.<table>"
# from the configured cluster, the configured database and the given table.
def replica_path(table)
  "/clickhouse/tables/#{cluster}/#{@connection_config[:database]}.#{table}"
end

#show_create_table(table) ⇒ String

Parameters:

  • table (String)

Returns:

  • (String)


262
263
264
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 262

# Returns the CREATE TABLE statement of a table on a single line.
#
# The statement is taken from the first cell of the first row under the
# response's 'data' key, and every run of whitespace (newlines included) is
# collapsed into one space. When the response carries no such cell, nil is
# returned instead of failing with NoMethodError on nil.
#
# @param table [String] table name, interpolated between backticks
# @return [String, nil]
def show_create_table(table)
  response = do_system_execute("SHOW CREATE TABLE `#{table}`")
  create_statement = response['data']&.first&.first
  create_statement&.gsub(/[\n\s]+/m, ' ')
end

#supports_insert_on_duplicate_skip? ⇒ Boolean

Returns:

  • (Boolean)


404
405
406
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 404

# Capability flag: always reports true.
def supports_insert_on_duplicate_skip?
  true
end

#supports_insert_on_duplicate_update? ⇒ Boolean

Returns:

  • (Boolean)


408
409
410
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 408

# Capability flag: always reports true.
def supports_insert_on_duplicate_update?
  true
end

#type_map ⇒ Object

Rails 7 uses the constant TYPE_MAP, so we need to redefine this method.



214
215
216
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 214

# Returns this adapter's type map, built on first use: a new Type::TypeMap
# populated by ClickhouseAdapter.initialize_type_map and memoized in @type_map.
def type_map
  @type_map ||= Type::TypeMap.new.tap { |m| ClickhouseAdapter.initialize_type_map(m) }
end

#use_default_replicated_merge_tree_params? ⇒ Boolean

Returns:

  • (Boolean)


376
377
378
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 376

# Truthy only when the configured database uses the Atomic engine and the
# :use_default_replicated_merge_tree_params config entry is set. The result
# is the raw value of the && expression, not necessarily true/false.
def use_default_replicated_merge_tree_params?
  database_engine_atomic? && @config[:use_default_replicated_merge_tree_params]
end

#use_replica? ⇒ Boolean

Returns:

  • (Boolean)


380
381
382
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 380

# Truthy when a cluster is configured and either a replica name is configured
# or the default replicated-merge-tree parameters are in use. When truthy,
# the value returned is the cluster setting itself rather than literal true.
def use_replica?
  (replica || use_default_replicated_merge_tree_params?) && cluster
end

#valid_type?(type) ⇒ Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 148

# Tells whether +type+ has an entry in #native_database_types. The lookup
# uses the key as given.
def valid_type?(type)
  !native_database_types[type].nil?
end