Class: ActiveRecord::ConnectionAdapters::ClickhouseAdapter
- Inherits:
-
AbstractAdapter
- Object
- AbstractAdapter
- ActiveRecord::ConnectionAdapters::ClickhouseAdapter
- Defined in:
- lib/active_record/connection_adapters/clickhouse_adapter.rb
Constant Summary collapse
- ADAPTER_NAME =
'Clickhouse'.freeze
- NATIVE_DATABASE_TYPES =
# Maps each supported migration type symbol to its ClickHouse type name.
{
  string: { name: 'String' },
  integer: { name: 'UInt32' },
  big_integer: { name: 'UInt64' },
  float: { name: 'Float32' },
  decimal: { name: 'Decimal' },
  datetime: { name: 'DateTime' },
  datetime64: { name: 'DateTime64' },
  date: { name: 'Date' },
  boolean: { name: 'Bool' },
  uuid: { name: 'UUID' },
  enum8: { name: 'Enum8' },
  enum16: { name: 'Enum16' },
  int8: { name: 'Int8' },
  int16: { name: 'Int16' },
  int32: { name: 'Int32' },
  int64: { name: 'Int64' },
  int128: { name: 'Int128' },
  int256: { name: 'Int256' },
  uint8: { name: 'UInt8' },
  uint16: { name: 'UInt16' },
  uint32: { name: 'UInt32' },
  uint64: { name: 'UInt64' },
  # uint128: { name: 'UInt128' }, not yet implemented in clickhouse
  uint256: { name: 'UInt256' },
}.freeze
Instance Method Summary collapse
- #_quote(value) ⇒ Object
- #add_column(table_name, column_name, type, **options) ⇒ Object
- #apply_cluster(sql) ⇒ Object
-
#arel_visitor ⇒ Object
:nodoc:.
-
#build_insert_sql(insert) ⇒ Object
:nodoc:.
- #change_column(table_name, column_name, type, options = {}) ⇒ Object
- #change_column_default(table_name, column_name, default) ⇒ Object
- #change_column_null(table_name, column_name, null, default = nil) ⇒ Object
- #cluster ⇒ Object
-
#column_name_for_operation(operation, node) ⇒ Object
:nodoc:.
-
#create_database(name) ⇒ Object
Create a new ClickHouse database.
-
#create_schema_dumper(options) ⇒ Object
:nodoc:.
- #create_table(table_name, **options, &block) ⇒ Object
- #create_view(table_name, **options) {|td| ... } ⇒ Object
- #database_engine_atomic? ⇒ Boolean
-
#drop_database(name) ⇒ Object
Drops a ClickHouse database.
-
#drop_table(table_name, options = {}) ⇒ Object
:nodoc:.
-
#extract_limit(sql_type) ⇒ Object
:nodoc:.
- #extract_precision(sql_type) ⇒ Object
-
#extract_scale(sql_type) ⇒ Object
`extract_scale` and `extract_precision` are the same as in the Rails abstract base class, except that they permit a space after the comma.
-
#initialize(logger, connection_parameters, config, full_config) ⇒ ClickhouseAdapter
constructor
Initializes and connects a Clickhouse adapter.
-
#initialize_type_map(m) ⇒ Object
:nodoc:.
-
#migration_context ⇒ Object
:nodoc:.
- #migrations_paths ⇒ Object
-
#native_database_types ⇒ Object
:nodoc:.
-
#primary_key(table_name) ⇒ Object
SCHEMA STATEMENTS ========================================.
-
#quoted_date(value) ⇒ Object
Quoting time without microseconds.
- #remove_column(table_name, column_name, type = nil, **options) ⇒ Object
- #rename_table(table_name, new_name) ⇒ Object
- #replica ⇒ Object
- #replica_path(table) ⇒ Object
-
#schema_migration ⇒ Object
Support SchemaMigration from v5.2.2 to v6+.
- #show_create_table(table) ⇒ String
- #supports_insert_on_duplicate_skip? ⇒ Boolean
- #supports_insert_on_duplicate_update? ⇒ Boolean
- #use_default_replicated_merge_tree_params? ⇒ Boolean
- #use_replica? ⇒ Boolean
- #valid_type?(type) ⇒ Boolean
Methods included from ActiveRecord::ConnectionAdapters::Clickhouse::SchemaStatements
#assume_migrated_upto_version, #data_sources, #do_execute, #do_system_execute, #exec_delete, #exec_insert, #exec_insert_all, #exec_query, #exec_update, #execute, #indexes, #table_options, #tables
Constructor Details
#initialize(logger, connection_parameters, config, full_config) ⇒ ClickhouseAdapter
Initializes and connects a Clickhouse adapter.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 121
# Stores the connection settings on the adapter and then calls +connect+.
#
# logger                - forwarded to the parent constructor
# connection_parameters - kept in @connection_parameters
# config                - kept in @config
# full_config           - kept in @full_config; its :debug entry seeds @debug
def initialize(logger, connection_parameters, config, full_config)
  super(nil, logger)

  @connection_parameters = connection_parameters
  @config = config
  @full_config = full_config
  @debug = full_config[:debug] || false
  @prepared_statements = false

  # Only the exact 6.0.0 release gets the thread-local status flag.
  rails_6_0_0 = ActiveRecord::version == Gem::Version.new('6.0.0')
  @prepared_statement_status = Concurrent::ThreadLocalVar.new(false) if rails_6_0_0

  connect
end
Instance Method Details
#_quote(value) ⇒ Object
217 218 219 220 221 222 223 224 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 217
# Quotes a value for SQL. Arrays become a bracketed, comma-separated list of
# their (recursively quoted) elements; anything else is left to the parent.
def _quote(value)
  return super unless Array === value

  quoted_items = value.map { |item| _quote(item) }
  "[#{quoted_items.join(', ')}]"
end
#add_column(table_name, column_name, type, **options) ⇒ Object
344 345 346 347 348 349 350 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 344
# Adds a column through an ALTER TABLE statement.
#
# table_name, column_name, type - forwarded to the alter-table definition
# options - column options; with `if_not_exists: true` the call is a no-op
#           when +column_exists?+ reports the column is already there
def add_column(table_name, column_name, type, **options)
  return if options[:if_not_exists] == true && column_exists?(table_name, column_name, type)

  at = create_alter_table(table_name)
  at.add_column(column_name, type, **options)
  execute(schema_creation.accept(at), nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1})
end
#apply_cluster(sql) ⇒ Object
399 400 401 402 403 404 405 406 407 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 399
# Appends an ON CLUSTER clause to +sql+ when a cluster is configured;
# otherwise returns +sql+ untouched.
def apply_cluster(sql)
  return sql unless cluster

  normalized_cluster_name = cluster
  # A name starting with '{' is wrapped in single quotes.
  normalized_cluster_name = "'#{cluster}'" if cluster.start_with?('{')
  "#{sql} ON CLUSTER #{normalized_cluster_name}"
end
#arel_visitor ⇒ Object
:nodoc:
149 150 151 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 149 def arel_visitor # :nodoc: Arel::Visitors::Clickhouse.new(self) end |
#build_insert_sql(insert) ⇒ Object
:nodoc:
417 418 419 420 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 417
# Builds the INSERT statement for a bulk insert.
#
# insert - object responding to +into+ and +values_list+
#
# Returns a new unfrozen String.
def build_insert_sql(insert) # :nodoc:
  +"INSERT #{insert.into} #{insert.values_list}"
end
#change_column(table_name, column_name, type, options = {}) ⇒ Object
358 359 360 361 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 358
# Alters an existing column via ALTER TABLE.
#
# options - forwarded to +change_column_for_alter+
#
# Raises RuntimeError when the response is present but is not a Hash.
def change_column(table_name, column_name, type, options = {})
  result = do_execute(
    "ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, options)}",
    nil,
    settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}
  )
  raise "Error parse json response: #{result}" if result.presence && !result.is_a?(Hash)
end
#change_column_default(table_name, column_name, default) ⇒ Object
369 370 371 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 369
# Changes a column's default by delegating to +change_column+ with a nil type.
# A nil +default+ is dropped from the options hash by +compact+.
def change_column_default(table_name, column_name, default)
  default_options = {default: default}.compact
  change_column(table_name, column_name, nil, default_options)
end
#change_column_null(table_name, column_name, null, default = nil) ⇒ Object
363 364 365 366 367 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 363
# Changes the nullability (and optionally the default) of a column.
#
# The current type is read from +table_structure+ and one outer
# `Nullable(...)` wrapper is removed before delegating to +change_column+.
# Parameterised types such as `Decimal(10, 2)` keep their parentheses.
#
# Raises RuntimeError when the column is not in the table structure.
def change_column_null(table_name, column_name, null, default = nil)
  structure = table_structure(table_name).find { |v| v[0] == column_name.to_s }
  raise "Column #{column_name} not found in table #{table_name}" if structure.nil?

  # Anchored on both ends so only a wrapper around the whole type is removed.
  base_type = structure[1].sub(/\ANullable\((.*)\)\z/m, '\1')
  change_column(table_name, column_name, base_type, {null: null, default: default}.compact)
end
#cluster ⇒ Object
373 374 375 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 373 def cluster @full_config[:cluster_name] end |
#column_name_for_operation(operation, node) ⇒ Object
:nodoc:
247 248 249 250 251 252 253 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 247
# Resolves the column name for an Arel node. On ActiveRecord 6 and later the
# node is compiled by the visitor; older versions use
# +column_name_from_arel_node+. +operation+ is not used here.
def column_name_for_operation(operation, node) # :nodoc:
  return visitor.compile(node) if ActiveRecord::version >= Gem::Version.new('6')

  column_name_from_arel_node(node)
end
#create_database(name) ⇒ Object
Create a new ClickHouse database.
278 279 280 281 282 283 284 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 278
# Create a new ClickHouse database.
#
# The statement is posted with the connection config minus :database as the
# query string, and the response goes through +process_response+.
def create_database(name)
  statement = apply_cluster("CREATE DATABASE #{quote_table_name(name)}")
  log_with_debug(statement, adapter_name) do
    query_string = @config.except(:database).to_param
    process_response(@connection.post("/?#{query_string}", statement))
  end
end
#create_schema_dumper(options) ⇒ Object
:nodoc:
267 268 269 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 267
# Creates the schema dumper for this connection.
#
# options - passed through unchanged to ClickhouseActiverecord::SchemaDumper.create
def create_schema_dumper(options) # :nodoc:
  ClickhouseActiverecord::SchemaDumper.create(self, options)
end
#create_table(table_name, **options, &block) ⇒ Object
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 299
# Creates a table and, when :with_distributed is given, a companion
# Distributed table on top of it.
#
# options - table options. Keys read here:
#   :id               - type of the id column, added unless the block defined one
#   :force            - drop the table first (IF EXISTS)
#   :with_distributed - name of the Distributed table to create afterwards
#   :sharding_key     - sharding expression for it (defaults to 'rand()')
#
# Raises RuntimeError ('Set a cluster') when :with_distributed is given but no
# cluster is configured.
def create_table(table_name, **options, &block)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), **options)
  block.call td if block_given?
  td.column(:id, options[:id], null: false) if options[:id].present? && td[:id].blank?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  do_execute(schema_creation.accept(td), format: nil)

  if options[:with_distributed]
    # Both keys are removed so the recursive call does not recurse again.
    distributed_table_name = options.delete(:with_distributed)
    sharding_key = options.delete(:sharding_key) || 'rand()'
    raise 'Set a cluster' unless cluster

    distributed_options = "Distributed(#{cluster}, #{@config[:database]}, #{table_name}, #{sharding_key})"
    create_table(distributed_table_name, **options.merge(options: distributed_options), &block)
  end
end
#create_view(table_name, **options) {|td| ... } ⇒ Object
286 287 288 289 290 291 292 293 294 295 296 297 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 286
# Creates a view; works like table creation with `view: true` forced into the
# options.
#
# options - view options; :force drops an existing object first (IF EXISTS)
#
# Yields the table definition when a block is given.
def create_view(table_name, **options)
  options.merge!(view: true)
  options = apply_replica(table_name, options)
  td = create_table_definition(apply_cluster(table_name), **options)
  yield td if block_given?

  if options[:force]
    drop_table(table_name, options.merge(if_exists: true))
  end

  do_execute(schema_creation.accept(td), format: nil)
end
#database_engine_atomic? ⇒ Boolean
393 394 395 396 397 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 393
# Looks up the configured database in system.databases and reports whether
# its engine is 'Atomic'. Returns nil when the lookup yields no row.
def database_engine_atomic?
  database = @config[:database]
  row = select_one("select engine from system.databases where name = '#{database}'")
  row ? row['engine'] == 'Atomic' : nil
end
#drop_database(name) ⇒ Object
Drops a ClickHouse database.
323 324 325 326 327 328 329 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 323
# Drops a ClickHouse database (IF EXISTS).
#
# The statement is posted with the connection config minus :database as the
# query string, and the response goes through +process_response+.
def drop_database(name) #:nodoc:
  statement = apply_cluster("DROP DATABASE IF EXISTS #{quote_table_name(name)}")
  log_with_debug(statement, adapter_name) do
    query_string = @config.except(:database).to_param
    process_response(@connection.post("/?#{query_string}", statement))
  end
end
#drop_table(table_name, options = {}) ⇒ Object
:nodoc:
335 336 337 338 339 340 341 342 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 335
# Drops a table and, when :with_distributed names one, that table as well.
#
# options - Hash. Keys read here:
#   :if_exists        - adds IF EXISTS to the statement
#   :with_distributed - name of a second table to drop with the same options;
#                       the key is removed from the passed hash
def drop_table(table_name, options = {}) # :nodoc:
  do_execute(apply_cluster("DROP TABLE#{' IF EXISTS' if options[:if_exists]} #{quote_table_name(table_name)}"))

  if options[:with_distributed]
    distributed_table_name = options.delete(:with_distributed)
    drop_table(distributed_table_name, **options)
  end
end
#extract_limit(sql_type) ⇒ Object
:nodoc:
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 161
# Derives the column limit from a ClickHouse type string. The first matching
# pattern wins; types not listed are handled by the parent implementation.
def extract_limit(sql_type) # :nodoc:
  case sql_type
  when /(Nullable)?\(?String\)?/ then super('String')
  when /(Nullable)?\(?U?Int8\)?/ then 1
  when /(Nullable)?\(?U?Int16\)?/ then 2
  when /(Nullable)?\(?U?Int32\)?/ then nil # no explicit limit for 32-bit integers
  when /(Nullable)?\(?U?Int64\)?/ then 8
  else super
  end
end
#extract_precision(sql_type) ⇒ Object
188 189 190 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 188
# Extracts the precision (first number inside the parentheses) from a type
# string such as "Decimal(10, 2)". A single space after the comma is allowed.
#
# Returns an Integer, or nil when the type has no "(n)" / "(n, m)" part.
def extract_precision(sql_type)
  match = sql_type&.match(/\((\d+)(,\s?\d+)?\)/)
  match[1].to_i if match
end
#extract_scale(sql_type) ⇒ Object
`extract_scale` and `extract_precision` are the same as in the Rails abstract base class, except that they permit a space after the comma.
181 182 183 184 185 186 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 181
# Extracts the scale from a type string. A single space after the comma is
# allowed.
#
# Returns 0 for "(n)", the second number for "(n, m)", nil otherwise.
def extract_scale(sql_type)
  case sql_type
  when /\((\d+)\)/ then 0
  when /\((\d+)(,\s?(\d+))\)/ then Regexp.last_match(3).to_i
  end
end
#initialize_type_map(m) ⇒ Object
:nodoc:
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 192
# Registers the ClickHouse column types on the type map +m+, after the parent
# has registered its own. Registration order is kept: String, Date, datetime,
# signed integers, unsigned integers, then Array.
def initialize_type_map(m) # :nodoc:
  super

  register_class_with_limit m, /String/, Type::String
  register_class_with_limit m, 'Date', Clickhouse::OID::Date
  register_class_with_precision m, /datetime/i, Clickhouse::OID::DateTime

  [/Int8/, /Int16/, /Int32/, /Int64/, /Int128/, /Int256/].each do |signed_pattern|
    register_class_with_limit m, signed_pattern, Type::Integer
  end

  # UInt128 is left out: not implemented in clickhouse
  [/UInt8/, /UInt16/, /UInt32/, /UInt64/, /UInt256/].each do |unsigned_pattern|
    register_class_with_limit m, unsigned_pattern, Type::UnsignedInteger
  end

  # Array columns get a type object built from the full sql type string.
  m.register_type(/Array/) { |sql_type| Clickhouse::OID::Array.new(sql_type) }
end
#migration_context ⇒ Object
:nodoc:
145 146 147 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 145 def migration_context # :nodoc: ClickhouseActiverecord::MigrationContext.new(migrations_paths, schema_migration) end |
#migrations_paths ⇒ Object
141 142 143 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 141 def migrations_paths @full_config[:migrations_paths] || 'db/migrate_clickhouse' end |
#native_database_types ⇒ Object
:nodoc:
153 154 155 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 153 def native_database_types #:nodoc: NATIVE_DATABASE_TYPES end |
#primary_key(table_name) ⇒ Object
SCHEMA STATEMENTS ========================================
261 262 263 264 265 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 261
# Reports the primary key of a table: 'id' when the first entry of the table
# structure is named 'id', false otherwise.
def primary_key(table_name) #:nodoc:
  first_column = table_structure(table_name).first
  first_column.present? && first_column[0] == 'id' ? 'id' : false
end
#quoted_date(value) ⇒ Object
Quoting time without microseconds
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 227
# Formats a date/time value with the :db format, without microseconds.
#
# Time-like values are first converted to UTC or local time according to the
# configured default timezone. The timezone setting and the formatting method
# are picked by ActiveRecord version (7+ vs. older).
def quoted_date(value)
  rails7 = ActiveRecord::version >= Gem::Version.new('7')

  if value.acts_like?(:time)
    default_timezone = rails7 ? ActiveRecord.default_timezone : ActiveRecord::Base.default_timezone
    zone_conversion_method = default_timezone == :utc ? :getutc : :getlocal
    value = value.send(zone_conversion_method) if value.respond_to?(zone_conversion_method)
  end

  rails7 ? value.to_fs(:db) : value.to_s(:db)
end
#remove_column(table_name, column_name, type = nil, **options) ⇒ Object
352 353 354 355 356 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 352
# Removes a column through an ALTER TABLE statement.
#
# options - forwarded to +remove_column_for_alter+; with `if_exists: true`
#           the call is a no-op when +column_exists?+ reports the column is
#           absent
def remove_column(table_name, column_name, type = nil, **options)
  return if options[:if_exists] == true && !column_exists?(table_name, column_name)

  execute(
    "ALTER TABLE #{quote_table_name(table_name)} #{remove_column_for_alter(table_name, column_name, type, **options)}",
    nil,
    settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}
  )
end
#rename_table(table_name, new_name) ⇒ Object
331 332 333 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 331
# Renames a table with a RENAME TABLE statement (cluster clause applied via
# +apply_cluster+).
def rename_table(table_name, new_name)
  statement = "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}"
  do_execute(apply_cluster(statement))
end
#replica ⇒ Object
377 378 379 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 377 def replica @full_config[:replica_name] end |
#replica_path(table) ⇒ Object
389 390 391 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 389 def replica_path(table) "/clickhouse/tables/#{cluster}/#{@config[:database]}.#{table}" end |
#schema_migration ⇒ Object
Support SchemaMigration from v5.2.2 to v6+
137 138 139 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 137 def schema_migration # :nodoc: ClickhouseActiverecord::SchemaMigration end |
#show_create_table(table) ⇒ String
273 274 275 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 273
# Fetches the CREATE statement of a table and collapses every run of
# whitespace/newlines into a single space.
#
# Returns the one-line statement, or nil when the response carries no rows
# (the final step used to raise NoMethodError on nil in that case).
def show_create_table(table)
  rows = do_system_execute("SHOW CREATE TABLE `#{table}`")['data']
  statement = rows&.first&.first
  statement&.gsub(/[\n\s]+/m, ' ')
end
#supports_insert_on_duplicate_skip? ⇒ Boolean
409 410 411 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 409 def supports_insert_on_duplicate_skip? true end |
#supports_insert_on_duplicate_update? ⇒ Boolean
413 414 415 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 413 def supports_insert_on_duplicate_update? true end |
#use_default_replicated_merge_tree_params? ⇒ Boolean
381 382 383 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 381 def use_default_replicated_merge_tree_params? database_engine_atomic? && @full_config[:use_default_replicated_merge_tree_params] end |
#use_replica? ⇒ Boolean
385 386 387 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 385 def use_replica? (replica || use_default_replicated_merge_tree_params?) && cluster end |
#valid_type?(type) ⇒ Boolean
157 158 159 |
# File 'lib/active_record/connection_adapters/clickhouse_adapter.rb', line 157 def valid_type?(type) !native_database_types[type].nil? end |