Class: CassandraObject::Adapters::CassandraDriver::Client
- Inherits:
- ActiveRecord::ConnectionAdapters::AbstractAdapter
- Object
- ActiveRecord::ConnectionAdapters::AbstractAdapter
- CassandraObject::Adapters::CassandraDriver::Client
- Defined in:
- lib/cassandra_object/adapters/cassandra_driver.rb
Overview
The client class acts like the old cassandra gem
Constant Summary collapse
- KEY_FIELD = 'key'
- NAME_FIELD = 'column1'
- VALUE_FIELD = 'value'
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
Instance Method Summary collapse
- #add(column_family, key, by, fields, opts = nil) ⇒ Object
- #add_column_family(column_family) ⇒ Object
- #add_multiple_columns(column_family, key, hash, opts = nil) ⇒ Object
- #batch(opts = false) ⇒ Object
- #batch_mode? ⇒ Boolean
- #clause(column_family, val, fields, operator = '=') ⇒ Object
- #close ⇒ Object
- #column_clause(column_family, val, operator = '=') ⇒ Object
- #column_families ⇒ Object
- #column_string(row, fields) ⇒ Object
-
#convert_str_to_hex(str) ⇒ Object
insert a blob.
-
#convert_str_to_timeuuid(str) ⇒ Object
when the column names are timeuuid.
- #decode(val, type) ⇒ Object
- #escape(str, type) ⇒ Object
- #execute(query, options = {}) ⇒ Object
- #execute_async(query, options = {}) ⇒ Object
- #execute_options(opts) ⇒ Object
- #get(column_family, key, *columns_options) ⇒ Object
- #get_column_fields(column_family) ⇒ Object
- #get_columns(column_family, key, columns, opts) ⇒ Object
- #get_columns_as_hash(column_family, key, columns, opts) ⇒ Object
- #get_key_fields(column_family) ⇒ Object
- #get_parts(column_family, val, fields) ⇒ Object
- #get_range(column_family, opts = {}, &blk) ⇒ Object
- #get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts = {}) ⇒ Object
- #get_type(column_family, field) ⇒ Object
- #get_value(column_family, key, column, consistency) ⇒ Object
- #has_table?(name) ⇒ Boolean
-
#initialize(session, cluster) ⇒ Client
constructor
A new instance of Client.
- #insert(column_family, key, values, opts = nil) ⇒ Object
- #key_clause(column_family, val) ⇒ Object
- #key_type(column_family) ⇒ Object
- #keyspace ⇒ Object
- #multi_get(column_family, keys, *args) ⇒ Object
- #name_type(column_family) ⇒ Object
- #normalize_composite_key_part(val, type) ⇒ Object
- #remove(column_family, key, *args) ⇒ Object
- #reverse_comparator(column_family) ⇒ Object
- #schema(reload = false) ⇒ Object
- #schema_cache ⇒ Object
- #value_type(column_family) ⇒ Object
Constructor Details
#initialize(session, cluster) ⇒ Client
Returns a new instance of Client.
75 76 77 78 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 75

# Stores the given session and cluster on the instance; they are exposed
# through the #session and #cluster readers.
#
# @param session [Object] kept in @session
# @param cluster [Object] kept in @cluster
def initialize(session, cluster)
  @session, @cluster = session, cluster
end
Instance Attribute Details
#cluster ⇒ Object (readonly)
Returns the value of attribute cluster.
69 70 71 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 69

# Read-only accessor.
#
# @return [Object] the cluster passed to the constructor
def cluster
  @cluster
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
69 70 71 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 69

# Read-only accessor.
#
# @return [Object] the session passed to the constructor
def session
  @session
end
Instance Method Details
#add(column_family, key, by, fields, opts = nil) ⇒ Object
180 181 182 183 184 185 186 187 188 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 180

# Runs one "value = value + by" UPDATE per field for the given row key.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param by [Object] increment, interpolated verbatim into the CQL
#   (NOTE(review): not escaped -- callers must pass a trusted number)
# @param fields [Array, Object] one column name or an array of them
# @param opts [Hash, nil] :async selects execute_async; remaining keys are
#   filtered through execute_options
# @return [Array] the list of fields that was iterated
def add(column_family, key, by, fields, opts=nil)
  async = opts.try(:[], :async)
  exec_opts = execute_options(opts) # identical for every statement, so computed once
  fields = [fields] unless fields.is_a?(Array)
  fields.each do |field|
    query = "UPDATE \"#{column_family}\" SET #{VALUE_FIELD} = #{VALUE_FIELD} + #{by} WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, field)};"
    async ? self.execute_async(query, exec_opts) : self.execute(query, exec_opts)
  end
end
#add_column_family(column_family) ⇒ Object
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 313

# Creates a COMPACT STORAGE table (key blob, column1 text, value) for the
# given column family definition, then forces a schema reload.
#
# @param column_family [Object] must respond to #name and
#   #default_validation_class; 'CounterColumnType' yields a counter value
#   column, anything else a text value column
# @return [Object] whatever schema(true) returns
def add_column_family(column_family)
  # Local deliberately not called value_type so it does not shadow the method.
  value_cql_type =
    if column_family.default_validation_class == 'CounterColumnType'
      'counter'
    else
      'text'
    end

  create_table = <<-CQL
    CREATE TABLE "#{column_family.name}" (
      key blob,
      column1 text,
      value #{value_cql_type},
      PRIMARY KEY (key, column1)
    ) WITH COMPACT STORAGE
      AND CLUSTERING ORDER BY (column1 ASC)
      AND bloom_filter_fp_chance = 0.1
      AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
      AND comment = ''
      AND compaction = {'sstable_size_in_mb': '160', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
      AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.SnappyCompressor'}
      AND dclocal_read_repair_chance = 0.1
      AND default_time_to_live = 0
      AND gc_grace_seconds = 864000
      AND max_index_interval = 2048
      AND memtable_flush_period_in_ms = 0
      AND min_index_interval = 128
      AND read_repair_chance = 0.0
      AND speculative_retry = 'NONE';
  CQL

  self.execute(create_table)
  self.schema(true)
end
#add_multiple_columns(column_family, key, hash, opts = nil) ⇒ Object
190 191 192 193 194 195 196 197 198 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 190

# Runs one "value = value + by" UPDATE per entry of +hash+ for the given key.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param hash [Hash] column name => increment; the increment is interpolated
#   verbatim into the CQL (NOTE(review): not escaped -- must be trusted)
# @param opts [Hash, nil] :async selects execute_async; remaining keys are
#   filtered through execute_options
# @return [Hash] the hash that was iterated
def add_multiple_columns(column_family, key, hash, opts=nil)
  async = opts.try(:[], :async)
  exec_opts = execute_options(opts) # identical for every statement, so computed once
  hash.each do |field, by|
    query = "UPDATE \"#{column_family}\" SET #{VALUE_FIELD} = #{VALUE_FIELD} + #{by} WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, field)};"
    async ? self.execute_async(query, exec_opts) : self.execute(query, exec_opts)
  end
end
#batch(opts = false) ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 285

# Collects the statements queued by the block (see #insert) and sends them as
# a single BEGIN BATCH ... APPLY BATCH statement. When a batch is already
# open, the block is simply yielded to and its statements join the outer batch.
#
# @param opts [Hash, false, nil] :async selects execute_async; remaining keys
#   are filtered through execute_options
# @yield block whose statements should be batched
# @return [Object] result of the execute call, or the block's value when nested
def batch(opts=false)
  return yield if @batched_queries

  begin
    async = opts.try(:[], :async)
    @batched_queries = []
    yield
    query = "BEGIN BATCH\n"
    query << @batched_queries.join("\n")
    query << "\nAPPLY BATCH;"
    async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  ensure
    # Always leave batch mode, including when the block or the execute raises;
    # the exception itself propagates untouched.
    @batched_queries = nil
  end
end
#batch_mode? ⇒ Boolean
281 282 283 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 281

# Whether a #batch block is currently collecting statements.
#
# @return [Boolean] true while @batched_queries is set
def batch_mode?
  @batched_queries ? true : false
end
#clause(column_family, val, fields, operator = '=') ⇒ Object
390 391 392 393 394 395 396 397 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 390

# Renders a WHERE fragment comparing +fields+ against +val+.
#
# With '=' every part returned by get_parts is compared and the comparisons
# are joined with AND; with any other operator only the first part is used.
#
# @param column_family [String] table name, forwarded to get_parts
# @param val [Object] value, forwarded to get_parts
# @param fields [Array<Array>] [name, type] pairs
# @param operator [String] comparison operator placed between field and value
# @return [String] CQL fragment
def clause(column_family, val, fields, operator='=')
  parts = get_parts(column_family, val, fields)

  if operator == '='
    comparisons = parts.map { |name, part, type| "#{name} #{operator} #{escape(part, type)}" }
    comparisons.join(' AND ')
  else
    name, part, type = parts.first
    "#{name} #{operator} #{escape(part, type)}"
  end
end
#close ⇒ Object
80 81 82 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 80

# Closes the underlying session.
#
# @return [Object] whatever session.close returns
def close
  session.close
end
#column_clause(column_family, val, operator = '=') ⇒ Object
386 387 388 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 386

# WHERE fragment for the column-name fields of +column_family+.
#
# @param column_family [String] table name
# @param val [Object] column name value
# @param operator [String] comparison operator, '=' by default
# @return [String] CQL fragment built by #clause
def column_clause(column_family, val, operator='=')
  column_fields = get_column_fields(column_family)
  clause(column_family, val, column_fields, operator)
end
#column_families ⇒ Object
277 278 279 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 277

# Tables of the session's keyspace indexed by table name. The result is
# memoized in @column_families (cleared by schema(true)).
#
# @return [Hash] table name => table object
def column_families
  @column_families ||= begin
    tables = self.cluster.keyspace(session.keyspace).tables
    tables.each_with_object({}) { |table, by_name| by_name[table.name] = table }
  end
end
#column_string(row, fields) ⇒ Object
444 445 446 447 448 449 450 451 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 444

# Column name of a result row as a single string.
#
# @param row [Hash] result row, read by field name
# @param fields [Array<Array>] [name, type] pairs of the column-name fields
# @return [Object] row[NAME_FIELD] for a single field; for several fields the
#   decoded parts packed into a Cassandra::Composite and converted with to_s
def column_string(row, fields)
  return row[NAME_FIELD] unless fields.size > 1

  decoded = fields.map { |name, type| decode(row[name], type) }
  Cassandra::Composite.new(*decoded).to_s
end
#convert_str_to_hex(str) ⇒ Object
insert a blob
440 441 442 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 440

# Renders a string as a 0x-prefixed hex literal, as used to insert a blob.
#
# Built by interpolation rather than by appending to a '0x' literal, so it
# keeps working when string literals are frozen.
#
# @param str [String] raw bytes
# @return [String] '0x' followed by the hex encoding of +str+
def convert_str_to_hex(str)
  "0x#{str.unpack('H*').first}"
end
#convert_str_to_timeuuid(str) ⇒ Object
when the column names are timeuuid
435 436 437 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 435

# Used when the column names are timeuuid: converts the given value to its
# GUID form via SimpleUUID.
#
# @param str [Object] value accepted by SimpleUUID::UUID.new
# @return [Object] result of SimpleUUID::UUID#to_guid
def convert_str_to_timeuuid(str)
  uuid = SimpleUUID::UUID.new(str)
  uuid.to_guid
end
#decode(val, type) ⇒ Object
425 426 427 428 429 430 431 432 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 425

# Converts a value read from a row according to its column type. Only
# :timeuuid values are transformed; everything else is returned unchanged.
#
# @param val [Object] value read from a row
# @param type [Symbol] column type
# @return [Object] decoded value
def decode(val, type)
  return val unless type == :timeuuid

  SimpleUUID::UUID.new(val.to_s).to_s # binary version
end
#escape(str, type) ⇒ Object
399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 399

# Renders a value as a CQL literal for the given column type.
#
# @param str [Object] value to render
# @param type [Symbol] column type
# @return [Object] GUID for :timeuuid, 0x-hex for :blob, the value itself for
#   :int/:bigint, otherwise a single-quoted string
def escape(str, type)
  case type
  when :timeuuid
    convert_str_to_timeuuid str
  when :blob
    convert_str_to_hex str
  when :int, :bigint
    # NOTE(review): returned verbatim; callers must pass a real number here.
    str
  else
    # CQL escapes a single quote inside a string literal by doubling it;
    # without this a value containing ' breaks (or rewrites) the statement.
    "'#{str.to_s.gsub("'", "''")}'"
  end
end
#execute(query, options = {}) ⇒ Object
84 85 86 87 88 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 84

# Runs a query synchronously on the session, wrapped in a 'query.cassandra'
# ActiveSupport notification (payload: query, options, async: false).
#
# @param query [String] CQL statement
# @param options [Hash] passed straight to session.execute
# @return [Object] result of session.execute
def execute(query, options={})
  ActiveSupport::Notifications.instrument('query.cassandra', query: query, options: options, async: false) do
    session.execute query, options
  end
end
#execute_async(query, options = {}) ⇒ Object
90 91 92 93 94 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 90

# Runs a query through session.execute_async, wrapped in a 'query.cassandra'
# ActiveSupport notification (payload: query, options, async: true).
#
# @param query [String] CQL statement
# @param options [Hash] passed straight to session.execute_async
# @return [Object] result of session.execute_async
def execute_async(query, options={})
  ActiveSupport::Notifications.instrument('query.cassandra', query: query, options: options, async: true) do
    session.execute_async query, options
  end
end
#execute_options(opts) ⇒ Object
255 256 257 258 259 260 261 262 263 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 255

# Reduces caller-supplied options to the keys forwarded to the session's
# execute calls, dropping client-side keys such as :async or :ttl.
#
# @param opts [Hash, nil, false] caller options; anything that does not
#   respond to #slice (nil, or the false default of #batch) yields {}
# @return [Hash] subset containing only :consistency, :page_size, :trace,
#   :timeout and :serial_consistency
def execute_options(opts)
  return {} unless opts.respond_to?(:slice)

  opts.slice(
    :consistency,
    :page_size,
    :trace,
    :timeout,
    :serial_consistency
  ) || {}
end
#get(column_family, key, *columns_options) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 121

# Reads a row, a single column of it, or a subset of its columns.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param columns_options [Array] column names (nested arrays are flattened,
#   nils dropped), optionally followed by an options Hash; :async selects
#   execute_async, the rest is filtered through execute_options
# @return [Object] with :async, the raw execute_async result; with exactly one
#   column, that column's value or nil when no row matched; otherwise a Hash
#   of column name => value, limited to the requested columns if any were given
def get(column_family, key, *columns_options)
  opts = columns_options.pop if columns_options.last.is_a?(Hash)
  async = opts.try(:[], :async)

  columns = columns_options.flatten.compact
  column_fields = get_column_fields(column_family)

  query =
    if columns.size == 1
      "SELECT #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, columns.first)}"
    else
      # Several (or no) columns requested: fetch the whole row and filter below.
      "SELECT #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)}"
    end

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  if columns.size == 1
    result.size > 0 ? result.first[VALUE_FIELD] : nil
  else
    data = result.inject({}) { |hsh, row| hsh[column_string(row, column_fields)] = row[VALUE_FIELD]; hsh }
    columns.size > 0 ? data.slice(*columns.map(&:to_s)) : data
  end
end
#get_column_fields(column_family) ⇒ Object
363 364 365 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 363

# Columns of the table whose name starts with "column", i.e. the column-name
# fields (column1, ...).
#
# @param column_family [String] table name
# @return [Array<Array>] [name, type kind] pairs
def get_column_fields(column_family)
  table = cluster.keyspace(keyspace).table(column_family)
  name_columns = table.columns.select { |col| col.name =~ /^column/ }
  name_columns.map { |col| [col.name, col.type.kind] }
end
#get_columns(column_family, key, columns, opts) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 146

# Reads the values of the given columns of one row, in the order requested.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param columns [Array] column names
# @param opts [Hash, nil] :async selects execute_async; the rest is filtered
#   through execute_options
# @return [Object] with :async, the raw execute_async result; otherwise an
#   Array of values aligned with +columns+ (nil where a column was not returned)
def get_columns(column_family, key, columns, opts)
  async = opts.try(:[], :async)
  column_type = name_type(column_family) # same for every column, so looked up once
  name_fields = columns.map { |c| escape(c, column_type) }.join(', ')

  query = "SELECT #{NAME_FIELD}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{NAME_FIELD} IN(#{name_fields}) AND #{key_clause(column_family, key)};"

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  data = result.inject({}) { |hsh, row| hsh[row[NAME_FIELD]] = row[VALUE_FIELD]; hsh }
  columns.map { |column| data[column.to_s] }
end
#get_columns_as_hash(column_family, key, columns, opts) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 162

# Reads the given columns of one row as a Hash.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param columns [Array] column names
# @param opts [Hash, nil] :async selects execute_async; the rest is filtered
#   through execute_options
# @return [Object] with :async, the raw execute_async result; otherwise a Hash
#   of column name => value for the columns that were returned
def get_columns_as_hash(column_family, key, columns, opts)
  async = opts.try(:[], :async)
  # Render names according to the column-name type, exactly as get_columns
  # does, instead of always single-quoting them.
  column_type = name_type(column_family)
  name_fields = columns.map { |c| escape(c, column_type) }.join(', ')

  query = "SELECT #{NAME_FIELD}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{NAME_FIELD} IN(#{name_fields}) AND #{key_clause(column_family, key)};"

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  result.inject({}) { |hsh, row| hsh[row[NAME_FIELD]] = row[VALUE_FIELD]; hsh }
end
#get_key_fields(column_family) ⇒ Object
359 360 361 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 359

# Columns of the table whose name starts with "key", i.e. the row-key fields.
#
# @param column_family [String] table name
# @return [Array<Array>] [name, type kind] pairs
def get_key_fields(column_family)
  table = cluster.keyspace(keyspace).table(column_family)
  key_columns = table.columns.select { |col| col.name =~ /^key/ }
  key_columns.map { |col| [col.name, col.type.kind] }
end
#get_parts(column_family, val, fields) ⇒ Object
371 372 373 374 375 376 377 378 379 380 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 371

# Pairs each field with its (normalized) part of +val+.
#
# With more than one field, +val+ is unpacked as a Cassandra::Composite (a
# copy is passed to new_from_packed); with one field it is used whole.
#
# @param column_family [String] table name (not used by the body)
# @param val [Object] packed composite or plain value
# @param fields [Array<Array>] [name, type] pairs
# @return [Array<Array>] [name, normalized value, type] triples
def get_parts(column_family, val, fields)
  parts = fields.size > 1 ? Cassandra::Composite.new_from_packed(val.dup).parts : [val]

  fields.each_with_index.map do |(field, type), idx|
    [field, normalize_composite_key_part(parts[idx], type), type]
  end
end
#get_range(column_family, opts = {}, &blk) ⇒ Object
214 215 216 217 218 219 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 214

# Selects up to :key_count row keys and loads those rows through multi_get.
#
# @param column_family [String] table name, interpolated into the query
# @param opts [Hash] :key_count caps the number of keys (default 100); the
#   rest is filtered through execute_options for the key query only
# @param blk [Proc] accepted but not used by the body
# @return [Hash] result of multi_get, or {} when no keys were found
def get_range(column_family, opts={}, &blk)
  key_count = opts[:key_count] || 100
  query = "SELECT #{KEY_FIELD} FROM \"#{column_family}\" LIMIT #{key_count}"
  keys = self.execute(query, execute_options(opts)).map { |result| result[KEY_FIELD] }
  keys.size > 0 ? multi_get(column_family, keys) : {}
end
#get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts = {}) ⇒ Object
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 233

# Reads a slice of one row's columns.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param column [Object, nil] when truthy, restricts the query to that column
# @param start [#empty?] lower bound (>=) on the column name; skipped if empty
# @param finish [#empty?] upper bound (<=) on the column name; skipped if empty
# @param count [Object] LIMIT value, interpolated verbatim
# @param reversed [Boolean] when truthy adds an ORDER BY that is the opposite
#   of the table's clustering order
# @param consistency [Object] forwarded as the :consistency option
# @param opts [Hash] further options, filtered through execute_options
# @return [Hash] column name (see column_string) => decoded value
def get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts={})
  # merge rather than assign so the caller's hash is left untouched
  opts = opts.merge(:consistency => consistency)

  column_fields = get_column_fields(column_family)

  query = "SELECT #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)}"
  query << " AND #{column_clause(column_family, column)}" if column
  query << " AND #{column_clause(column_family, start, '>=')}" unless start.empty?
  query << " AND #{column_clause(column_family, finish, '<=')}" unless finish.empty?

  if reversed
    # Flip relative to how the table itself is clustered.
    direction = reverse_comparator(column_family) ? 'ASC' : 'DESC'
    query << " ORDER BY "
    query << column_fields.map { |f, _| "#{f} #{direction}" }.join(', ')
  end

  query << " LIMIT #{count}"

  self.execute(query, execute_options(opts)).inject({}) do |results, row|
    results[column_string(row, column_fields)] = decode(row[VALUE_FIELD], value_type(column_family))
    results
  end
end
#get_type(column_family, field) ⇒ Object
355 356 357 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 355

# Type kind of one column, read from the cluster's schema metadata.
#
# @param column_family [String] table name
# @param field [String] column name
# @return [Object] column.type.kind as reported by the driver
def get_type(column_family, field)
  table = cluster.keyspace(keyspace).table(column_family)
  table.column(field).type.kind
end
#get_value(column_family, key, column, consistency) ⇒ Object
176 177 178 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 176

# Reads one column value with an explicit consistency; delegates to #get.
#
# @param column_family [String] table name
# @param key [Object] row key
# @param column [Object] column name
# @param consistency [Object] forwarded as the :consistency option
# @return [Object] result of #get
def get_value(column_family, key, column, consistency)
  get(column_family, key, [column], :consistency => consistency)
end
#has_table?(name) ⇒ Boolean
269 270 271 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 269

# Whether the session's keyspace contains a table with the given name.
#
# @param name [String] table name
# @return [Boolean] as reported by the keyspace metadata
def has_table?(name)
  current_keyspace = self.cluster.keyspace(session.keyspace)
  current_keyspace.has_table?(name)
end
#insert(column_family, key, values, opts = nil) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 96

# Writes the given column name => value pairs for one row key.
#
# One INSERT is generated per pair. Inside a #batch block the statements are
# queued on the open batch; otherwise they are wrapped in their own
# BEGIN BATCH ... APPLY BATCH and executed immediately.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key (split into parts by get_parts)
# @param values [Hash] column name => value
# @param opts [Hash, nil] :ttl adds "USING TTL"; :async selects execute_async;
#   the rest is filtered through execute_options
# @return [Object] the queued-statement array in batch mode, otherwise the
#   execute result
def insert(column_family, key, values, opts=nil)
  ttl = opts.try(:[], :ttl)
  async = opts.try(:[], :async)
  insert_options = ttl ? " USING TTL #{ttl}" : ''

  key_fields = get_key_fields(column_family)
  column_fields = get_column_fields(column_family)

  key_parts = get_parts(column_family, key, key_fields)

  insert_query = values.map do |name, value|
    column_parts = get_parts(column_family, name, column_fields)

    " INSERT INTO \"#{column_family}\" (#{key_fields.map(&:first).join(', ')}, #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD}) VALUES (#{key_parts.map { |f, v| escape(v, get_type(column_family, f)) }.join(', ')}, #{column_parts.map { |f, v| escape(v, get_type(column_family, f)) }.join(', ')}, #{escape(value, value_type(column_family))})#{insert_options}"
  end.join("\n")

  if batch_mode?
    @batched_queries << insert_query
  else
    query = "BEGIN BATCH\n"
    query << insert_query
    query << "\nAPPLY BATCH;"

    async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  end
end
#key_clause(column_family, val) ⇒ Object
382 383 384 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 382

# WHERE fragment matching the row-key fields of +column_family+ with '='.
#
# @param column_family [String] table name
# @param val [Object] row key value
# @return [String] CQL fragment built by #clause
def key_clause(column_family, val)
  key_fields = get_key_fields(column_family)
  clause(column_family, val, key_fields)
end
#key_type(column_family) ⇒ Object
343 344 345 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 343

# Type kind of the KEY_FIELD column.
#
# @param column_family [String] table name
# @return [Object] result of get_type for KEY_FIELD
def key_type(column_family)
  get_type(column_family, KEY_FIELD)
end
#keyspace ⇒ Object
265 266 267 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 265

# Keyspace of the underlying session.
#
# @return [Object] session.keyspace
def keyspace
  session.keyspace
end
#multi_get(column_family, keys, *args) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 221

# Loads several rows at once with a single "key IN (...)" query.
#
# @param column_family [String] table name, interpolated into the query
# @param keys [Array] row keys, each rendered through escape/key_type
# @param args [Array] only a trailing options Hash is used; it is filtered
#   through execute_options
# @return [Hash] row key => { column name => value }
def multi_get(column_family, keys, *args)
  opts = args.pop if args.last.is_a?(Hash)
  keys = keys.map { |key| escape(key, key_type(column_family)) }.join(',')

  results = {}
  query = "SELECT * FROM \"#{column_family}\" WHERE #{KEY_FIELD} IN(#{keys})"
  self.execute(query, execute_options(opts)).each do |row|
    results[row[KEY_FIELD]] ||= {}
    results[row[KEY_FIELD]][row[NAME_FIELD]] = row[VALUE_FIELD]
  end
  results
end
#name_type(column_family) ⇒ Object
347 348 349 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 347

# Type kind of the NAME_FIELD column.
#
# @param column_family [String] table name
# @return [Object] result of get_type for NAME_FIELD
def name_type(column_family)
  get_type(column_family, NAME_FIELD)
end
#normalize_composite_key_part(val, type) ⇒ Object
412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 412

# Converts one packed part of a key or column name into the Ruby value that
# matches its column type.
#
# @param val [Object] raw part
# @param type [Symbol] column type
# @return [Object] UUID string for :timeuuid, Integer unpacked with the 'N'
#   directive for :int, Integer via Cassandra::Long for :bigint, otherwise
#   +val+ unchanged
def normalize_composite_key_part(val, type)
  if type == :timeuuid
    SimpleUUID::UUID.new(val).to_s
  elsif type == :int
    val.unpack('N').first
  elsif type == :bigint
    Cassandra::Long.new(val).to_i
  else
    val
  end
end
#remove(column_family, key, *args) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 200

# Deletes a whole row, or a single column of it.
#
# @param column_family [String] table name, interpolated into the query
# @param key [Object] row key, rendered through key_clause
# @param args [Array] optional column name, optionally followed by an options
#   Hash; :async selects execute_async, the rest is filtered through
#   execute_options
# @return [Object] result of the execute call
def remove(column_family, key, *args)
  opts = args.pop if args.last.is_a?(Hash)
  async = opts.try(:[], :async)

  query =
    if args.first.nil? || args.first.is_a?(Hash)
      # No column given: delete the entire row.
      "DELETE FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)};"
    else
      "DELETE FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, args.first)};"
    end

  async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
end
#reverse_comparator(column_family) ⇒ Object
367 368 369 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 367

# Whether the table's first clustering order is descending.
#
# @param column_family [String] table name
# @return [Boolean] true when the first clustering_order entry is :desc
def reverse_comparator(column_family)
  table = self.cluster.keyspace(keyspace).table(column_family)
  # clustering_order is invoked through send, as in the original code.
  table.send(:clustering_order).first == :desc
end
#schema(reload = false) ⇒ Object
305 306 307 308 309 310 311 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 305

# With reload = true, drops the memoized @schema_cache and @column_families
# and asks the cluster to refresh its schema; otherwise does nothing.
#
# @param reload [Boolean] whether to clear the caches and refresh
# @return [Object, nil] result of cluster.refresh_schema, or nil when +reload+
#   is falsy
def schema(reload=false)
  return unless reload

  [:@schema_cache, :@column_families].each do |ivar|
    remove_instance_variable(ivar) if instance_variable_defined?(ivar)
  end
  self.cluster.refresh_schema
end
#schema_cache ⇒ Object
273 274 275 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 273

# Memoized SchemaCache built around this client (cleared by schema(true)).
#
# @return [SchemaCache]
def schema_cache
  @schema_cache = SchemaCache.new(self) unless @schema_cache
  @schema_cache
end
#value_type(column_family) ⇒ Object
351 352 353 |
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 351

# Type kind of the VALUE_FIELD column.
#
# @param column_family [String] table name
# @return [Object] result of get_type for VALUE_FIELD
def value_type(column_family)
  get_type(column_family, VALUE_FIELD)
end