Class: RightSupport::DB::CassandraModel
- Includes:
- Log::Mixin
- Defined in:
- lib/right_support/db/cassandra_model.rb
Overview
Base class for a column family in a keyspace. Used to access data persisted in Cassandra. Provides wrappers for Cassandra client methods.
Constant Summary
- DEFAULT_TIMEOUT =
Default timeout for client connection to Cassandra server
10
- DEFAULT_COUNT =
Default maximum number of rows to retrieve in one chunk
100
- METHODS_TO_LOG =
[:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove, 'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']
- @@current_keyspace =
nil
- @@connections =
{}
Constants included from Log::Mixin
Class Attribute Summary
-
.column_family ⇒ Object
Returns the value of attribute column_family.
-
.default_keyspace ⇒ Object
readonly
Returns the value of attribute default_keyspace.
Instance Attribute Summary
-
#attributes ⇒ Object
Row attributes, with column name as key and column value as value.
-
#key ⇒ Object
Primary key of the row.
Class Method Summary
-
.all(k, opt = {}) ⇒ Object
Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved per row, except for an individual key, in which case all columns are retrieved.
-
.batch(*args, &block) ⇒ Object
Open a batch operation and yield self. Inserts and deletes are queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides that set in individual commands.
-
.calculate_random_partitioner_token(key) ⇒ Integer
Compute the token for a given row key, which can provide information on the progress of very large “each” operations, e.g. iterating over all rows of a column family.
- .config ⇒ Object
- .config=(value) ⇒ Object
-
.conn ⇒ Object
Client connected to Cassandra server. Creates the connection if it does not already exist. Uses BinaryProtocolAccelerated if it is available.
-
.do_op(meth, *args, &block) ⇒ Object
Perform a Cassandra operation on the connection object.
- .do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object
- .env_config ⇒ Object
-
.get(key, opt = {}) ⇒ Object
Get row for specified primary key and convert into object of given class. Unless :count is specified, all columns are retrieved, in chunks of 100.
-
.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key.
-
.get_columns(key, columns, opt = {}) ⇒ Object
Get specific columns in row with specified key.
-
.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key.
-
.inherited(base) ⇒ Object
Reject use of CassandraModel under Ruby < 1.9 by raising UnsupportedRubyVersion when the class is subclassed.
-
.insert(key, values, opt = {}) ⇒ Object
Insert a row for a key.
-
.keyspace ⇒ Object
Return current active keyspace.
-
.keyspace=(kyspc) ⇒ Object
Sets the default keyspace.
-
.keyspaces ⇒ Object
Return current keyspace names as Array of String (any keyspace that has been used this session).
- .namespace(env) ⇒ Object
-
.real_get(k, opt = {}) ⇒ Object
Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved.
-
.reconnect ⇒ Object
Reconnect to Cassandra server. Uses BinaryProtocolAccelerated if it is available.
-
.remove(*args) ⇒ Object
Delete row or columns of row.
-
.ring ⇒ Object
Cassandra ring.
-
.stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object
This method is an attempt to circumvent the Cassandra gem limitation of returning only 100 columns for wide rows. It returns only columns that are within the result set specified by a secondary index equality query. It iterates through chunks of rows of the result set and yields to the caller all of the columns in chunks of 1,000 until all of the columns have been retrieved.
-
.with_keyspace(keyspace, append_env = true, &block) ⇒ Object
Temporarily change the working keyspace for this class for the duration of the block.
Instance Method Summary
-
#[](key) ⇒ Object
Column value.
-
#[]=(key, value) ⇒ Object
Store new column value.
-
#destroy ⇒ Object
Delete object from Cassandra.
-
#initialize(key, attrs = {}) ⇒ CassandraModel
constructor
Create column family object.
-
#reload ⇒ Object
Load object from Cassandra without modifying this object.
-
#reload! ⇒ Object
Reload object value from Cassandra and update this object.
-
#save ⇒ Object
Store object in Cassandra.
Methods included from Log::Mixin
default_logger, default_logger=, included
Constructor Details
#initialize(key, attrs = {}) ⇒ CassandraModel
Create column family object
Parameters
- key(String)
-
Primary key for object
- attrs(Hash)
-
Attributes for object which form Cassandra row
with column name as key and column value as value
# File 'lib/right_support/db/cassandra_model.rb', line 636
def initialize(key, attrs = {})
  self.key = key
  self.attributes = attrs
end
Class Attribute Details
.column_family ⇒ Object
Returns the value of attribute column_family.
# File 'lib/right_support/db/cassandra_model.rb', line 119
def column_family
  @column_family
end
.default_keyspace ⇒ Object (readonly)
Returns the value of attribute default_keyspace.
# File 'lib/right_support/db/cassandra_model.rb', line 118
def default_keyspace
  @default_keyspace
end
Instance Attribute Details
#attributes ⇒ Object
Row attributes, with column name as key and column value as value
# File 'lib/right_support/db/cassandra_model.rb', line 628
def attributes
  @attributes
end
#key ⇒ Object
Primary key of the row
# File 'lib/right_support/db/cassandra_model.rb', line 628
def key
  @key
end
Class Method Details
.all(k, opt = {}) ⇒ Object
Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved per row, except for an individual key, in which case all columns are retrieved.
Parameters
- k(String|Array)
-
Individual primary key or list of keys on which to match
- opt(Hash)
-
Request options including :consistency and for column level
control :count, :start, :finish, :reversed
Return
- (Object|nil)
-
Individual row, or nil if not found, or ordered hash of rows
# File 'lib/right_support/db/cassandra_model.rb', line 267
def all(k, opt = {})
  real_get(k, opt)
end
.batch(*args, &block) ⇒ Object
Open a batch operation and yield self. Inserts and deletes are queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides that set in individual commands.
Parameters
- args(Array)
-
Batch options such as :consistency
Block
Required block making Cassandra requests
Returns
- (Array)
-
Mutation map and consistency level
Raise
- Exception
-
If block not specified
# File 'lib/right_support/db/cassandra_model.rb', line 517
def batch(*args, &block)
  raise "Block required!" unless block_given?
  do_op(:batch, *args, &block)
end
.calculate_random_partitioner_token(key) ⇒ Integer
Compute the token for a given row key, which can provide information on the progress of very large “each” operations, e.g. iterating over all rows of a column family.
# File 'lib/right_support/db/cassandra_model.rb', line 152
def calculate_random_partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)

  if number >= (2**127)
    # perform two's complement, basically this takes the absolute value of the number as
    # if it were a 128-bit signed number. Equivalent to Java BigInteger.abs() operation.
    result = (number ^ (2**128)-1) + 1
  else
    # we're good
    result = number
  end

  result
end
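The token arithmetic above can be tried outside the class. The following is a minimal sketch that repeats the same MD5 computation as a standalone function. The names random_partitioner_token and progress_fraction are hypothetical, and reading token / 2**127 as approximate scan progress is an assumption about the token range, not something this page states.

```ruby
require 'digest/md5'

# Same arithmetic as calculate_random_partitioner_token, as a free function.
def random_partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)
  # Values with the top bit set are folded back by two's complement,
  # which yields 2**128 - number.
  number >= 2**127 ? (number ^ (2**128 - 1)) + 1 : number
end

# Hypothetical helper: position of a key's token within 0..2**127,
# expressed as a fraction between 0.0 and 1.0.
def progress_fraction(key)
  random_partitioner_token(key).to_f / 2**127
end

puts progress_fraction("some-row-key")
```

Called with the last row key seen during a long iteration, such a fraction gives a rough idea of how far through the ring the scan has progressed.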
.config ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 132
def config
  @@config
end
.config=(value) ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 143
def config=(value)
  @@config = normalize_config(value) unless value.nil?
end
.conn ⇒ Object
Client connected to Cassandra server. Creates the connection if it does not already exist. Uses BinaryProtocolAccelerated if it is available.
Return
- (Cassandra)
-
Client connected to server
# File 'lib/right_support/db/cassandra_model.rb', line 238
def conn()
  @@connections ||= {}

  config = env_config
  options = {
    :timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT,
    :server_retry_period => nil,
  }

  if defined? Thrift::BinaryProtocolAccelerated
    options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})
  end

  @@connections[self.keyspace] ||= Cassandra.new(self.keyspace, config["server"], options)
  @@connections[self.keyspace].disable_node_auto_discovery!
  @@connections[self.keyspace]
end
.do_op(meth, *args, &block) ⇒ Object
Perform a Cassandra operation on the connection object. Rescue IOError by automatically reconnecting and retrying the operation.
Parameters
- meth(Symbol)
-
Method to be executed
- *args(Array)
-
Method arguments to forward to the Cassandra connection
Block
Block if any to be executed by method
Return
- (Object)
-
Value returned by executed method
# File 'lib/right_support/db/cassandra_model.rb', line 534
def do_op(meth, *args, &block)
  first_started_at ||= Time.now
  retries ||= 0
  started_at = Time.now

  # cassandra functionality
  result = conn.send(meth, *args, &block)

  # log functionality
  do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])

  return result
rescue IOError
  reconnect
  retries += 1
  retry
end
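The retry behaviour of do_op relies on a method-level rescue with retry, where locals initialised with ||= keep their values across attempts. The following is a minimal sketch of that pattern against a stand-in connection. FlakyConnection and do_op_sketch are hypothetical names, and the retry limit is an assumption added for the example; the method above retries without a limit.

```ruby
# Hypothetical stand-in for a connection that fails a fixed number of times.
class FlakyConnection
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def get(column_family, key)
    @calls += 1
    raise IOError, "connection lost" if @calls <= @failures
    { "name" => "value for #{key}" }
  end
end

MAX_RETRIES = 3 # assumption for this sketch only

def do_op_sketch(conn, meth, *args)
  retries ||= 0 # `retry` re-runs the method body; ||= keeps the count
  result = conn.send(meth, *args)
  [result, retries]
rescue IOError
  retries += 1
  raise if retries > MAX_RETRIES
  retry
end

conn = FlakyConnection.new(2)
result, retries = do_op_sketch(conn, :get, "Users", "user-1")
puts "succeeded after #{retries} retries"
```

Bounding the retries is a design choice for the sketch: it keeps a permanently unreachable server from looping forever, at the cost of surfacing the IOError to the caller.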
.do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 552
def do_op_log(first_started_at, started_at, retries, meth, cf, key)
  now = Time.now
  attempt_time = now - started_at

  if METHODS_TO_LOG.include?(meth)
    key_count = key.is_a?(Array) ? key.size : 1

    log_string = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, attempt_time*1000)

    if retries && retries > 0
      total_time = now - first_started_at
      log_string += sprintf(", retries=%d, total_time=%.1fms", retries, total_time*1000)
    end

    logger.debug(log_string)
  end
end
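To see what the resulting debug line looks like, the formatting can be isolated from the timing and logger calls. The following is a minimal sketch using the same format strings as above; format_op_log is a hypothetical name, and durations are passed in as seconds instead of being measured.

```ruby
# Hypothetical standalone version of the log-line formatting in do_op_log.
def format_op_log(meth, cf, key, attempt_seconds, retries = 0, total_seconds = nil)
  # A list of keys is counted; anything else counts as one key.
  key_count = key.is_a?(Array) ? key.size : 1
  line = format("CassandraModel %s, cf=%s, keys=%d, time=%.1fms",
                meth, cf, key_count, attempt_seconds * 1000)
  if retries > 0 && total_seconds
    line += format(", retries=%d, total_time=%.1fms", retries, total_seconds * 1000)
  end
  line
end

puts format_op_log(:get, "Users", "user-1", 0.5)
puts format_op_log(:multi_get, "Users", %w[a b c], 0.25, 2, 1.5)
```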
.env_config ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 136
def env_config
  env = ENV['RACK_ENV']
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" \
    unless !@@config.nil? && @@config.keys.include?(env) && @@config[env]
  @@config[env]
end
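The lookup above implies a configuration keyed by environment name, where each section provides at least a "server" entry (the only key read by the code on this page). The following is a minimal sketch of that shape and of the same guard. CONFIG, MissingSection and section_for are hypothetical names, the host names are placeholders, and what normalize_config does to the value is not shown here, so the exact accepted input may differ.

```ruby
# Hypothetical configuration, one section per environment name.
CONFIG = {
  "development" => { "server" => "localhost:9160" },
  "production"  => { "server" => ["cass1.example.com:9160", "cass2.example.com:9160"] }
}

# Stand-in for the MissingConfiguration error raised by env_config.
class MissingSection < StandardError; end

# Same guard as env_config, with the config and environment passed in.
def section_for(config, env)
  unless !config.nil? && config.key?(env) && config[env]
    raise MissingSection, "CassandraModel config is missing a '#{env}' section"
  end
  config[env]
end

puts section_for(CONFIG, "development")["server"]
```

Note that env_config reads ENV['RACK_ENV'] without a default, so with RACK_ENV unset the lookup key is nil and the error is raised.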
.get(key, opt = {}) ⇒ Object
Get row for specified primary key and convert into object of given class. Unless :count is specified, all columns are retrieved, in chunks of 100.
Parameters
- key(String)
-
Primary key on which to match
- opt(Hash)
-
Request options including :consistency and for column level
control :count, :start, :finish, :reversed
Return
- (CassandraModel|nil)
-
Instantiated object of given class, or nil if not found
# File 'lib/right_support/db/cassandra_model.rb', line 281
def get(key, opt = {})
  if (attrs = real_get(key, opt)).empty?
    nil
  else
    new(key, attrs)
  end
end
.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key
Parameters
- index(String)
-
Name of secondary index
- key(String)
-
Index value that each selected row is required to match
- columns(Array|nil)
-
Names of columns to be retrieved, defaults to all
- opt(Hash)
-
Request options; only :consistency and :count (chunk size, defaults to DEFAULT_COUNT) are used
Block
Optional block that is yielded each chunk as it is retrieved as an array like the normally returned result
Return
- (OrderedHash)
-
Rows retrieved with each key, value is columns
# File 'lib/right_support/db/cassandra_model.rb', line 339
def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows = Cassandra::OrderedHash.new
  start = ""
  count = opt.delete(:count) || DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  while true
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = self.conn.get_indexed_slices(column_family, clause, columns, opt)
    rows.merge!(chunk)
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
.get_columns(key, columns, opt = {}) ⇒ Object
Get specific columns in row with specified key
Parameters
- key(String)
-
Primary key on which to match
- columns(Array)
-
Names of columns to be retrieved
- opt(Hash)
-
Request options such as :consistency
Return
- (Array)
-
Values of selected columns in the order specified
# File 'lib/right_support/db/cassandra_model.rb', line 472
def get_columns(key, columns, opt = {})
  do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
end
.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key
Parameters
- index(String)
-
Name of secondary index
- key(String)
-
Index value that each selected row is required to match
- columns(Array|nil)
-
Names of columns to be retrieved, defaults to all
- opt(Hash)
-
Request options with only :consistency used
Return
- (Array)
-
Rows retrieved with each member being an instantiated object of the
given class as value, but object only contains values for the columns retrieved;
array is always empty if a block is given
# File 'lib/right_support/db/cassandra_model.rb', line 431
def get_indexed(index, key, columns = nil, opt = {})
  rows = []
  start = ""
  count = DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = do_op(:get_indexed_slices, column_family, clause, columns, opt)
    chunk_rows = []
    chunk.each do |row_key, row_columns|
      if row_columns && row_key != start
        attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
        chunk_rows << new(row_key, attrs)
      end
    end
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
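The return value of get_indexed depends on whether a block is given: with a block, each chunk is yielded and the returned array stays empty; without one, the chunks are accumulated. The following is a minimal sketch of just that branching, with the Cassandra calls replaced by a pre-built list of chunks. each_or_collect is a hypothetical name.

```ruby
# Hypothetical reduction of get_indexed to its yield-or-accumulate logic.
def each_or_collect(chunks)
  rows = []
  chunks.each do |chunk|
    if block_given?
      yield chunk          # streaming: caller handles each chunk
    else
      rows.concat(chunk)   # buffering: everything is returned at the end
    end
  end
  rows
end

chunks = [%w[row1 row2], %w[row3]]

all_rows = each_or_collect(chunks)
streamed = []
returned = each_or_collect(chunks) { |chunk| streamed << chunk.size }
puts all_rows.inspect
puts returned.inspect
```

The block form keeps memory use bounded by the chunk size, which is the reason to prefer it for large result sets.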
.inherited(base) ⇒ Object
Reject use of CassandraModel under Ruby < 1.9 by raising UnsupportedRubyVersion when the class is subclassed
# File 'lib/right_support/db/cassandra_model.rb', line 128
def inherited(base)
  raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
end
.insert(key, values, opt = {}) ⇒ Object
Insert a row for a key
Parameters
- key(String)
-
Primary key for value
- values(Hash)
-
Values to be stored
- opt(Hash)
-
Request options such as :consistency
Return
- (Array)
-
Mutation map and consistency level
# File 'lib/right_support/db/cassandra_model.rb', line 485
def insert(key, values, opt={})
  do_op(:insert, column_family, key, values, opt)
end
.keyspace ⇒ Object
Return current active keyspace.
Return
- keyspace(String)
-
current_keyspace or default_keyspace
# File 'lib/right_support/db/cassandra_model.rb', line 191
def keyspace
  @@current_keyspace || @@default_keyspace
end
.keyspace=(kyspc) ⇒ Object
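Sets the default keyspace. The environment name (RACK_ENV, defaulting to 'development') and, if one is found, the namespace are appended to the given name.
Parameters
- kyspc(String)
-
Base name of the default keyspace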
# File 'lib/right_support/db/cassandra_model.rb', line 200
def keyspace=(kyspc)
  env = ENV['RACK_ENV'] || 'development'
  nspace = namespace(env)
  @@default_keyspace = "#{kyspc}_#{env}"
  @@default_keyspace += "_#{nspace}" if nspace
end
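The effective keyspace name is therefore the given name plus an environment suffix and an optional namespace suffix. The following is a minimal sketch of that composition as a pure function; qualified_keyspace is a hypothetical name, and the environment and namespace are passed in instead of being read from RACK_ENV and /etc/namespace.

```ruby
# Hypothetical pure-function version of the naming rule in keyspace=.
def qualified_keyspace(name, env, nspace = nil)
  keyspace = "#{name}_#{env}"
  keyspace += "_#{nspace}" if nspace
  keyspace
end

puts qualified_keyspace("accounts", "production")
puts qualified_keyspace("accounts", "test", "testns")
```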
.keyspaces ⇒ Object
Return current keyspace names as Array of String (any keyspace that has been used this session).
Return
- (Array)
-
keyspaces names
# File 'lib/right_support/db/cassandra_model.rb', line 183
def keyspaces
  @@connections.keys
end
.namespace(env) ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 167
def namespace(env)
  if env == 'test'
    nspace = "testns"
  else
    nspace = nil
    if File.file?("/etc/namespace")
      nspace = File.read("/etc/namespace").strip
    end
  end
  nspace
end
.real_get(k, opt = {}) ⇒ Object
Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved.
Parameters
- k(String|Array)
-
Individual primary key or list of keys on which to match
- opt(Hash)
-
Request options including :consistency and for column level
control :count, :start, :finish, :reversed
Return
- (Cassandra::OrderedHash)
-
Individual row or OrderedHash of rows
# File 'lib/right_support/db/cassandra_model.rb', line 301
def real_get(k, opt = {})
  if k.is_a?(Array)
    do_op(:multi_get, column_family, k, opt)
  elsif opt[:count]
    do_op(:get, column_family, k, opt)
  else
    opt = opt.clone
    opt[:count] = DEFAULT_COUNT
    columns = Cassandra::OrderedHash.new
    loop do
      chunk = do_op(:get, column_family, k, opt)
      columns.merge!(chunk)
      if chunk.size == opt[:count]
        # Assume there are more chunks, use last key as start of next get
        opt[:start] = chunk.keys.last
      else
        # This must be the last chunk
        break
      end
    end
    columns
  end
end
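The single-key branch of real_get pages through a wide row: it requests a fixed-size chunk, merges it, and restarts from the last column name seen until a short chunk arrives. The following is a minimal sketch of that loop against an in-memory row. ROW, fetch_slice and fetch_all_columns are hypothetical names, and fetch_slice only imitates a slice read whose start column is inclusive.

```ruby
# Hypothetical wide row: 250 columns with sortable names.
ROW = (1..250).map { |i| [format("col%04d", i), i] }.to_h

# Stand-in for a slice read: columns with name >= start, at most count.
def fetch_slice(row, start, count)
  row.select { |name, _| name >= start }.first(count).to_h
end

def fetch_all_columns(row, chunk_size = 100)
  columns = {}
  start = ""
  loop do
    chunk = fetch_slice(row, start, chunk_size)
    # The start column is returned again in the next chunk; merging into
    # a hash makes that overlap harmless.
    columns.merge!(chunk)
    break unless chunk.size == chunk_size
    start = chunk.keys.last
  end
  columns
end

puts fetch_all_columns(ROW).size
```

Because the start column is repeated, each chunk after the first contributes at most chunk_size - 1 new columns, and a row whose column count is an exact multiple of the chunk size costs one extra, nearly empty request.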
.reconnect ⇒ Object
Reconnect to Cassandra server. Uses BinaryProtocolAccelerated if it is available.
Return
- (true|false)
-
true once reconnected, false if no keyspace is set
# File 'lib/right_support/db/cassandra_model.rb', line 575
def reconnect
  config = env_config

  return false if keyspace.nil?

  options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  connection = Cassandra.new(keyspace, config["server"], options)
  connection.disable_node_auto_discovery!
  @@connections[keyspace] = connection
  true
end
.remove(*args) ⇒ Object
Delete row or columns of row
Parameters
- args(Array)
-
Key, columns, options
Return
- (Array)
-
Mutation map and consistency level
# File 'lib/right_support/db/cassandra_model.rb', line 496
def remove(*args)
  do_op(:remove, column_family, *args)
end
.ring ⇒ Object
Cassandra ring
Return
- (Array)
-
Members of ring
# File 'lib/right_support/db/cassandra_model.rb', line 594
def ring
  conn.ring
end
.stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object
This method is an attempt to circumvent the Cassandra gem limitation of returning only 100 columns for wide rows. It returns only columns that are within the result set specified by a secondary index equality query. It iterates through chunks of rows of the result set and yields to the caller all of the columns in chunks of 1,000 until all of the columns have been retrieved.
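Parameters
- index(String)
-
Name of secondary index
- key(String)
-
Index value that each selected row is required to match
Yields
- (Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>)
-
Row key and a chunk of that row's columns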
# File 'lib/right_support/db/cassandra_model.rb', line 371
def stream_all_indexed_slices(index, key)
  expr = do_op(:create_idx_expr, index, key, "EQ")

  start_row = ''
  row_count = 10
  has_more_rows = true

  while (start_row != nil)
    clause = do_op(:create_idx_clause, [expr], start_row, row_count)

    rows = self.conn.get_indexed_slices(column_family, clause, index,
                                        :key_count => row_count,
                                        :key_start => start_row)

    rows = rows.keys
    rows.shift unless start_row == ''
    start_row = rows.last

    rows.each do |row|
      start_column = ''
      column_count = 1_000
      has_more_columns = true

      while has_more_columns
        clause = do_op(:create_idx_clause, [expr], row, 1)
        chunk = self.conn.get_indexed_slices(column_family, clause, nil,
                                             :start => start_column,
                                             :count => column_count)

        # Get first row's columns, because we are getting only one row [see clause, for more details]
        key = chunk.keys.first
        columns = chunk[key]

        columns.shift unless start_column == ''
        yield(key, columns) unless chunk.empty?

        if columns.size >= column_count - 1
          # Assume there are more columns, use last column as start of next slice
          start_column = columns.last.column.name
          column_count = 1_001
        else
          has_more_columns = false
        end
      end
    end
  end
end
.with_keyspace(keyspace, append_env = true, &block) ⇒ Object
Temporarily change the working keyspace for this class for the duration of the block. Resets working keyspace back to default once control has returned to the caller.
Parameters
- keyspace(String)
-
Keyspace name
- append_env(true|false)
-
optional; default true - whether to append the environment name
- block(Proc)
-
Code that will be called in keyspace context
# File 'lib/right_support/db/cassandra_model.rb', line 215
def with_keyspace(keyspace, append_env=true, &block)
  @@current_keyspace = keyspace
  env = ENV['RACK_ENV'] || 'development'
  nspace = namespace(env)
  if append_env
    if nspace
      tail = "_#{env}_#{nspace}"
    else
      tail = "_#{env}"
    end
    @@current_keyspace += tail unless @@current_keyspace.end_with?(tail)
  end
  block.call
ensure
  @@current_keyspace = nil
end
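The temporary switch works by setting a shared "current" value, running the block, and clearing the value in an ensure clause so that it is reset even when the block raises. The following is a minimal sketch of that pattern in isolation. KeyspaceSwitch is a hypothetical module, the default keyspace name is made up, and the suffix is passed in instead of being derived from RACK_ENV and the namespace.

```ruby
# Hypothetical stand-in for the keyspace state held by CassandraModel.
module KeyspaceSwitch
  @default = "main_development" # made-up default keyspace
  @current = nil

  def self.keyspace
    @current || @default
  end

  def self.with_keyspace(name, tail = "_development")
    # Append the suffix only if the name does not already carry it.
    @current = name.end_with?(tail) ? name : name + tail
    yield
  ensure
    # Runs on normal return and on exceptions alike.
    @current = nil
  end
end

KeyspaceSwitch.with_keyspace("audit") do
  puts KeyspaceSwitch.keyspace
end
puts KeyspaceSwitch.keyspace
```

Since the current keyspace is a single shared value, a switch made in one thread is visible to others for the duration of the block; the sketch shares that property with the class-variable approach above.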
Instance Method Details
#[](key) ⇒ Object
Column value
Parameters
- key(String|Integer)
-
Column name or key
Return
- (Object|nil)
-
Column value, or nil if not found
# File 'lib/right_support/db/cassandra_model.rb', line 674
def [](key)
  ret = attributes[key]
  return ret if ret
  if key.kind_of? Integer
    return attributes[Cassandra::Long.new(key)]
  end
end
#[]=(key, value) ⇒ Object
Store new column value
Parameters
- key(String|Integer)
-
Column name or key
- value(Object)
-
Value to be stored
Return
- (Object)
-
Value stored
# File 'lib/right_support/db/cassandra_model.rb', line 690
def []=(key, value)
  attributes[key] = value
end
#destroy ⇒ Object
Delete object from Cassandra
Return
- true
-
Always return true
# File 'lib/right_support/db/cassandra_model.rb', line 698
def destroy
  self.class.remove(key)
end
#reload ⇒ Object
Load object from Cassandra without modifying this object
Return
- (CassandraModel)
-
Object as stored in Cassandra
# File 'lib/right_support/db/cassandra_model.rb', line 654
def reload
  self.class.get(key)
end
#reload! ⇒ Object
Reload object value from Cassandra and update this object
Return
- (CassandraModel)
-
This object after reload from Cassandra
# File 'lib/right_support/db/cassandra_model.rb', line 662
def reload!
  self.attributes = self.class.real_get(key)
  self
end
#save ⇒ Object
Store object in Cassandra
Return
- true
-
Always return true
# File 'lib/right_support/db/cassandra_model.rb', line 645
def save
  self.class.insert(key, attributes)
  true
end