Class: RightSupport::DB::CassandraModel

Inherits:
Object
  • Object
show all
Defined in:
lib/right_support/db/cassandra_model.rb

Overview

Base class for a column family in a keyspace Used to access data persisted in Cassandra Provides wrappers for Cassandra client methods

Constant Summary collapse

DEFAULT_TIMEOUT =

Default timeout for client connection to Cassandra server

10
DEFAULT_COUNT =

Default maximum number of rows to retrieve in one chunk

100
@@logger =
nil
@@current_keyspace =
nil
@@connections =
{}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(key, attrs = {}) ⇒ CassandraModel

Create column family object

Parameters

key(String)

Primary key for object

attrs(Hash)

Attributes for object which form Cassandra row

with column name as key and column value as value


487
488
489
490
# File 'lib/right_support/db/cassandra_model.rb', line 487

def initialize(key, attrs = {})
  self.key = key
  self.attributes = attrs
end

Class Attribute Details

.column_familyObject

Returns the value of attribute column_family.



129
130
131
# File 'lib/right_support/db/cassandra_model.rb', line 129

def column_family
  @column_family
end

.custom_operation_exceptionObject

Returns the value of attribute custom_operation_exception.



130
131
132
# File 'lib/right_support/db/cassandra_model.rb', line 130

def custom_operation_exception
  @custom_operation_exception
end

.default_keyspaceObject (readonly)

Returns the value of attribute default_keyspace.



128
129
130
# File 'lib/right_support/db/cassandra_model.rb', line 128

def default_keyspace
  @default_keyspace
end

Instance Attribute Details

#attributesObject

self



479
480
481
# File 'lib/right_support/db/cassandra_model.rb', line 479

def attributes
  @attributes
end

#keyObject

self



479
480
481
# File 'lib/right_support/db/cassandra_model.rb', line 479

def key
  @key
end

Class Method Details

.all(k, opt = {}) ⇒ Object

Get row(s) for specified key(s) Unless :count is specified, a maximum of 100 columns are retrieved

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(Object|nil)

Individual row, or nil if not found, or ordered hash of rows



233
234
235
# File 'lib/right_support/db/cassandra_model.rb', line 233

def all(k, opt = {})
  real_get(k, opt)
end

.batch(*args, &block) ⇒ Object

Open a batch operation and yield self Inserts and deletes are queued until the block closes, and then sent atomically to the server Supports :consistency option, which overrides that set in individual commands

Parameters

args(Array)

Batch options such as :consistency

Block

Required block making Cassandra requests

Returns

(Array)

Mutation map and consistency level

Raise

Exception

If block not specified



424
425
426
427
# File 'lib/right_support/db/cassandra_model.rb', line 424

def batch(*args, &block)
  raise "Block required!" unless block_given?
  do_op(:batch, *args, &block)
end

.configObject



136
137
138
# File 'lib/right_support/db/cassandra_model.rb', line 136

def config
  @@config
end

.config=(value) ⇒ Object



140
141
142
# File 'lib/right_support/db/cassandra_model.rb', line 140

def config=(value)
  @@config = value
end

.connObject

Client connected to Cassandra server Create connection if does not already exist Use BinaryProtocolAccelerated if it available

Return

(Cassandra)

Client connected to server



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/right_support/db/cassandra_model.rb', line 207

def conn()
  @@connections ||= {}

  # TODO remove hidden dependency on ENV['RACK_ENV'] (maybe require config= to accept a sub hash?)
  config = @@config[ENV["RACK_ENV"]]
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" unless config

  thrift_client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  @@connections[self.keyspace] ||= Cassandra.new(self.keyspace, config["server"], thrift_client_options)
  @@connections[self.keyspace].disable_node_auto_discovery!
  @@connections[self.keyspace]
end

.do_op(meth, *args, &block) ⇒ Object

Execute Cassandra request Automatically reconnect and retry if IOError encountered

Parameters

meth(Symbol)

Method to be executed

args(Array)

Method arguments

Block

Block if any to be executed by method

Return

(Object)

Value returned by executed method



441
442
443
444
445
446
# File 'lib/right_support/db/cassandra_model.rb', line 441

def do_op(meth, *args, &block)
  conn.send(meth, *args, &block)
rescue IOError
  reconnect
  retry
end

.get(key, opt = {}) ⇒ Object

Get row for specified primary key and convert into object of given class Unless :count is specified, a maximum of 100 columns are retrieved

Parameters

key(String)

Primary key on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(CassandraModel|nil)

Instantiated object of given class, or nil if not found



247
248
249
250
251
252
253
# File 'lib/right_support/db/cassandra_model.rb', line 247

def get(key, opt = {})
  if (attrs = real_get(key, opt)).empty?
    nil
  else
    new(key, attrs)
  end
end

.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Block

Optional block that is yielded each chunk as it is retrieved as an array like the normally returned result

Return

(OrderedHash)

Rows retrieved with each key, value is columns



305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/right_support/db/cassandra_model.rb', line 305

def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows = Cassandra::OrderedHash.new
  start = ""
  count = opt.delete(:count) || DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  while true
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = self.conn.get_indexed_slices(column_family, clause, columns, opt)
    rows.merge!(chunk)
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
    # This must be the last chunk
      break
    end
  end
  rows
end

.get_columns(key, columns, opt = {}) ⇒ Object

Get specific columns in row with specified key

Parameters

key(String)

Primary key on which to match

columns(Array)

Names of columns to be retrieved

opt(Hash)

Request options such as :consistency

Return

(Array)

Values of selected columns in the order specified



379
380
381
# File 'lib/right_support/db/cassandra_model.rb', line 379

def get_columns(key, columns, opt = {})
  do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
end

.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Return

(Array)

Rows retrieved with each member being an instantiated object of the

given class as value, but object only contains values for the columns retrieved;
array is always empty if a block is given


338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/right_support/db/cassandra_model.rb', line 338

def get_indexed(index, key, columns = nil, opt = {})
  rows = []
  start = ""
  count = DEFAULT_COUNT
  expr = do_op(:create_idx_expr, index, key, "EQ")
  opt = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk = do_op(:get_indexed_slices, column_family, clause, columns, opt)
    chunk_rows = []
    chunk.each do |row_key, row_columns|
      if row_columns && row_key != start
        attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
        chunk_rows << new(row_key, attrs)
      end
    end
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end

.insert(key, values, opt = {}) ⇒ Object

Insert a row for a key

Parameters

key(String)

Primary key for value

values(Hash)

Values to be stored

opt(Hash)

Request options such as :consistency

Return

(Array)

Mutation map and consistency level



392
393
394
# File 'lib/right_support/db/cassandra_model.rb', line 392

def insert(key, values, opt={})
  do_op(:insert, column_family, key, values, opt)
end

.keyspaceObject

Returns current active keyspace.

Return

keyspace(String)

current_keyspace or default_keyspace



166
167
168
# File 'lib/right_support/db/cassandra_model.rb', line 166

def keyspace
  @@current_keyspace || @@default_keyspace
end

.keyspace=(kyspc) ⇒ Object

Sets the default keyspace

Parameters

keyspace(String)

Set the default keyspace



175
176
177
# File 'lib/right_support/db/cassandra_model.rb', line 175

def keyspace=(kyspc)
  @@default_keyspace = (kyspc + "_" + (ENV['RACK_ENV'] || 'development'))
end

.keyspacesObject

Return current keyspaces name as Array of String

Return

(Array)

keyspaces names



157
158
159
# File 'lib/right_support/db/cassandra_model.rb', line 157

def keyspaces
  @@connections.keys
end

.loggerObject



148
149
150
# File 'lib/right_support/db/cassandra_model.rb', line 148

def logger
  @@logger
end

.logger=(l) ⇒ Object



144
145
146
# File 'lib/right_support/db/cassandra_model.rb', line 144

def logger=(l)
  @@logger = l
end

.real_get(k, opt = {}) ⇒ Object

Get raw row(s) for specified primary key(s) Unless :count is specified, a maximum of 100 columns are retrieved except in the case of an individual primary key request, in which case all columns are retrieved

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and for column level

control :count, :start, :finish, :reversed

Return

(Cassandra::OrderedHash)

Individual row or OrderedHash of rows



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/right_support/db/cassandra_model.rb', line 267

def real_get(k, opt = {})
  if k.is_a?(Array)
    do_op(:multi_get, column_family, k, opt)
  elsif opt[:count]
    do_op(:get, column_family, k, opt)
  else
    opt = opt.clone
    opt[:count] = DEFAULT_COUNT
    columns = Cassandra::OrderedHash.new
    loop do
      chunk = do_op(:get, column_family, k, opt)
      columns.merge!(chunk)
      if chunk.size == opt[:count]
        # Assume there are more chunks, use last key as start of next get
        opt[:start] = chunk.keys.last
      else
        # This must be the last chunk
        break
      end
    end
    columns
  end
end

.reconnectObject

Reconnect to Cassandra server Use BinaryProtocolAccelerated if it available

Return

true

Always return true



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
# File 'lib/right_support/db/cassandra_model.rb', line 453

def reconnect
  config = @@config[ENV["RACK_ENV"]]
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" unless config

  return false if keyspace.nil?

  thrift_client_options = {:timeout => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT}
  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated})\
    if defined? Thrift::BinaryProtocolAccelerated

  connection = Cassandra.new(keyspace, config["server"], thrift_client_options)
  connection.disable_node_auto_discovery!
  @@connections[keyspace] = connection
  true
end

.remove(*args) ⇒ Object

Delete row or columns of row

Parameters

args(Array)

Key, columns, options

Return

(Array)

Mutation map and consistency level



403
404
405
# File 'lib/right_support/db/cassandra_model.rb', line 403

def remove(*args)
  do_op(:remove, column_family, *args)
end

.ringObject

Cassandra ring

Return

(Array)

Members of ring



473
474
475
# File 'lib/right_support/db/cassandra_model.rb', line 473

def ring
  conn.ring
end

.with_keyspace(kyspc, &block) ⇒ Object

Execute given block in kyspc context

Parameters

kyspc(String)

Keyspace context

block(Proc)

Code that will be called in keyspace context



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/right_support/db/cassandra_model.rb', line 185

def with_keyspace(kyspc, &block)
  @@current_keyspace = (kyspc + "_" + (ENV['RACK_ENV'] || 'development'))
  begin
    block.call
  rescue Exception => e
    if !self.custom_operation_exception.nil? && self.custom_operation_exception.kind_of?(Proc)\
          && e.kind_of?(Thrift::Exception)
      custom_operation_exception.call
    else
      raise e
    end
  ensure
    @@current_keyspace = nil
  end
end

Instance Method Details

#[](key) ⇒ Object

Column value

Parameters

key(String|Integer)

Column name or key

Return

(Object|nil)

Column value, or nil if not found



525
526
527
528
529
530
531
# File 'lib/right_support/db/cassandra_model.rb', line 525

def [](key)
  ret = attributes[key]
  return ret if ret
  if key.kind_of? Integer
    return attributes[Cassandra::Long.new(key)]
  end
end

#[]=(key, value) ⇒ Object

Store new column value

Parameters

key(String|Integer)

Column name or key

value(Object)

Value to be stored

Return

(Object)

Value stored



541
542
543
# File 'lib/right_support/db/cassandra_model.rb', line 541

def []=(key, value)
  attributes[key] = value
end

#destroyObject

Delete object from Cassandra

Return

true

Always return true



549
550
551
# File 'lib/right_support/db/cassandra_model.rb', line 549

def destroy
  self.class.remove(key)
end

#reloadObject

Load object from Cassandra without modifying this object

Return

(CassandraModel)

Object as stored in Cassandra



505
506
507
# File 'lib/right_support/db/cassandra_model.rb', line 505

def reload
  self.class.get(key)
end

#reload!Object

Reload object value from Cassandra and update this object

Return

(CassandraModel)

This object after reload from Cassandra



513
514
515
516
# File 'lib/right_support/db/cassandra_model.rb', line 513

def reload!
  self.attributes = self.class.real_get(key)
  self
end

#saveObject

Store object in Cassandra

Return

true

Always return true



496
497
498
499
# File 'lib/right_support/db/cassandra_model.rb', line 496

def save
  self.class.insert(key, attributes)
  true
end