Class: RightSupport::DB::CassandraModel

Inherits:
Object
Includes:
Log::Mixin
Defined in:
lib/right_support/db/cassandra_model.rb

Overview

Base class for a column family in a keyspace. Used to access data persisted in Cassandra; provides wrappers for Cassandra client methods.

Constant Summary

DEFAULT_TIMEOUT = 10

Default timeout for client connection to Cassandra server

DEFAULT_COUNT = 100

Default maximum number of rows (or, when real_get pages through a single row, columns) to retrieve in one chunk

METHODS_TO_LOG = [:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove, 'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']

Operations for which do_op_log writes a debug timing line

@@current_keyspace = nil

@@connections = {}

Constants included from Log::Mixin

Log::Mixin::Decorator

Methods included from Log::Mixin

default_logger, default_logger=, included

Constructor Details

#initialize(key, attrs = {}) ⇒ CassandraModel

Create a column family object representing one row

Parameters

key(String)

Primary key for object

attrs(Hash)

Attributes of the object, which form the Cassandra row, with column name as key and column value as value


# File 'lib/right_support/db/cassandra_model.rb', line 636

def initialize(key, attrs = {})
  self.key = key
  self.attributes = attrs
end

Class Attribute Details

.column_family ⇒ Object

Returns the value of attribute column_family.



# File 'lib/right_support/db/cassandra_model.rb', line 119

def column_family
  @column_family
end

.default_keyspace ⇒ Object (readonly)

Returns the value of attribute default_keyspace.



# File 'lib/right_support/db/cassandra_model.rb', line 118

def default_keyspace
  @default_keyspace
end

Instance Attribute Details

#attributes ⇒ Object

Returns the value of attribute attributes.



# File 'lib/right_support/db/cassandra_model.rb', line 628

def attributes
  @attributes
end

#key ⇒ Object

Returns the value of attribute key.



# File 'lib/right_support/db/cassandra_model.rb', line 628

def key
  @key
end

Class Method Details

.all(k, opt = {}) ⇒ Object

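Get row(s) for the specified key(s). For a list of keys, a maximum of 100 columns per row is retrieved unless :count is specified; for a single key without :count, all columns are retrieved, in chunks of DEFAULT_COUNT.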

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options, including :consistency and, for column-level control, :count, :start, :finish and :reversed

Return

(Object|nil)

Individual row, or nil if not found, or ordered hash of rows



# File 'lib/right_support/db/cassandra_model.rb', line 267

def all(k, opt = {})
  real_get(k, opt)
end

.batch(*args, &block) ⇒ Object

Open a batch operation and yield the underlying Cassandra client. Inserts and deletes are queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides the consistency set in individual commands.

Parameters

args(Array)

Batch options such as :consistency

Block

Required block making Cassandra requests

Returns

(Array)

Mutation map and consistency level

Raise

RuntimeError

If no block is given



# File 'lib/right_support/db/cassandra_model.rb', line 517

def batch(*args, &block)
  raise "Block required!" unless block_given?
  do_op(:batch, *args, &block)
end

.calculate_random_partitioner_token(key) ⇒ Integer

Compute the token for a given row key, which can provide information on the progress of very large “each” operations, e.g. iterating over all rows of a column family.

Parameters:

  • key (String)

    byte-vector (binary String) representation of the row key

Returns:

  • (Integer)

    the 128-bit token for a given row key, as used by Cassandra’s RandomPartitioner



# File 'lib/right_support/db/cassandra_model.rb', line 152

def calculate_random_partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)

  if number >= (2**127)
    # perform two's complement, basically this takes the absolute value of the number as
    # if it were a 128-bit signed number. Equivalent to Java BigInteger.abs() operation.
    result = (number ^ (2**128)-1) + 1
  else
    # we're good
    result = number
  end

  result
end
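The token arithmetic can be tried without a cluster. The sketch below reproduces it as a standalone `random_partitioner_token` (a stand-in name), using the equivalent form `2**128 - number` for the two's complement step, and adds a hypothetical `scan_progress` helper that turns the token of the last row seen into a rough fraction of the ring. It assumes the scan visits rows in token order and that tokens are spread evenly over 0..2**127, which holds only approximately.

```ruby
require 'digest/md5'

# Standalone copy of the arithmetic in calculate_random_partitioner_token:
# read the MD5 digest as an unsigned 128-bit integer, then take the absolute
# value it would have as a signed 128-bit (two's complement) number.
def random_partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)
  number >= 2**127 ? 2**128 - number : number
end

# Hypothetical helper, not part of CassandraModel: estimate how far a full
# column-family scan has progressed, assuming rows come back in token order
# and tokens are spread evenly over 0..2**127.
def scan_progress(last_row_key)
  random_partitioner_token(last_row_key).fdiv(2**127)
end

%w[user:1 user:2 user:3].each do |k|
  printf("%-7s token=%d progress=%.3f\n", k, random_partitioner_token(k), scan_progress(k))
end
```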

.config ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 132

def config
  @@config
end

.config=(value) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 143

def config=(value)
  @@config = normalize_config(value) unless value.nil?
end

.conn ⇒ Object

Client connected to Cassandra server. Creates the connection if it does not already exist, using BinaryProtocolAccelerated if it is available.

Return

(Cassandra)

Client connected to server



# File 'lib/right_support/db/cassandra_model.rb', line 238

def conn()
  @@connections ||= {}

  config = env_config

  thrift_client_options = {
    :timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT,
    :server_retry_period => nil,
  }

  if defined? Thrift::BinaryProtocolAccelerated
    thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})
  end
    
  @@connections[self.keyspace] ||= Cassandra.new(self.keyspace, config["server"], thrift_client_options)
  @@connections[self.keyspace].disable_node_auto_discovery!
  @@connections[self.keyspace]
end

.do_op(meth, *args, &block) ⇒ Object

Perform a Cassandra operation on the connection object. Rescue IOError by automatically reconnecting and retrying the operation; there is no limit on the number of retries.

Parameters

meth(Symbol)

Method to be executed

*args(Array)

Method arguments to forward to the Cassandra connection

Block

Block, if any, to be executed by the method

Return

(Object)

Value returned by executed method



# File 'lib/right_support/db/cassandra_model.rb', line 534

def do_op(meth, *args, &block)
  first_started_at ||= Time.now
  retries          ||= 0
  started_at         = Time.now

  # cassandra functionality
  result = conn.send(meth, *args, &block)

  # log functionality
  do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])

  return result
rescue IOError
  reconnect
  retries += 1
  retry
end
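The retry in do_op works because Ruby keeps local variables when `retry` restarts the method body, so `retries ||= 0` counts attempts across restarts. A minimal sketch with a fake connection follows; `FlakyConnection`, `do_op_with_limit` and the `MAX_RETRIES` cap are hypothetical (do_op itself reconnects between attempts and retries without a limit).

```ruby
# Fake connection that raises IOError a fixed number of times, then succeeds.
class FlakyConnection
  def initialize(failures)
    @failures = failures
  end

  def get(column_family, key)
    if @failures > 0
      @failures -= 1
      raise IOError, "connection reset"
    end
    { "name" => "value for #{key} in #{column_family}" }
  end
end

MAX_RETRIES = 3 # hypothetical cap; the real do_op has none

def do_op_with_limit(conn, meth, *args)
  retries ||= 0                      # locals survive `retry`, so this keeps counting
  conn.send(meth, *args)
rescue IOError
  retries += 1
  raise if retries > MAX_RETRIES     # give up instead of looping forever
  # the real do_op calls reconnect here before retrying
  retry
end

p do_op_with_limit(FlakyConnection.new(2), :get, "Users", "alice")
```

Capping retries is one possible design choice; without a cap, a server that stays unreachable keeps do_op looping.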

.do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 552

def do_op_log(first_started_at, started_at, retries, meth, cf, key)
  now          = Time.now
  attempt_time = now - started_at

  if METHODS_TO_LOG.include?(meth)
    key_count = key.is_a?(Array) ? key.size : 1

    log_string = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, attempt_time*1000)

    if retries && retries > 0
      total_time  = now - first_started_at
      log_string += sprintf(", retries=%d, total_time=%.1fms", retries, total_time*1000)
    end

    logger.debug(log_string)
  end
end
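For reference, the sketch below rebuilds the debug line that do_op_log emits, with the timings passed in as seconds. `format_cassandra_log` is a hypothetical helper; in the class, the line is only written for methods listed in METHODS_TO_LOG, and `cf` and `key` are the first two arguments that do_op forwards.

```ruby
# Same format strings as do_op_log; times are given in seconds.
def format_cassandra_log(meth, cf, key, attempt_time, retries = 0, total_time = nil)
  key_count = key.is_a?(Array) ? key.size : 1
  line = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms",
                 meth, cf, key_count, attempt_time * 1000)
  if retries > 0
    line += sprintf(", retries=%d, total_time=%.1fms", retries, total_time * 1000)
  end
  line
end

puts format_cassandra_log(:multi_get, "Users", %w[a b c], 0.0123)
# CassandraModel multi_get, cf=Users, keys=3, time=12.3ms
puts format_cassandra_log(:get, "Users", "a", 0.004, 2, 0.045)
# CassandraModel get, cf=Users, keys=1, time=4.0ms, retries=2, total_time=45.0ms
```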

.env_config ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 136

def env_config
  env = ENV['RACK_ENV']
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" \
      unless !@@config.nil? && @@config.keys.include?(env) && @@config[env]
  @@config[env]
end
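env_config expects the normalized configuration to hold one section per RACK_ENV value, each with a "server" entry that conn passes to Cassandra.new. The sketch below assumes that shape (the host names are placeholders, `env_config_for` is hypothetical, and `MissingConfiguration` is a local stand-in for the library's exception class) and applies the same lookup rule. Unlike keyspace=, env_config does not fall back to 'development' when RACK_ENV is unset, so the lookup then fails with a message naming an empty section.

```ruby
class MissingConfiguration < StandardError; end

# Assumed shape of the normalized config: environment name => settings.
SAMPLE_CONFIG = {
  "development" => { "server" => "localhost:9160" },
  "test"        => { "server" => "localhost:9160" },
  "production"  => { "server" => "cass1.example.com:9160" },
}.freeze

# Same rule as env_config, with the config and environment passed in.
def env_config_for(config, env)
  unless config && config.key?(env) && config[env]
    raise MissingConfiguration, "CassandraModel config is missing a '#{env}' section"
  end
  config[env]
end

p env_config_for(SAMPLE_CONFIG, "production")["server"]
```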

.get(key, opt = {}) ⇒ Object

Get the row for the specified primary key and convert it into an object of the given class. Unless :count is specified, all columns of the row are retrieved, in chunks of DEFAULT_COUNT.

Parameters

key(String)

Primary key on which to match

opt(Hash)

Request options, including :consistency and, for column-level control, :count, :start, :finish and :reversed

Return

(CassandraModel|nil)

Instantiated object of given class, or nil if not found



# File 'lib/right_support/db/cassandra_model.rb', line 281

def get(key, opt = {})
  if (attrs = real_get(key, opt)).empty?
    nil
  else
    new(key, attrs)
  end
end

.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options; :count sets the number of rows fetched per chunk (default DEFAULT_COUNT), and only :consistency is passed on to Cassandra

Block

Ignored by this implementation, which accumulates every chunk and returns all rows at once (get_indexed yields chunks when given a block)

Return

(OrderedHash)

Rows retrieved, keyed by row key, with each value being that row's columns



# File 'lib/right_support/db/cassandra_model.rb', line 339

def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows = Cassandra::OrderedHash.new
  start = ""
  count = opt.delete(:count) || DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  while true
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = self.conn.get_indexed_slices(column_family, clause, columns, opt)
    rows.merge!(chunk)
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end

.get_columns(key, columns, opt = {}) ⇒ Object

Get specific columns in row with specified key

Parameters

key(String)

Primary key on which to match

columns(Array)

Names of columns to be retrieved

opt(Hash)

Request options such as :consistency

Return

(Array)

Values of selected columns in the order specified



# File 'lib/right_support/db/cassandra_model.rb', line 472

def get_columns(key, columns, opt = {})
  do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
end

.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for the specified secondary key and convert each into an object of the given class

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Block

Optional block that is yielded each chunk of instantiated objects as it is retrieved

Return

(Array)

Rows retrieved, each member being an instantiated object of the given class that contains values only for the columns retrieved; the array is always empty if a block is given


# File 'lib/right_support/db/cassandra_model.rb', line 431

def get_indexed(index, key, columns = nil, opt = {})
  rows = []
  start = ""
  count = DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = do_op(:get_indexed_slices, column_family, clause, columns, opt)
    chunk_rows = []
    chunk.each do |row_key, row_columns|
      if row_columns && row_key != start
        attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
        chunk_rows << new(row_key, attrs)
      end
    end
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
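The paging in get_indexed can be traced with an in-memory stand-in. In the sketch below, the hypothetical `fetch_page` replaces create_idx_clause plus get_indexed_slices and treats the start key as inclusive, so each page after the first repeats the previous page's last row, which the loop skips. Rows are ordered lexicographically here for simplicity, whereas Cassandra orders them by partitioner token; `each_indexed_chunk` and `ROWS` are also hypothetical.

```ruby
# Seven fake rows that all match the index expression.
ROWS = (1..7).map { |i| ["row#{i}", { "status" => "active", "n" => i.to_s }] }.to_h

# Up to `count` rows whose keys are >= start (inclusive), like a key-range start.
def fetch_page(start, count)
  ROWS.select { |k, _| start.empty? || k >= start }.first(count).to_h
end

def each_indexed_chunk(count)
  rows = []
  start = ""
  loop do
    chunk = fetch_page(start, count)
    # Skip the row that was also the last row of the previous page.
    chunk_rows = chunk.reject { |row_key, _| row_key == start }
    block_given? ? yield(chunk_rows) : rows.concat(chunk_rows.to_a)
    break unless chunk.size == count   # short page: this was the last chunk
    start = chunk.keys.last            # next page starts at the last key seen
  end
  rows
end

each_indexed_chunk(3) { |chunk| p chunk.keys }
```

With a block, each chunk is yielded (the final one may be empty) and the returned array stays empty, matching the documented return value.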

.inherited(base) ⇒ Object

Prevent usage of CassandraModel under Ruby < 1.9: subclassing raises UnsupportedRubyVersion



# File 'lib/right_support/db/cassandra_model.rb', line 128

def inherited(base)
  raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
end

.insert(key, values, opt = {}) ⇒ Object

Insert a row for a key

Parameters

key(String)

Primary key for value

values(Hash)

Values to be stored

opt(Hash)

Request options such as :consistency

Return

(Array)

Mutation map and consistency level



# File 'lib/right_support/db/cassandra_model.rb', line 485

def insert(key, values, opt={})
  do_op(:insert, column_family, key, values, opt)
end

.keyspace ⇒ Object

Return the currently active keyspace.

Return

keyspace(String)

@@current_keyspace if set (inside with_keyspace), otherwise @@default_keyspace



# File 'lib/right_support/db/cassandra_model.rb', line 191

def keyspace
  @@current_keyspace || @@default_keyspace
end

.keyspace=(kyspc) ⇒ Object

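Sets the default keyspace

Parameters

kyspc(String)

Base keyspace name; the environment name (RACK_ENV, default 'development') and, if one is found, the namespace are appended to it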



# File 'lib/right_support/db/cassandra_model.rb', line 200

def keyspace=(kyspc)
  env = ENV['RACK_ENV'] || 'development'
  nspace = namespace(env)
  @@default_keyspace = "#{kyspc}_#{env}"
  @@default_keyspace += "_#{nspace}" if nspace
end
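keyspace= and with_keyspace derive the full keyspace name the same way: base name, then the environment, then the namespace when one is found. The sketch below restates that rule with the namespace file path as a parameter so it can be exercised without /etc/namespace; `namespace_for` and `keyspace_name` are hypothetical helpers, not methods of the class.

```ruby
# Mirrors namespace(env): "testns" in test, else the contents of the file, if any.
def namespace_for(env, namespace_file = "/etc/namespace")
  return "testns" if env == "test"
  File.file?(namespace_file) ? File.read(namespace_file).strip : nil
end

# Mirrors keyspace=: "<base>_<env>" plus "_<namespace>" when a namespace exists.
def keyspace_name(base, env = ENV["RACK_ENV"] || "development", namespace_file = "/etc/namespace")
  nspace = namespace_for(env, namespace_file)
  name = "#{base}_#{env}"
  name += "_#{nspace}" if nspace
  name
end

puts keyspace_name("MyApp", "test")                              # MyApp_test_testns
puts keyspace_name("MyApp", "production", "/nonexistent/path")   # MyApp_production
```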

.keyspaces ⇒ Object

Return the names of all keyspaces used this session (those with a cached connection), as an Array of String.

Return

(Array)

Keyspace names



# File 'lib/right_support/db/cassandra_model.rb', line 183

def keyspaces
  @@connections.keys
end

.namespace(env) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 167

def namespace(env)
  if env == 'test'
    nspace = "testns"
  else
    nspace = nil
    if File.file?("/etc/namespace")
      nspace = File.read("/etc/namespace").strip
    end
  end
  nspace
end

.real_get(k, opt = {}) ⇒ Object

Get raw row(s) for the specified primary key(s). Unless :count is specified, a maximum of 100 columns per row is retrieved, except for a request on an individual primary key, in which case all columns are retrieved in chunks of DEFAULT_COUNT.

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options, including :consistency and, for column-level control, :count, :start, :finish and :reversed

Return

(Cassandra::OrderedHash)

Individual row or OrderedHash of rows



# File 'lib/right_support/db/cassandra_model.rb', line 301

def real_get(k, opt = {})
  if k.is_a?(Array)
    do_op(:multi_get, column_family, k, opt)
  elsif opt[:count]
    do_op(:get, column_family, k, opt)
  else
    opt = opt.clone
    opt[:count] = DEFAULT_COUNT
    columns = Cassandra::OrderedHash.new
    loop do
      chunk = do_op(:get, column_family, k, opt)
      columns.merge!(chunk)
      if chunk.size == opt[:count]
        # Assume there are more chunks, use last key as start of next get
        opt[:start] = chunk.keys.last
      else
        # This must be the last chunk
        break
      end
    end
    columns
  end
end
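real_get pages through a single wide row by re-requesting with :start set to the last column name received. The in-memory sketch below assumes, as Cassandra does for slice ranges, that :start is inclusive, so every page after the first begins with a column already seen and `merge!` absorbs the duplicate. `get_slice`, `read_all_columns` and `ROW` are hypothetical, and a plain Hash (insertion ordered in Ruby) stands in for Cassandra::OrderedHash.

```ruby
# One wide row with 250 columns named c001..c250.
ROW = (1..250).map { |i| [format("c%03d", i), "v#{i}"] }.to_h

# Up to opt[:count] columns whose names are >= opt[:start] (inclusive).
def get_slice(row, opt)
  names = row.keys.sort
  names = names.select { |n| n >= opt[:start] } if opt[:start]
  names.first(opt[:count]).map { |n| [n, row[n]] }.to_h
end

# Same loop shape as real_get's single-key branch; also counts requests.
def read_all_columns(row, count)
  opt = { count: count }
  columns = {}
  calls = 0
  loop do
    chunk = get_slice(row, opt)
    calls += 1
    columns.merge!(chunk)
    break unless chunk.size == opt[:count]   # short page: last chunk
    opt[:start] = chunk.keys.last             # inclusive start for next page
  end
  [columns, calls]
end

columns, calls = read_all_columns(ROW, 100)
puts "#{columns.size} columns in #{calls} requests"
```

For 250 columns and a chunk size of 100 this takes three requests, because each page after the first contributes only 99 new columns.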

.reconnect ⇒ Object

Reconnect to Cassandra server, using BinaryProtocolAccelerated if it is available.

Return

(true|false)

true once the connection has been re-created, or false if no keyspace is set



# File 'lib/right_support/db/cassandra_model.rb', line 575

def reconnect
  config = env_config

  return false if keyspace.nil?

  thrift_client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  connection = Cassandra.new(keyspace, config["server"], thrift_client_options)
  connection.disable_node_auto_discovery!
  @@connections[keyspace] = connection
  true
end

.remove(*args) ⇒ Object

Delete row or columns of row

Parameters

args(Array)

Key, columns, options

Return

(Array)

Mutation map and consistency level



# File 'lib/right_support/db/cassandra_model.rb', line 496

def remove(*args)
  do_op(:remove, column_family, *args)
end

.ring ⇒ Object

Cassandra ring

Return

(Array)

Members of ring



# File 'lib/right_support/db/cassandra_model.rb', line 594

def ring
  conn.ring
end

.stream_all_indexed_slices(index, key) {|Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>| ... } ⇒ Object

This method works around the Cassandra gem's limitation of returning only 100 columns for wide rows. It returns only the columns of rows in the result set of a secondary index equality query. It iterates through the result set in chunks of 10 rows and yields all of each row's columns to the caller in chunks of 1,000 until every column has been retrieved.

Parameters:

  • index (String)

    column name

  • key (String)

    column value

Yields:

  • (Array<String, Array<CassandraThrift::ColumnOrSuperColumn>>)

    row key and an array of that row's columns, for each row matching the index query



# File 'lib/right_support/db/cassandra_model.rb', line 371

def stream_all_indexed_slices(index, key)
  expr = do_op(:create_idx_expr, index, key, "EQ")

  start_row = ''
  row_count = 10
  has_more_rows = true

  while (start_row != nil)
    clause = do_op(:create_idx_clause, [expr], start_row, row_count)

    rows = self.conn.get_indexed_slices(column_family, clause, index,
                                        :key_count => row_count,
                                        :key_start => start_row)

    rows = rows.keys
    rows.shift unless start_row == ''
    start_row = rows.last

    rows.each do |row|
      start_column = ''
      column_count = 1_000
      has_more_columns = true

      while has_more_columns
        clause = do_op(:create_idx_clause, [expr], row, 1)
        chunk = self.conn.get_indexed_slices(column_family, clause, nil,
                                             :start => start_column,
                                             :count => column_count)

        # Stop slicing this row if it no longer matches (e.g. deleted mid-scan);
        # otherwise columns below would be nil
        break if chunk.empty?

        # Get the first row's columns, because we fetch only one row (see clause above)
        row_key = chunk.keys.first
        columns = chunk[row_key]

        columns.shift unless start_column == ''
        yield(row_key, columns)

        if columns.size >= column_count - 1
          #Assume there are more columns, use last column as start of next slice
          start_column = columns.last.column.name
          column_count = 1_001
        else
          has_more_columns = false
        end
      end
    end
  end
end
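The inner loop of stream_all_indexed_slices requests 1,000 columns first and 1,001 afterwards, because each later slice starts at (and drops) the last column of the previous one. The sketch below reproduces that arithmetic on an array of column names for a single row; `column_slice` and `stream_columns` are hypothetical, and the page size is a parameter so the behavior can be checked with small inputs.

```ruby
# Up to `count` column names >= start (inclusive), like a slice range start.
def column_slice(columns, start, count)
  names = columns.sort
  names = names.select { |n| n >= start } unless start.empty?
  names.first(count)
end

def stream_columns(columns, page)
  start_column = ""
  column_count = page
  loop do
    chunk = column_slice(columns, start_column, column_count)
    chunk.shift unless start_column.empty?   # drop the repeated start column
    yield chunk                              # may be empty on the final round
    # Fewer columns than a full page means the row is exhausted.
    break if chunk.empty? || chunk.size < column_count - 1
    start_column = chunk.last
    column_count = page + 1                  # one extra to make up for the shift
  end
end

stream_columns(%w[a b c d e f g], 3) { |chunk| p chunk }
```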

.with_keyspace(keyspace, append_env = true, &block) ⇒ Object

Temporarily change the working keyspace for this class for the duration of the block. Resets working keyspace back to default once control has returned to the caller.

Parameters

keyspace(String)

Keyspace name

append_env(true|false)

Optional, default true; whether to append the environment name (and the namespace, if any)

block(Proc)

Code that will be called in keyspace context



# File 'lib/right_support/db/cassandra_model.rb', line 215

def with_keyspace(keyspace, append_env=true, &block)
  @@current_keyspace = keyspace
  env = ENV['RACK_ENV'] || 'development'
  nspace = namespace(env)
  if append_env
    if nspace
      tail = "_#{env}_#{nspace}"
    else
      tail = "_#{env}"
    end
    @@current_keyspace += tail unless @@current_keyspace.end_with?(tail)
  end
  block.call
ensure
  @@current_keyspace = nil
end
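with_keyspace relies on `ensure` so that the override is cleared even when the block raises. The hypothetical `KeyspaceScope` below keeps only that mechanism (no environment or namespace suffix). Because the ensure clause resets to nil rather than restoring the previous value, a nested call leaves the rest of the outer block running against the default keyspace, which the last lines demonstrate.

```ruby
class KeyspaceScope
  @@default_keyspace = "MyApp_development"
  @@current_keyspace = nil

  def self.keyspace
    @@current_keyspace || @@default_keyspace
  end

  # Same set/yield/ensure shape as with_keyspace, minus the name suffixes.
  def self.with_keyspace(name)
    @@current_keyspace = name
    yield
  ensure
    @@current_keyspace = nil
  end
end

KeyspaceScope.with_keyspace("Reports_development") { puts KeyspaceScope.keyspace }
puts KeyspaceScope.keyspace                 # back to MyApp_development

KeyspaceScope.with_keyspace("Outer") do
  KeyspaceScope.with_keyspace("Inner") { }
  puts KeyspaceScope.keyspace               # MyApp_development, not Outer
end
```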

Instance Method Details

#[](key) ⇒ Object

Column value

Parameters

key(String|Integer)

Column name or key

Return

(Object|nil)

Column value, or nil if not found



# File 'lib/right_support/db/cassandra_model.rb', line 674

def [](key)
  ret = attributes[key]
  return ret if ret
  if key.kind_of? Integer
    return attributes[Cassandra::Long.new(key)]
  end
end

#[]=(key, value) ⇒ Object

Store new column value

Parameters

key(String|Integer)

Column name or key

value(Object)

Value to be stored

Return

(Object)

Value stored



# File 'lib/right_support/db/cassandra_model.rb', line 690

def []=(key, value)
  attributes[key] = value
end

#destroy ⇒ Object

Delete object from Cassandra

Return

true

Always return true



# File 'lib/right_support/db/cassandra_model.rb', line 698

def destroy
  self.class.remove(key)
  true
end

#reload ⇒ Object

Load object from Cassandra without modifying this object

Return

(CassandraModel|nil)

Object as stored in Cassandra, or nil if not found



# File 'lib/right_support/db/cassandra_model.rb', line 654

def reload
  self.class.get(key)
end

#reload! ⇒ Object

Reload object value from Cassandra and update this object

Return

(CassandraModel)

This object after reload from Cassandra



# File 'lib/right_support/db/cassandra_model.rb', line 662

def reload!
  self.attributes = self.class.real_get(key)
  self
end

#save ⇒ Object

Store object in Cassandra

Return

true

Always return true



# File 'lib/right_support/db/cassandra_model.rb', line 645

def save
  self.class.insert(key, attributes)
  true
end
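The instance methods delegate to the class-level insert, get, real_get and remove. To exercise them without a cluster, the hypothetical `MemoryModel` below mimics those class methods with a Hash and reuses the instance method bodies. It treats insert as an upsert of the given columns, as a Cassandra insert is, and makes destroy return true as documented. The point it illustrates is that reload returns a new object while reload! updates the receiver in place.

```ruby
class MemoryModel
  STORE = {} # row key => column hash; stands in for the column family

  attr_accessor :key, :attributes

  def self.insert(key, values, _opt = {})
    (STORE[key] ||= {}).merge!(values)        # upsert only the given columns
  end

  def self.remove(key, *_rest)
    STORE.delete(key)
  end

  def self.real_get(key, _opt = {})
    (STORE[key] || {}).dup
  end

  def self.get(key, opt = {})
    attrs = real_get(key, opt)
    attrs.empty? ? nil : new(key, attrs)
  end

  def initialize(key, attrs = {})
    self.key = key
    self.attributes = attrs
  end

  def save
    self.class.insert(key, attributes)
    true
  end

  def reload
    self.class.get(key)
  end

  def reload!
    self.attributes = self.class.real_get(key)
    self
  end

  def destroy
    self.class.remove(key)
    true
  end
end

user = MemoryModel.new("alice", "email" => "a@example.com")
user.save
copy = MemoryModel.get("alice")
copy.attributes["email"] = "alice@example.com"
copy.save
p user.reload.attributes["email"]    # fresh object with the stored value
p user.attributes["email"]           # user itself is unchanged
p user.reload!.attributes["email"]   # user now holds the stored value
user.destroy
p MemoryModel.get("alice")           # nil: the row is gone
```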