Class: RightSupport::DB::CassandraModel
- Includes: Log::Mixin
- Defined in: lib/right_support/db/cassandra_model.rb
Overview
Base class for a column family in a keyspace. Used to access data persisted in Cassandra; provides wrappers for Cassandra client methods.
Constant Summary
- DEFAULT_TIMEOUT = 10
  Default timeout for client connection to Cassandra server
- DEFAULT_COUNT = 100
  Default maximum number of rows (or, in .real_get, columns) to retrieve in one chunk
- METHODS_TO_LOG = [:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove, 'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']
  Operations whose timing .do_op_log writes to the debug log
- @@current_keyspace = nil
- @@connections = {}
Constants included from Log::Mixin
Class Attribute Summary
- .column_family ⇒ Object
  Returns the value of attribute column_family.
- .default_keyspace ⇒ Object (readonly)
  Returns the value of attribute default_keyspace.
Instance Attribute Summary
- #attributes ⇒ Object
  Row columns, as a Hash of column name to column value.
- #key ⇒ Object
  Primary key of the row.
Class Method Summary
- .all(k, opt = {}) ⇒ Object
  Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved for a list of keys; for an individual key, all columns are retrieved.
- .batch(*args, &block) ⇒ Object
  Open a batch operation and yield self. Inserts and deletes are queued until the block closes and are then sent atomically to the server. Supports the :consistency option, which overrides that set in individual commands.
- .config ⇒ Object
- .config=(value) ⇒ Object
- .conn ⇒ Object
  Client connected to Cassandra server. Creates the connection if it does not already exist, using BinaryProtocolAccelerated if it is available.
- .do_op(meth, *args, &block) ⇒ Object
  Perform a Cassandra operation on the connection object.
- .do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object
- .env_config ⇒ Object
- .get(key, opt = {}) ⇒ Object
  Get row for specified primary key and convert it into an object of the given class. Unless :count is specified, all columns are retrieved.
- .get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object
  Get all rows for specified secondary key, as raw columns.
- .get_columns(key, columns, opt = {}) ⇒ Object
  Get specific columns in row with specified key.
- .get_indexed(index, key, columns = nil, opt = {}) ⇒ Object
  Get all rows for specified secondary key, as instantiated objects.
- .inherited(base) ⇒ Object
  Raise UnsupportedRubyVersion when CassandraModel is subclassed under Ruby < 1.9.
- .insert(key, values, opt = {}) ⇒ Object
  Insert a row for a key.
- .keyspace ⇒ Object
  Returns current active keyspace.
- .keyspace=(kyspc) ⇒ Object
  Sets the default keyspace, with the environment name appended.
- .keyspaces ⇒ Object
  Returns the names of the keyspaces that have open connections, as an Array of String.
- .real_get(k, opt = {}) ⇒ Object
  Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved.
- .reconnect ⇒ Object
  Reconnect to Cassandra server, using BinaryProtocolAccelerated if it is available.
- .remove(*args) ⇒ Object
  Delete row or columns of row.
- .ring ⇒ Object
  Returns the members of the Cassandra ring.
- .stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object
  Work around the Cassandra gem's limitation of returning only 100 columns for wide rows. Returns only the columns of rows matched by a secondary index equality query, iterating through the matching rows in chunks and yielding each row's columns to the caller in chunks of 1,000 until all columns have been retrieved.
- .with_keyspace(keyspace, append_env = true, &block) ⇒ Object
  Temporarily change the working keyspace for this class for the duration of the block.
Instance Method Summary
- #[](key) ⇒ Object
  Column value.
- #[]=(key, value) ⇒ Object
  Store new column value.
- #destroy ⇒ Object
  Delete object from Cassandra.
- #initialize(key, attrs = {}) ⇒ CassandraModel (constructor)
  Create an object representing a row of the column family.
- #reload ⇒ Object
  Load object from Cassandra without modifying this object.
- #reload! ⇒ Object
  Reload object value from Cassandra and update this object.
- #save ⇒ Object
  Store object in Cassandra.
Methods included from Log::Mixin
default_logger, default_logger=, included
Constructor Details
#initialize(key, attrs = {}) ⇒ CassandraModel
Create an object representing a row of the column family
Parameters
- key (String): Primary key for object
- attrs (Hash): Attributes for object which form Cassandra row, with column name as key and column value as value
# File 'lib/right_support/db/cassandra_model.rb', line 592
def initialize(key, attrs = {})
  self.key = key
  self.attributes = attrs
end
Class Attribute Details
.column_family ⇒ Object
Returns the value of attribute column_family.
# File 'lib/right_support/db/cassandra_model.rb', line 119
def column_family
  @column_family
end
.default_keyspace ⇒ Object (readonly)
Returns the value of attribute default_keyspace.
# File 'lib/right_support/db/cassandra_model.rb', line 118
def default_keyspace
  @default_keyspace
end
Instance Attribute Details
#attributes ⇒ Object
Row columns, as a Hash of column name to column value
# File 'lib/right_support/db/cassandra_model.rb', line 584
def attributes
  @attributes
end
#key ⇒ Object
Primary key of the row
# File 'lib/right_support/db/cassandra_model.rb', line 584
def key
  @key
end
Class Method Details
.all(k, opt = {}) ⇒ Object
Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved for a list of keys; for an individual key, all columns are retrieved
Parameters
- k (String|Array): Individual primary key or list of keys on which to match
- opt (Hash): Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed
Return
- (Object|nil): Individual row, or nil if not found, or ordered hash of rows
# File 'lib/right_support/db/cassandra_model.rb', line 223
def all(k, opt = {})
  real_get(k, opt)
end
.batch(*args, &block) ⇒ Object
Open a batch operation and yield self. Inserts and deletes are queued until the block closes and are then sent atomically to the server. Supports the :consistency option, which overrides that set in individual commands
Parameters
- args (Array): Batch options such as :consistency
Block
Required block making Cassandra requests
Return
- (Array): Mutation map and consistency level
Raise
- RuntimeError: If no block is given
# File 'lib/right_support/db/cassandra_model.rb', line 473
def batch(*args, &block)
  raise "Block required!" unless block_given?
  do_op(:batch, *args, &block)
end
.config ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 132
def config
  @@config
end
.config=(value) ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 143
def config=(value)
  @@config = normalize_config(value) unless value.nil?
end
.conn ⇒ Object
Client connected to Cassandra server. Creates the connection if it does not already exist, using BinaryProtocolAccelerated if it is available
Return
- (Cassandra): Client connected to server
# File 'lib/right_support/db/cassandra_model.rb', line 199
def conn()
  @@connections ||= {}

  config = env_config

  # Thrift client options: connection timeout, plus the accelerated binary protocol when available
  thrift_client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  @@connections[self.keyspace] ||= Cassandra.new(self.keyspace, config["server"], thrift_client_options)
  @@connections[self.keyspace].disable_node_auto_discovery!
  @@connections[self.keyspace]
end
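The per-keyspace caching in .conn can be illustrated without a Cassandra server. The sketch below is hypothetical: `FakeClient` stands in for `Cassandra.new`, `ConnectionCache` is an illustrative name, and servers are plain strings; only the `||=` memoization and the conditional :protocol option follow the listing above.

```ruby
# Hypothetical stand-in for a Cassandra client; it only records its arguments.
FakeClient = Struct.new(:keyspace, :servers, :options)

# Illustrative cache mirroring the per-keyspace memoization in .conn.
class ConnectionCache
  DEFAULT_TIMEOUT = 10

  attr_reader :created

  def initialize
    @connections = {}
    @created = 0
  end

  # Return the client for +keyspace+, building it only on first use (the ||= in .conn).
  def conn(keyspace, servers)
    @connections[keyspace] ||= begin
      @created += 1
      options = { timeout: DEFAULT_TIMEOUT }
      # .conn adds :protocol only when Thrift::BinaryProtocolAccelerated is defined;
      # Thrift is not loaded here, so defined? returns nil and the option is skipped.
      options[:protocol] = :binary_accelerated if defined?(Thrift::BinaryProtocolAccelerated)
      FakeClient.new(keyspace, servers, options)
    end
  end

  # Same idea as .keyspaces: the names of keyspaces with a cached client.
  def keyspaces
    @connections.keys
  end
end

cache = ConnectionCache.new
first = cache.conn("app_development", "127.0.0.1:9160")
again = cache.conn("app_development", "127.0.0.1:9160")
cache.conn("audit_development", "127.0.0.1:9160")
```

Because the cache is keyed by keyspace name, each keyspace used through .with_keyspace gets its own long-lived client, and those names are what .keyspaces reports.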
.do_op(meth, *args, &block) ⇒ Object
Perform a Cassandra operation on the connection object. Rescue IOError by automatically reconnecting and retrying the operation.
Parameters
- meth (Symbol): Method to be executed
- *args (Array): Method arguments to forward to the Cassandra connection
Block
Block, if any, to be passed to the executed method
Return
- (Object): Value returned by executed method
# File 'lib/right_support/db/cassandra_model.rb', line 490
def do_op(meth, *args, &block)
  first_started_at ||= Time.now
  retries ||= 0
  started_at = Time.now

  # cassandra functionality
  result = conn.send(meth, *args, &block)

  # log functionality
  do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])
  return result
rescue IOError
  reconnect
  retries += 1
  retry
end
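The retry behaviour of .do_op relies on Ruby keeping local variables across `retry`, which is why `first_started_at ||= Time.now` and `retries ||= 0` are evaluated only once. A minimal sketch of that pattern, assuming a hypothetical `FlakyConnection` that fails a fixed number of times (no Cassandra involved):

```ruby
# Hypothetical connection whose first +failures+ calls raise IOError.
class FlakyConnection
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  def get(key)
    @calls += 1
    raise IOError, "connection reset" if @calls <= @failures
    "value-for-#{key}"
  end
end

# Illustrative class mirroring the rescue/retry shape of .do_op.
class RetrySketch
  attr_reader :reconnects, :last_retries

  def initialize(connection)
    @connection = connection
    @reconnects = 0
  end

  def do_op(meth, *args)
    # Local variables survive `retry`, so ||= initializes the counter only once.
    retries ||= 0
    result = @connection.send(meth, *args)
    @last_retries = retries
    result
  rescue IOError
    @reconnects += 1 # .do_op calls reconnect at this point
    retries += 1
    retry
  end
end

flaky = FlakyConnection.new(2)
op = RetrySketch.new(flaky)
value = op.do_op(:get, "k1")
```

Like the listing above, the sketch retries without limit, so a connection that always raises IOError would loop forever; a bounded retry count is a common alternative when that matters to the caller.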
.do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 508
def do_op_log(first_started_at, started_at, retries, meth, cf, key)
  now = Time.now
  attempt_time = now - started_at

  if METHODS_TO_LOG.include?(meth)
    key_count = key.is_a?(Array) ? key.size : 1

    log_string = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, attempt_time*1000)

    if retries && retries > 0
      total_time = now - first_started_at
      log_string += sprintf(", retries=%d, total_time=%.1fms", retries, total_time*1000)
    end

    logger.debug(log_string)
  end
end
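To make the debug output of .do_op_log concrete, here is a hypothetical helper that builds the same message from explicit timings instead of `Time.now`, so its result is deterministic. `op_log_line` and `LOGGED_METHODS` are illustrative names; the format strings are copied from the listing, and `key` plays the role of `args[1]` in .do_op.

```ruby
# Same entries as METHODS_TO_LOG: each operation as both a Symbol and a String.
LOGGED_METHODS = %i[multi_get get get_indexed_slices get_columns insert remove]
                   .flat_map { |m| [m, m.to_s] }

# Hypothetical helper: returns the line .do_op_log would log, or nil when the
# method is not in the logged set. Times are in seconds, as with Time differences.
def op_log_line(meth, cf, key, attempt_time, retries = 0, total_time = 0.0)
  return nil unless LOGGED_METHODS.include?(meth)
  key_count = key.is_a?(Array) ? key.size : 1
  line = format("CassandraModel %s, cf=%s, keys=%d, time=%.1fms",
                meth, cf, key_count, attempt_time * 1000)
  if retries && retries > 0
    line += format(", retries=%d, total_time=%.1fms", retries, total_time * 1000)
  end
  line
end

single = op_log_line(:get, "Users", "user-1", 0.0123)
multi  = op_log_line(:multi_get, "Users", %w[a b c], 0.004, 2, 0.25)
```

Here `single` is "CassandraModel get, cf=Users, keys=1, time=12.3ms", and the retried `multi_get` line additionally reports "retries=2, total_time=250.0ms".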
.env_config ⇒ Object
# File 'lib/right_support/db/cassandra_model.rb', line 136
def env_config
  env = ENV['RACK_ENV']
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" \
    unless !@@config.nil? && @@config.keys.include?(env) && @@config[env]
  @@config[env]
end
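.env_config selects the configuration section named by RACK_ENV and raises if it is missing. The sketch below reproduces that check as a plain function. `env_config_for`, the local `MissingConfiguration` class, and the config layout (one section per environment with a "server" entry, which is what .conn reads) are assumptions for illustration; the normalization done by .config= is not modeled.

```ruby
# Hypothetical stand-in for the library's MissingConfiguration error.
class MissingConfiguration < StandardError; end

# Sketch of .env_config's lookup with the environment passed in explicitly
# (the original reads ENV['RACK_ENV']).
def env_config_for(config, env)
  unless !config.nil? && config.key?(env) && config[env]
    raise MissingConfiguration, "CassandraModel config is missing a '#{env}' section"
  end
  config[env]
end

# Assumed layout: one section per environment, each with a "server" entry.
config = {
  "development" => { "server" => "127.0.0.1:9160" },
  "test"        => { "server" => "127.0.0.1:9160" },
}
```

`env_config_for(config, "test")` returns the test section, while `env_config_for(config, "production")` raises MissingConfiguration naming the missing 'production' section.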
.get(key, opt = {}) ⇒ Object
Get row for specified primary key and convert it into an object of the given class. Unless :count is specified, all columns are retrieved (via .real_get)
Parameters
- key (String): Primary key on which to match
- opt (Hash): Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed
Return
- (CassandraModel|nil): Instantiated object of given class, or nil if not found
# File 'lib/right_support/db/cassandra_model.rb', line 237
def get(key, opt = {})
  if (attrs = real_get(key, opt)).empty?
    nil
  else
    new(key, attrs)
  end
end
.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key
Parameters
- index (String): Name of secondary index
- key (String): Index value that each selected row is required to match
- columns (Array|nil): Names of columns to be retrieved, defaults to all
- opt (Hash): Request options; :consistency is passed through, and :count (removed from the caller's hash) sets the number of rows per chunk, defaulting to DEFAULT_COUNT
Block
None; unlike .get_indexed, this method does not yield chunks to a block
Return
- (OrderedHash): Rows retrieved, with each key mapped to that row's columns
# File 'lib/right_support/db/cassandra_model.rb', line 295
def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows = Cassandra::OrderedHash.new
  start = ""
  count = opt.delete(:count) || DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  while true
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = self.conn.get_indexed_slices(column_family, clause, columns, opt)
    rows.merge!(chunk)
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
.get_columns(key, columns, opt = {}) ⇒ Object
Get specific columns in row with specified key
Parameters
- key (String): Primary key on which to match
- columns (Array): Names of columns to be retrieved
- opt (Hash): Request options such as :consistency
Return
- (Array): Values of selected columns in the order specified
# File 'lib/right_support/db/cassandra_model.rb', line 428
def get_columns(key, columns, opt = {})
  do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
end
.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object
Get all rows for specified secondary key
Parameters
- index (String): Name of secondary index
- key (String): Index value that each selected row is required to match
- columns (Array|nil): Names of columns to be retrieved, defaults to all
- opt (Hash): Request options with only :consistency used
Block
Optional block that is yielded each chunk of instantiated objects as it is retrieved
Return
- (Array): Rows retrieved, each an instantiated object of the given class that contains values only for the columns retrieved; the array is always empty if a block is given
# File 'lib/right_support/db/cassandra_model.rb', line 387
def get_indexed(index, key, columns = nil, opt = {})
  rows = []
  start = ""
  count = DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = do_op(:get_indexed_slices, column_family, clause, columns, opt)
    chunk_rows = []
    chunk.each do |row_key, row_columns|
      if row_columns && row_key != start
        attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
        chunk_rows << new(row_key, attrs)
      end
    end
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
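.get_indexed pages through rows rather than columns, and it drops the row whose key equals `start` because that row already appeared at the end of the previous chunk. The sketch below models this with a hypothetical in-memory index (`ROWS`, `fake_indexed_slices`) and a `Struct` in place of model instances, using a chunk size of 2 instead of DEFAULT_COUNT; it also shows that the returned array stays empty when a block is given.

```ruby
# Hypothetical rows: key => columns; "status" plays the indexed column.
ROWS = {
  "r1" => { "status" => "active", "name" => "a" },
  "r2" => { "status" => "idle",   "name" => "b" },
  "r3" => { "status" => "active", "name" => "c" },
  "r4" => { "status" => "active", "name" => "d" },
  "r5" => { "status" => "active", "name" => "e" },
}

# Imitates an EQ secondary-index query: matching rows whose key >= start
# (start is inclusive), at most +count+ of them.
def fake_indexed_slices(index, value, start, count)
  ROWS.select { |key, cols| key >= start && cols[index] == value }.first(count).to_h
end

# Stand-in for instantiated model objects.
Row = Struct.new(:key, :attributes)

# Sketch of .get_indexed's loop; +count+ must be at least 2 to make progress.
def indexed_rows(index, value, count)
  rows = []
  start = ""
  loop do
    chunk = fake_indexed_slices(index, value, start, count)
    # Skip empty rows and the row that repeats the previous chunk's last key.
    chunk_rows = chunk.reject { |key, cols| cols.nil? || key == start }
                      .map { |key, cols| Row.new(key, cols) }
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    break unless chunk.size == count
    start = chunk.keys.last
  end
  rows
end
```

Without a block, `indexed_rows("status", "active", 2)` returns the rows r1, r3, r4 and r5 once each; with a block, the chunks [r1, r3], [r4], [r5] and a final empty chunk are yielded and the return value is [].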
.inherited(base) ⇒ Object
Raise UnsupportedRubyVersion when CassandraModel is subclassed under Ruby < 1.9
# File 'lib/right_support/db/cassandra_model.rb', line 128
def inherited(base)
  raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
end
.insert(key, values, opt = {}) ⇒ Object
Insert a row for a key
Parameters
- key (String): Primary key for value
- values (Hash): Values to be stored
- opt (Hash): Request options such as :consistency
Return
- (Array): Mutation map and consistency level
# File 'lib/right_support/db/cassandra_model.rb', line 441
def insert(key, values, opt={})
  do_op(:insert, column_family, key, values, opt)
end
.keyspace ⇒ Object
Returns current active keyspace.
Return
- keyspace (String): current_keyspace or default_keyspace
# File 'lib/right_support/db/cassandra_model.rb', line 161
def keyspace
  @@current_keyspace || @@default_keyspace
end
.keyspace=(kyspc) ⇒ Object
Sets the default keyspace, appending "_" and the environment name (RACK_ENV, or "development" if unset)
Parameters
- kyspc (String): Base name of the default keyspace
# File 'lib/right_support/db/cassandra_model.rb', line 170
def keyspace=(kyspc)
  @@default_keyspace = (kyspc + "_" + (ENV['RACK_ENV'] || 'development'))
end
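The naming rule in .keyspace= can be checked in isolation. `default_keyspace_name` is a hypothetical helper that applies the same concatenation, with the environment passed explicitly instead of read only from RACK_ENV:

```ruby
# Hypothetical helper mirroring .keyspace=: append "_" plus the environment,
# falling back to "development" when no environment is set.
def default_keyspace_name(name, env = ENV["RACK_ENV"])
  name + "_" + (env || "development")
end
```

For example, `default_keyspace_name("app", "production")` returns "app_production" and `default_keyspace_name("app", nil)` returns "app_development". Unlike .with_keyspace, .keyspace= does not check for an existing suffix, so `default_keyspace_name("app_production", "production")` returns "app_production_production".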
.keyspaces ⇒ Object
Returns the names of the keyspaces that have open connections, as an Array of String
Return
- (Array): Keyspace names
# File 'lib/right_support/db/cassandra_model.rb', line 152
def keyspaces
  @@connections.keys
end
.real_get(k, opt = {}) ⇒ Object
Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved
Parameters
- k (String|Array): Individual primary key or list of keys on which to match
- opt (Hash): Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed
Return
- (Cassandra::OrderedHash): Individual row or OrderedHash of rows
# File 'lib/right_support/db/cassandra_model.rb', line 257
def real_get(k, opt = {})
  if k.is_a?(Array)
    do_op(:multi_get, column_family, k, opt)
  elsif opt[:count]
    do_op(:get, column_family, k, opt)
  else
    opt = opt.clone
    opt[:count] = DEFAULT_COUNT
    columns = Cassandra::OrderedHash.new
    loop do
      chunk = do_op(:get, column_family, k, opt)
      columns.merge!(chunk)
      if chunk.size == opt[:count]
        # Assume there are more chunks, use last key as start of next get
        opt[:start] = chunk.keys.last
      else
        # This must be the last chunk
        break
      end
    end
    columns
  end
end
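The single-key branch of .real_get pages through a wide row: it asks for DEFAULT_COUNT columns, then restarts from the last column name it saw. Because a slice's start column is included in the result, each boundary column comes back twice, and `merge!` absorbs the duplicate. A self-contained sketch, with a hypothetical in-memory `fake_get` in place of the Cassandra client and a page size of 3 instead of 100:

```ruby
# Hypothetical row of seven columns, c01..c07, already in comparator order.
ROW = ("c01".."c07").map { |name| [name, name.upcase] }.to_h

# Imitates a single-row column slice: up to opt[:count] columns whose names are
# >= opt[:start]; like a Cassandra slice, the start column itself is included.
def fake_get(row, opt)
  start = opt[:start] || ""
  row.select { |name, _value| name >= start }.first(opt[:count]).to_h
end

# Sketch of the single-key branch of .real_get. +count+ must be at least 2,
# otherwise the inclusive start would return the same column forever.
def chunked_get(row, count)
  opt = { count: count }
  columns = {}
  requests = 0
  loop do
    chunk = fake_get(row, opt)
    requests += 1
    columns.merge!(chunk) # the repeated boundary column is simply overwritten
    break unless chunk.size == opt[:count]
    opt[:start] = chunk.keys.last
  end
  [columns, requests]
end

columns, requests = chunked_get(ROW, 3)
```

With seven columns and a page size of 3, the loop makes four requests (c01 to c03, c03 to c05, c05 to c07, then c07 alone) and returns every column exactly once, in order.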
.reconnect ⇒ Object
Reconnect to Cassandra server, using BinaryProtocolAccelerated if it is available
Return
- (true|false): true after reconnecting; false if no keyspace is set
# File 'lib/right_support/db/cassandra_model.rb', line 531
def reconnect
  config = env_config

  return false if keyspace.nil?

  # Thrift client options: connection timeout, plus the accelerated binary protocol when available
  thrift_client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  connection = Cassandra.new(keyspace, config["server"], thrift_client_options)
  connection.disable_node_auto_discovery!
  @@connections[keyspace] = connection
  true
end
.remove(*args) ⇒ Object
Delete row or columns of row
Parameters
- args (Array): Key, columns, options
Return
- (Array): Mutation map and consistency level
# File 'lib/right_support/db/cassandra_model.rb', line 452
def remove(*args)
  do_op(:remove, column_family, *args)
end
.ring ⇒ Object
Returns the members of the Cassandra ring
Return
- (Array): Members of ring
# File 'lib/right_support/db/cassandra_model.rb', line 550
def ring
  conn.ring
end
.stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object
Work around the Cassandra gem's limitation of returning only 100 columns for wide rows. This method returns only the columns of rows matched by a secondary index equality query. It iterates through the matching rows in chunks and yields each row's columns to the caller in chunks of 1,000 until all columns have been retrieved
Parameters
- index (String): Name of secondary index
- key (String): Index value that each selected row is required to match
Yields
- (String, Array<CassandraThrift::ColumnOrSuperColumn>): Row key and the next chunk of that row's columns
# File 'lib/right_support/db/cassandra_model.rb', line 327
def stream_all_indexed_slices(index, key)
  expr = do_op(:create_idx_expr, index, key, "EQ")

  start_row = ''
  row_count = 10
  has_more_rows = true

  while (start_row != nil)
    clause = do_op(:create_idx_clause, [expr], start_row, row_count)

    rows = self.conn.get_indexed_slices(column_family, clause, index,
                                        :key_count => row_count,
                                        :key_start => start_row)

    rows = rows.keys
    rows.shift unless start_row == ''
    start_row = rows.last

    rows.each do |row|
      start_column = ''
      column_count = 1_000
      has_more_columns = true

      while has_more_columns
        clause = do_op(:create_idx_clause, [expr], row, 1)
        chunk = self.conn.get_indexed_slices(column_family, clause, nil,
                                             :start => start_column,
                                             :count => column_count)

        # Nothing returned for this row: stop before dereferencing its columns
        break if chunk.empty?

        # Get first row's columns, because we are getting only one row [see clause, for more details]
        key = chunk.keys.first
        columns = chunk[key]

        columns.shift unless start_column == ''
        yield(key, columns)

        if columns.size >= column_count - 1
          # Assume there are more columns, use last column as start of next slice
          start_column = columns.last.column.name
          column_count = 1_001
        else
          has_more_columns = false
        end
      end
    end
  end
end
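The column loop inside .stream_all_indexed_slices asks for 1,000 columns first and 1,001 afterwards, discarding the first column of every later slice because it repeats the previous slice's last column. The sketch below reproduces only that inner loop over a hypothetical list of column names, with a page size of 3 so the paging is visible; `fake_slice` and `WIDE_ROW` are illustrative stand-ins for the Thrift call and a real wide row.

```ruby
# Hypothetical wide row: column names col01..col08, in comparator order.
WIDE_ROW = (1..8).map { |i| format("col%02d", i) }

# Imitates a column slice that starts at +start+ (inclusive).
def fake_slice(columns, start, count)
  columns.select { |name| name >= start }.first(count)
end

# Sketch of the per-row column loop with +page+ in place of 1_000: later
# requests ask for page + 1 columns and drop the repeated first one.
def stream_columns(columns, page)
  start = ""
  count = page
  loop do
    chunk = fake_slice(columns, start, count)
    break if chunk.empty?
    chunk.shift unless start == ""
    yield chunk
    break unless chunk.size >= count - 1
    start = chunk.last
    count = page + 1
  end
end

pages = []
stream_columns(WIDE_ROW, 3) { |chunk| pages << chunk }
```

With the 8 columns above, the caller receives pages of 3, 3 and 2 column names, and every column exactly once.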
.with_keyspace(keyspace, append_env = true, &block) ⇒ Object
Temporarily change the working keyspace for this class for the duration of the block. Resets working keyspace back to default once control has returned to the caller.
Parameters
- keyspace (String): Keyspace name
- append_env (true|false): Optional, default true; whether to append the environment name
- block (Proc): Code that will be called in keyspace context
# File 'lib/right_support/db/cassandra_model.rb', line 182
def with_keyspace(keyspace, append_env=true, &block)
  @@current_keyspace = keyspace
  env = ENV['RACK_ENV'] || 'development'
  if append_env && @@current_keyspace !~ /_#{env}$/
    @@current_keyspace = "#{@@current_keyspace}_#{env}"
  end
  block.call
ensure
  @@current_keyspace = nil
end
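The interplay of .keyspace, .keyspace= and .with_keyspace can be sketched with class variables alone. `KeyspaceSketch` is a hypothetical class that copies that bookkeeping without any connection handling; it shows the suffix check and the `ensure` reset.

```ruby
# Hypothetical class reproducing the keyspace bookkeeping of CassandraModel.
class KeyspaceSketch
  @@default_keyspace = nil
  @@current_keyspace = nil

  def self.env
    ENV["RACK_ENV"] || "development"
  end

  # Like .keyspace=: always appends the environment suffix.
  def self.keyspace=(name)
    @@default_keyspace = "#{name}_#{env}"
  end

  # Like .keyspace: the temporary keyspace wins over the default.
  def self.keyspace
    @@current_keyspace || @@default_keyspace
  end

  # Like .with_keyspace: appends the suffix only if it is not already present.
  def self.with_keyspace(name, append_env = true)
    @@current_keyspace = name
    if append_env && @@current_keyspace !~ /_#{env}$/
      @@current_keyspace = "#{@@current_keyspace}_#{env}"
    end
    yield
  ensure
    # Always fall back to the default keyspace, even if the block raises.
    @@current_keyspace = nil
  end
end
```

With RACK_ENV set to "test", after `KeyspaceSketch.keyspace = "app"` the block in `KeyspaceSketch.with_keyspace("audit") { ... }` sees "audit_test", and `KeyspaceSketch.keyspace` is back to "app_test" once the block returns or raises. Because the reset assigns nil rather than restoring the previous value, a nested .with_keyspace call also sends the rest of the outer block back to the default keyspace.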
Instance Method Details
#[](key) ⇒ Object
Column value
Parameters
- key (String|Integer): Column name or key
Return
- (Object|nil): Column value, or nil if not found
# File 'lib/right_support/db/cassandra_model.rb', line 630
def [](key)
  ret = attributes[key]
  return ret if ret
  if key.kind_of? Integer
    return attributes[Cassandra::Long.new(key)]
  end
end
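#[] first looks the key up as given and, for Integer keys only, retries with a Cassandra::Long, presumably because columns read from a long-typed column family are keyed by that class. A sketch, assuming Cassandra::Long instances compare by value when used as Hash keys; `LongName` is a hypothetical Struct standing in for it, and `column_value` is an illustrative name.

```ruby
# Hypothetical stand-in for Cassandra::Long; Struct gives value-based eql?
# and hash, so equal instances find the same Hash entry.
LongName = Struct.new(:value)

# Sketch of CassandraModel#[]: try the key as given, then, for Integer keys,
# retry with the long-typed column name; otherwise return nil.
def column_value(attributes, key)
  ret = attributes[key]
  return ret if ret
  attributes[LongName.new(key)] if key.is_a?(Integer)
end

attrs = { "name" => "web-1", LongName.new(1_700_000_000) => "started" }
```

Because `return ret if ret` treats a stored false like a missing value, such a column falls through to the Integer check and, for a String key, yields nil.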
#[]=(key, value) ⇒ Object
Store new column value
Parameters
- key (String|Integer): Column name or key
- value (Object): Value to be stored
Return
- (Object): Value stored
# File 'lib/right_support/db/cassandra_model.rb', line 646
def []=(key, value)
  attributes[key] = value
end
#destroy ⇒ Object
Delete object from Cassandra
Return
- (Array): Result of .remove (mutation map and consistency level)
# File 'lib/right_support/db/cassandra_model.rb', line 654
def destroy
  self.class.remove(key)
end
#reload ⇒ Object
Load object from Cassandra without modifying this object
Return
- (CassandraModel|nil): Object as stored in Cassandra, or nil if not found
# File 'lib/right_support/db/cassandra_model.rb', line 610
def reload
  self.class.get(key)
end
#reload! ⇒ Object
Reload object value from Cassandra and update this object
Return
- (CassandraModel): This object after reload from Cassandra
# File 'lib/right_support/db/cassandra_model.rb', line 618
def reload!
  self.attributes = self.class.real_get(key)
  self
end
#save ⇒ Object
Store object in Cassandra
Return
- true: Always return true
# File 'lib/right_support/db/cassandra_model.rb', line 601
def save
  self.class.insert(key, attributes)
  true
end