Class: RightSupport::DB::CassandraModel

Inherits:
Object
  • Object
show all
Includes:
Log::Mixin
Defined in:
lib/right_support/db/cassandra_model.rb

Overview

Base class for a column family in a keyspace. Used to access data persisted in Cassandra. Provides wrappers for Cassandra client methods.

Constant Summary collapse

DEFAULT_TIMEOUT =

Default timeout for client connection to Cassandra server

10
DEFAULT_COUNT =

Default maximum number of rows to retrieve in one chunk

100
METHODS_TO_LOG =
[:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove, 'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']
@@current_keyspace =
nil
@@connections =
{}

Constants included from Log::Mixin

Log::Mixin::Decorator

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Log::Mixin

default_logger, default_logger=, included

Constructor Details

#initialize(key, attrs = {}) ⇒ CassandraModel

Create column family object

Parameters

key(String)

Primary key for object

attrs(Hash)

Attributes for object which form Cassandra row

with column name as key and column value as value


592
593
594
595
# File 'lib/right_support/db/cassandra_model.rb', line 592

# Create column family object
#
# === Parameters
# key(String):: Primary key for object
# attrs(Hash):: Attributes for object which form Cassandra row,
#   with column name as key and column value as value
def initialize(key, attrs = {})
  # Both values are stored through the attribute writers
  self.key = key
  self.attributes = attrs
end

Class Attribute Details

.column_familyObject

Returns the value of attribute column_family.



119
120
121
# File 'lib/right_support/db/cassandra_model.rb', line 119

# Class-level reader for the column family
#
# === Return
# (Object):: Current value of the receiver's @column_family instance variable
def column_family
  @column_family
end

.default_keyspaceObject (readonly)

Returns the value of attribute default_keyspace.



118
119
120
# File 'lib/right_support/db/cassandra_model.rb', line 118

# Class-level reader for @default_keyspace
#
# NOTE(review): keyspace= assigns the class variable @@default_keyspace, which is
# not the @default_keyspace instance variable read here -- confirm that something
# else sets @default_keyspace, otherwise this reader returns nil
#
# === Return
# (Object):: Current value of the receiver's @default_keyspace instance variable
def default_keyspace
  @default_keyspace
end

Instance Attribute Details

#attributesObject

Returns the value of attribute attributes.



584
585
586
# File 'lib/right_support/db/cassandra_model.rb', line 584

# Reader for the object's attributes
#
# === Return
# (Object):: Current value of @attributes
def attributes
  @attributes
end

#keyObject

Returns the value of attribute key.



584
585
586
# File 'lib/right_support/db/cassandra_model.rb', line 584

# Reader for the object's primary key
#
# === Return
# (Object):: Current value of @key
def key
  @key
end

Class Method Details

.all(k, opt = {}) ⇒ Object

Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved.

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(Object|nil)

Individual row, or nil if not found, or ordered hash of rows



223
224
225
# File 'lib/right_support/db/cassandra_model.rb', line 223

# Get row(s) for specified key(s)
#
# === Parameters
# k(String|Array):: Individual primary key or list of keys on which to match
# opt(Hash):: Request options, passed through unchanged
#
# === Return
# (Object):: Whatever real_get returns for the same arguments
def all(k, opt = {})
  # Thin alias: all of the work happens in real_get
  real_get(k, opt)
end

.batch(*args, &block) ⇒ Object

Open a batch operation and yield self. Inserts and deletes are queued until the block closes, and then sent atomically to the server. Supports :consistency option, which overrides that set in individual commands.

Parameters

args(Array)

Batch options such as :consistency

Block

Required block making Cassandra requests

Returns

(Array)

Mutation map and consistency level

Raise

Exception

If block not specified



473
474
475
476
# File 'lib/right_support/db/cassandra_model.rb', line 473

# Run a :batch operation through do_op, forwarding the options and the block
#
# === Parameters
# args(Array):: Batch options such as :consistency
#
# === Block
# Required block making Cassandra requests
#
# === Return
# (Object):: Value returned by do_op for the :batch operation
#
# === Raise
# RuntimeError:: If block not specified
def batch(*args, &block)
  if block.nil?
    raise "Block required!"
  end

  do_op(:batch, *args, &block)
end

.configObject



132
133
134
# File 'lib/right_support/db/cassandra_model.rb', line 132

# Reader for the shared configuration stored by config=
#
# NOTE(review): reads @@config directly, so this presumably raises NameError when
# called before config= has ever assigned it -- confirm whether @@config is
# initialized elsewhere in the class
#
# === Return
# (Object):: Current value of @@config
def config
  @@config
end

.config=(value) ⇒ Object



143
144
145
# File 'lib/right_support/db/cassandra_model.rb', line 143

# Store a new shared configuration after passing it through normalize_config
#
# === Parameters
# value(Object|nil):: New configuration; nil is ignored and leaves @@config untouched
#
# === Return
# (Object|nil):: Normalized configuration, or nil when value was nil
def config=(value)
  return if value.nil?

  @@config = normalize_config(value)
end

.connObject

Client connected to Cassandra server. Creates the connection if it does not already exist. Uses BinaryProtocolAccelerated if it is available.

Return

(Cassandra)

Client connected to server



199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/right_support/db/cassandra_model.rb', line 199

# Client for the active keyspace, created on first use and cached in @@connections
#
# === Return
# (Cassandra):: Client stored under the active keyspace
#
# === Raise
# MissingConfiguration:: Propagated from env_config
def conn
  @@connections ||= {}

  server_config = env_config

  client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  # Only request the accelerated protocol when its constant is defined
  if defined? Thrift::BinaryProtocolAccelerated
    client_options[:protocol] = Thrift::BinaryProtocolAccelerated
  end

  active_keyspace = self.keyspace
  client = (@@connections[active_keyspace] ||= Cassandra.new(active_keyspace, server_config["server"], client_options))
  # Runs on every call, not only when the client was just created
  client.disable_node_auto_discovery!
  client
end

.do_op(meth, *args, &block) ⇒ Object

Perform a Cassandra operation on the connection object. Rescue IOError by automatically reconnecting and retrying the operation.

Parameters

meth(Symbol)

Method to be executed

*args(Array)

Method arguments to forward to the Cassandra connection

Block

Block if any to be executed by method

Return

(Object)

Value returned by executed method



490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/right_support/db/cassandra_model.rb', line 490

# Perform a Cassandra operation on the connection object
# Rescue IOError by reconnecting and retrying the operation a bounded number
# of times; once the limit is reached the IOError is re-raised to the caller
# instead of retrying forever
#
# === Parameters
# meth(Symbol):: Method to be executed
# *args(Array):: Method arguments to forward to the Cassandra connection
#
# === Block
# Block if any to be executed by method
#
# === Return
# (Object):: Value returned by executed method
#
# === Raise
# IOError:: If the operation still fails after the last permitted retry
def do_op(meth, *args, &block)
  max_retries        = 3
  # `retry` re-runs the method body from the top with locals preserved, so the
  # ||= assignments only take effect on the first attempt
  first_started_at ||= Time.now
  retries          ||= 0
  started_at         = Time.now

  # cassandra functionality
  result = conn.send(meth, *args, &block)

  # log functionality; args[0] and args[1] are passed as column family and key
  do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])

  return result
rescue IOError
  # Give up rather than spin forever against a connection that stays broken
  raise if retries >= max_retries
  reconnect
  retries += 1
  retry
end

.do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object



508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
# File 'lib/right_support/db/cassandra_model.rb', line 508

# Write a debug log line describing one completed Cassandra operation
# Nothing is logged unless meth is listed in METHODS_TO_LOG
#
# === Parameters
# first_started_at(Time):: When the first attempt of the operation began
# started_at(Time):: When the attempt that succeeded began
# retries(Integer|nil):: Number of retries performed before success
# meth(Symbol|String):: Operation name
# cf(Object):: Column family, interpolated into the message
# key(Object):: Key or Array of keys; only its count is logged
#
# === Return
# (Object|nil):: Result of logger.debug, or nil when nothing was logged
def do_op_log(first_started_at, started_at, retries, meth, cf, key)
  finished_at = Time.now
  elapsed     = finished_at - started_at
  return unless METHODS_TO_LOG.include?(meth)

  # A single key counts as one; an Array contributes its size
  key_count = 1
  key_count = key.size if key.is_a?(Array)

  message = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, elapsed*1000)

  # Total time is only reported when at least one retry happened
  if retries && retries > 0
    overall  = finished_at - first_started_at
    message += sprintf(", retries=%d, total_time=%.1fms", retries, overall*1000)
  end

  logger.debug(message)
end

.env_configObject



136
137
138
139
140
141
# File 'lib/right_support/db/cassandra_model.rb', line 136

# Configuration section for the environment named by ENV['RACK_ENV']
#
# === Return
# (Object):: The @@config entry stored under the current RACK_ENV value
#
# === Raise
# MissingConfiguration:: If no configuration has been assigned, or it has no
#   truthy entry for the current RACK_ENV
def env_config
  env = ENV['RACK_ENV']

  # Check that @@config exists before reading it, so a configuration that was
  # never assigned is reported as MissingConfiguration rather than NameError
  section = nil
  if class_variable_defined?(:@@config) && !@@config.nil? && @@config.key?(env)
    section = @@config[env]
  end

  raise MissingConfiguration, "CassandraModel config is missing a '#{env}' section" unless section
  section
end

.get(key, opt = {}) ⇒ Object

Get row for specified primary key and convert into object of given class. Unless :count is specified, a maximum of 100 columns are retrieved.

Parameters

key(String)

Primary key on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(CassandraModel|nil)

Instantiated object of given class, or nil if not found



237
238
239
240
241
242
243
# File 'lib/right_support/db/cassandra_model.rb', line 237

# Fetch the row for a primary key via real_get and wrap it in a model object
#
# === Parameters
# key(String):: Primary key on which to match
# opt(Hash):: Request options, passed through to real_get
#
# === Return
# (CassandraModel|nil):: Object built with new(key, attrs), or nil when
#   real_get returned an empty result
def get(key, opt = {})
  attrs = real_get(key, opt)
  attrs.empty? ? nil : new(key, attrs)
end

.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Block

Optional block that is yielded each chunk as it is retrieved as an array like the normally returned result

Return

(OrderedHash)

Rows retrieved with each key, value is columns



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/right_support/db/cassandra_model.rb', line 295

# Get all rows for specified secondary key, fetched chunk by chunk
# Any block passed to this method is ignored
#
# === Parameters
# index(String):: Name of secondary index
# key(String):: Index value that each selected row is required to match
# columns(Array|nil):: Names of columns to be retrieved, defaults to all
# opt(Hash):: Request options; :count sets the chunk size (DEFAULT_COUNT when
#   absent, raised to 2 when smaller) and :consistency is forwarded.
#   The caller's hash is not modified.
#
# === Return
# (OrderedHash):: Rows retrieved with each key, value is columns
def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows  = Cassandra::OrderedHash.new
  start = ""
  # Read :count without deleting it from the caller's hash. Every chunk after
  # the first begins with the previous chunk's last key again, so a chunk size
  # of 1 could never advance; enforce a minimum of 2.
  count = [opt[:count] || DEFAULT_COUNT, 2].max
  expr  = do_op(:create_idx_expr, index, key, "EQ")
  opt   = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk  = conn.get_indexed_slices(column_family, clause, columns, opt)
    # merge! absorbs the repeated boundary row
    rows.merge!(chunk)
    # A short chunk must be the last one
    break unless chunk.size == count
    # Assume there are more chunks, use last key as start of next get
    start = chunk.keys.last
  end
  rows
end

.get_columns(key, columns, opt = {}) ⇒ Object

Get specific columns in row with specified key

Parameters

key(String)

Primary key on which to match

columns(Array)

Names of columns to be retrieved

opt(Hash)

Request options such as :consistency

Return

(Array)

Values of selected columns in the order specified



428
429
430
# File 'lib/right_support/db/cassandra_model.rb', line 428

# Get specific columns in row with specified key
#
# === Parameters
# key(String):: Primary key on which to match
# columns(Array):: Names of columns to be retrieved
# opt(Hash):: Request options such as :consistency
#
# === Return
# (Object):: Value returned by do_op for the :get_columns operation
def get_columns(key, columns, opt = {})
  # The nil fills the sub-columns argument position; no sub-columns are requested
  do_op(:get_columns, column_family, key, columns, nil, opt)
end

.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Return

(Array)

Rows retrieved with each member being an instantiated object of the

given class as value, but object only contains values for the columns retrieved;
array is always empty if a block is given


387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# File 'lib/right_support/db/cassandra_model.rb', line 387

# Get all rows for specified secondary key as model objects, fetched in
# chunks of DEFAULT_COUNT rows
#
# === Parameters
# index(String):: Name of secondary index
# key(String):: Index value that each selected row is required to match
# columns(Array|nil):: Names of columns to be retrieved, defaults to all
# opt(Hash):: Request options with only :consistency used
#
# === Block
# Optional block; when given, each chunk's array of objects is yielded
# instead of being accumulated
#
# === Return
# (Array):: Objects built with new(row_key, attrs) for every row that has
#   columns; always empty if a block is given
def get_indexed(index, key, columns = nil, opt = {})
  collected = []
  cursor    = ""
  page_size = DEFAULT_COUNT
  expr      = do_op(:create_idx_expr, index, key, "EQ")
  # Only :consistency survives from the caller's options
  consistency = opt[:consistency]
  opt = consistency ? {:consistency => consistency} : {}

  loop do
    clause = do_op(:create_idx_clause, [expr], cursor, page_size)
    chunk  = do_op(:get_indexed_slices, column_family, clause, columns, opt)

    models = []
    chunk.each do |row_key, row_columns|
      # Skip rows without columns and the row that was the start of this chunk
      next if !row_columns || row_key == cursor
      attrs = {}
      row_columns.each { |c| attrs[c.column.name] = c.column.value }
      models << new(row_key, attrs)
    end

    block_given? ? yield(models) : collected.concat(models)

    # A short chunk must be the last one
    break unless chunk.size == page_size
    # Assume there are more chunks, use last key as start of next get
    cursor = chunk.keys.last
  end

  collected
end

.inherited(base) ⇒ Object

Deprecate usage of CassandraModel under Ruby < 1.9



128
129
130
# File 'lib/right_support/db/cassandra_model.rb', line 128

# Subclassing hook: refuse to be subclassed under Ruby < 1.9
#
# === Parameters
# base(Class):: The class that is inheriting from this one
#
# === Return
# nil:: When the Ruby version is acceptable
#
# === Raise
# UnsupportedRubyVersion:: If RUBY_VERSION compares below "1.9"
def inherited(base)
  raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
  # Keep the hook chain intact so other inherited hooks further up still run
  super if defined?(super)
end

.insert(key, values, opt = {}) ⇒ Object

Insert a row for a key

Parameters

key(String)

Primary key for value

values(Hash)

Values to be stored

opt(Hash)

Request options such as :consistency

Return

(Array)

Mutation map and consistency level



441
442
443
# File 'lib/right_support/db/cassandra_model.rb', line 441

# Insert a row for a key
#
# === Parameters
# key(String):: Primary key for value
# values(Hash):: Values to be stored
# opt(Hash):: Request options such as :consistency
#
# === Return
# (Object):: Value returned by do_op for the :insert operation
def insert(key, values, opt={})
  # Runs through do_op so the call gets its IOError handling and logging
  do_op(:insert, column_family, key, values, opt)
end

.keyspaceObject

Returns current active keyspace.

Return

keyspace(String)

current_keyspace or default_keyspace



161
162
163
# File 'lib/right_support/db/cassandra_model.rb', line 161

# Returns current active keyspace
#
# === Return
# keyspace(String):: @@current_keyspace when it is set (see with_keyspace),
#   otherwise @@default_keyspace
def keyspace
  # NOTE(review): @@default_keyspace is only assigned by keyspace= in the code
  # visible here -- confirm it is initialized before this is first called
  @@current_keyspace || @@default_keyspace
end

.keyspace=(kyspc) ⇒ Object

Sets the default keyspace

Parameters

keyspace(String)

Set the default keyspace



170
171
172
# File 'lib/right_support/db/cassandra_model.rb', line 170

# Sets the default keyspace, suffixed with the environment name
#
# === Parameters
# kyspc(String):: Keyspace name without the environment suffix
#
# === Return
# (String):: The stored name, "<kyspc>_<env>", where env is ENV['RACK_ENV']
#   or 'development' when that is unset
def keyspace=(kyspc)
  env = ENV['RACK_ENV'] || 'development'
  @@default_keyspace = kyspc + "_" + env
end

.keyspacesObject

Return the names of the current keyspaces as an Array of Strings

Return

(Array)

keyspaces names



152
153
154
# File 'lib/right_support/db/cassandra_model.rb', line 152

# Names of the keyspaces that currently have an entry in @@connections
#
# === Return
# (Array):: Keys of the @@connections hash
def keyspaces
  @@connections.keys
end

.real_get(k, opt = {}) ⇒ Object

Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved.

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(Cassandra::OrderedHash)

Individual row or OrderedHash of rows



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/right_support/db/cassandra_model.rb', line 257

# Get raw row(s) for specified primary key(s)
# An Array of keys is fetched with a single :multi_get; a single key with an
# explicit :count is fetched with a single :get; a single key without :count
# is fetched in chunks of DEFAULT_COUNT columns until a short chunk is returned
#
# === Parameters
# k(String|Array):: Individual primary key or list of keys on which to match
# opt(Hash):: Request options, forwarded to the Cassandra call; the caller's
#   hash is not modified
#
# === Return
# (Cassandra::OrderedHash):: Individual row or OrderedHash of rows
def real_get(k, opt = {})
  return do_op(:multi_get, column_family, k, opt) if k.is_a?(Array)
  return do_op(:get, column_family, k, opt) if opt[:count]

  # Work on a copy so the paging keys never leak into the caller's options
  paged_opt = opt.clone
  paged_opt[:count] = DEFAULT_COUNT
  columns = Cassandra::OrderedHash.new

  loop do
    chunk = do_op(:get, column_family, k, paged_opt)
    columns.merge!(chunk)
    # A short chunk must be the last one
    break unless chunk.size == paged_opt[:count]
    # Assume there are more chunks, use last key as start of next get
    paged_opt[:start] = chunk.keys.last
  end

  columns
end

.reconnectObject

Reconnect to Cassandra server. Uses BinaryProtocolAccelerated if it is available.

Return

true

Returns true after reconnecting, or false if no keyspace is set



531
532
533
534
535
536
537
538
539
540
541
542
543
544
# File 'lib/right_support/db/cassandra_model.rb', line 531

# Build a new client for the active keyspace and store it in @@connections
#
# === Return
# (true|false):: true once the new client has been stored; false when the
#   active keyspace is nil, in which case nothing is created
#
# === Raise
# MissingConfiguration:: Propagated from env_config
def reconnect
  server_config = env_config

  active_keyspace = keyspace
  return false if active_keyspace.nil?

  client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  # Only request the accelerated protocol when its constant is defined
  if defined? Thrift::BinaryProtocolAccelerated
    client_options[:protocol] = Thrift::BinaryProtocolAccelerated
  end

  fresh_client = Cassandra.new(active_keyspace, server_config["server"], client_options)
  fresh_client.disable_node_auto_discovery!
  @@connections[active_keyspace] = fresh_client
  true
end

.remove(*args) ⇒ Object

Delete row or columns of row

Parameters

args(Array)

Key, columns, options

Return

(Array)

Mutation map and consistency level



452
453
454
# File 'lib/right_support/db/cassandra_model.rb', line 452

# Delete row or columns of row
#
# === Parameters
# args(Array):: Key, columns, options
#
# === Return
# (Object):: Value returned by do_op for the :remove operation
def remove(*args)
  # column_family is inserted ahead of the caller's arguments
  do_op(:remove, column_family, *args)
end

.ringObject

Cassandra ring

Return

(Array)

Members of ring



550
551
552
# File 'lib/right_support/db/cassandra_model.rb', line 550

# Cassandra ring
#
# === Return
# (Object):: Whatever the connection's ring method returns
def ring
  # Called on the connection directly, not through do_op
  conn.ring
end

.stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object

This method is an attempt to circumvent the Cassandra gem limitation of returning only 100 columns for wide rows. It returns only columns that are within the result set specified by a secondary index equality query. It iterates through chunks of rows of the result set and yields to the caller all of the columns in chunks of 1,000 until all of the columns have been retrieved.

Parameters:

Yields:

Yields:

  • (Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>)

    array containing index column value passed in and an array of columns matching the index query



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/right_support/db/cassandra_model.rb', line 327

# Stream every column of every row matching a secondary index equality query
# Rows are listed 10 at a time; for each row its columns are fetched in
# chunks (1,000 on the first request) and each chunk is yielded to the caller
#
# === Parameters
# index(String):: Name of secondary index
# key(String):: Index value that each selected row is required to match
#
# === Block
# Required; called with (row_key, columns) for every chunk of columns
#
# === Return
# nil:: Always; results are delivered only through the block
def stream_all_indexed_slices(index, key)
  expr = do_op(:create_idx_expr, index, key, "EQ")

  start_row = ''
  row_count = 10

  # start_row becomes nil once a page holds nothing beyond its own start row
  until start_row.nil?
    clause = do_op(:create_idx_clause, [expr], start_row, row_count)

    rows = conn.get_indexed_slices(column_family, clause, index,
                                   :key_count => row_count,
                                   :key_start => start_row)

    rows = rows.keys
    # Every page after the first repeats the previous page's last row first
    rows.shift unless start_row == ''
    start_row = rows.last

    rows.each do |row|
      start_column = ''
      column_count = 1_000
      has_more_columns = true

      while has_more_columns
        clause = do_op(:create_idx_clause, [expr], row, 1)
        chunk = conn.get_indexed_slices(column_family, clause, nil,
                                        :start => start_column,
                                        :count => column_count)

        # Nothing came back for this row: stop paging it instead of calling
        # methods on nil
        break if chunk.empty?

        # Get first row's columns, because we are getting only one row [see clause, for more details]
        row_key = chunk.keys.first
        columns = chunk[row_key]
        break if columns.nil?

        # Later chunks start with the column that ended the previous chunk
        columns.shift unless start_column == ''
        yield(row_key, columns)

        if columns.size >= column_count - 1
          # Assume there are more columns, use last column as start of next slice
          start_column = columns.last.column.name
          # One extra column is requested to make up for the repeated first one
          column_count = 1_001
        else
          has_more_columns = false
        end
      end
    end
  end
end

.with_keyspace(keyspace, append_env = true, &block) ⇒ Object

Temporarily change the working keyspace for this class for the duration of the block. Resets working keyspace back to default once control has returned to the caller.

Parameters

keyspace(String)

Keyspace name

append_env(true|false)

optional; default true - whether to append the environment name

block(Proc)

Code that will be called in keyspace context



182
183
184
185
186
187
188
189
190
191
# File 'lib/right_support/db/cassandra_model.rb', line 182

# Temporarily change the working keyspace for the duration of the block
# The keyspace that was active before the call is restored afterwards, so
# nested with_keyspace calls return to the enclosing keyspace, not the default
#
# === Parameters
# keyspace(String):: Keyspace name
# append_env(true|false):: optional; default true - whether to append
#   "_<env>" (ENV['RACK_ENV'] or 'development') when the name does not
#   already end with it
# block(Proc):: Code that will be called in keyspace context
#
# === Return
# (Object):: Value returned by the block
def with_keyspace(keyspace, append_env=true, &block)
  # Remembered up front so the ensure clause can put it back
  previous_keyspace = @@current_keyspace
  begin
    @@current_keyspace = keyspace
    env = ENV['RACK_ENV'] || 'development'
    # env is data, not a pattern: escape it before building the regexp
    if append_env && @@current_keyspace !~ /_#{Regexp.escape(env)}$/
      @@current_keyspace = "#{@@current_keyspace}_#{env}"
    end
    block.call
  ensure
    @@current_keyspace = previous_keyspace
  end
end

Instance Method Details

#[](key) ⇒ Object

Column value

Parameters

key(String|Integer)

Column name or key

Return

(Object|nil)

Column value, or nil if not found



630
631
632
633
634
635
636
# File 'lib/right_support/db/cassandra_model.rb', line 630

# Column value
#
# === Parameters
# key(String|Integer):: Column name or key
#
# === Return
# (Object|nil):: Truthy value stored under key; otherwise, for an Integer
#   key, the value stored under Cassandra::Long.new(key); otherwise nil
def [](key)
  # The Cassandra::Long lookup is only attempted when the direct one gave nil/false
  attributes[key] || (attributes[Cassandra::Long.new(key)] if key.kind_of?(Integer))
end

#[]=(key, value) ⇒ Object

Store new column value

Parameters

key(String|Integer)

Column name or key

value(Object)

Value to be stored

Return

(Object)

Value stored



646
647
648
# File 'lib/right_support/db/cassandra_model.rb', line 646

# Store new column value in this object's attributes
# Only the in-memory attributes are changed here
#
# === Parameters
# key(String|Integer):: Column name or key
# value(Object):: Value to be stored
#
# === Return
# (Object):: Value stored
def []=(key, value)
  attributes[key] = value
end

#destroyObject

Delete object from Cassandra

Return

true

Always return true



654
655
656
# File 'lib/right_support/db/cassandra_model.rb', line 654

# Delete object from Cassandra
#
# === Return
# true:: Always return true
def destroy
  self.class.remove(key)
  # Return true explicitly, as save does, rather than leaking remove's result
  true
end

#reloadObject

Load object from Cassandra without modifying this object

Return

(CassandraModel)

Object as stored in Cassandra



610
611
612
# File 'lib/right_support/db/cassandra_model.rb', line 610

# Load object from Cassandra without modifying this object
#
# === Return
# (CassandraModel|nil):: Result of the class-level get for this object's key
def reload
  self.class.get(key)
end

#reload!Object

Reload object value from Cassandra and update this object

Return

(CassandraModel)

This object after reload from Cassandra



618
619
620
621
# File 'lib/right_support/db/cassandra_model.rb', line 618

# Reload object value from Cassandra and update this object
#
# === Return
# (CassandraModel):: This object, with attributes replaced by the result of
#   the class-level real_get for its key
def reload!
  fresh_attributes = self.class.real_get(key)
  self.attributes = fresh_attributes
  self
end

#saveObject

Store object in Cassandra

Return

true

Always return true



601
602
603
604
# File 'lib/right_support/db/cassandra_model.rb', line 601

# Store object in Cassandra
#
# === Return
# true:: Always return true
def save
  # Passes this object's key and attributes to the class-level insert
  self.class.insert(key, attributes)
  # The result of insert is discarded
  true
end