Class: Cassandra

Inherits:
Object
  • Object
show all
Includes:
Columns, Helpers, Protocol
Defined in:
lib/cassandra/cassandra.rb,
lib/cassandra.rb,
lib/cassandra/0.6.rb,
lib/cassandra/0.7.rb,
lib/cassandra/long.rb,
lib/cassandra/columns.rb,
lib/cassandra/helpers.rb,
lib/cassandra/constants.rb,
lib/cassandra/comparable.rb,
lib/cassandra/0.6/columns.rb,
lib/cassandra/0.7/columns.rb,
lib/cassandra/0.6/protocol.rb,
lib/cassandra/0.7/keyspace.rb,
lib/cassandra/0.7/protocol.rb,
lib/cassandra/ordered_hash.rb,
lib/cassandra/0.6/cassandra.rb,
lib/cassandra/0.7/cassandra.rb,
lib/cassandra/0.7/column_family.rb,
lib/cassandra/mock.rb

Overview

OrderedHash is namespaced to prevent conflicts with other implementations

Defined Under Namespace

Modules: Columns, Consistency, Constants, Helpers, Protocol Classes: AccessError, ColumnFamily, Comparable, Keyspace, Long, Mock, OrderedHash, OrderedHashInt

Constant Summary collapse

WRITE_DEFAULTS =
{
  :count => 1000,
  :timestamp => nil,
  :consistency => Consistency::ONE,
  :ttl => nil
}.freeze
READ_DEFAULTS =
{
  :count => 100,
  :start => nil,
  :finish => nil,
  :reversed => false,
  :consistency => Consistency::ONE
}.freeze
THRIFT_DEFAULTS =
{
  :transport_wrapper => Thrift::BufferedTransport,
  :thrift_client_class => ThriftClient
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#extract_and_validate_params, #s_map

Methods included from Columns

#column_family_property, #column_name_class, #is_super, #keyslice_to_hash, #sub_column_name_class

Constructor Details

#initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {}) ⇒ Cassandra

Create a new Cassandra instance and open the connection.



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/cassandra/cassandra.rb', line 70

def initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {})
  @is_super = {}
  @column_name_class = {}
  @sub_column_name_class = {}
  @auto_discover_nodes = true
  thrift_client_options[:transport_wrapper] ||= Cassandra.DEFAULT_TRANSPORT_WRAPPER
  @thrift_client_options = THRIFT_DEFAULTS.merge(thrift_client_options)
  @thrift_client_class = @thrift_client_options[:thrift_client_class]
  @keyspace = keyspace
  @servers = Array(servers)
end

Instance Attribute Details

#auth_requestObject (readonly)

Returns the value of attribute auth_request.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def auth_request
  @auth_request
end

#keyspaceObject

Returns the value of attribute keyspace.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def keyspace
  @keyspace
end

#schema(load = true) ⇒ Object (readonly)

Returns the value of attribute schema.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def schema
  @schema
end

#serversObject (readonly)

Returns the value of attribute servers.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def servers
  @servers
end

#thrift_client_classObject (readonly)

Returns the value of attribute thrift_client_class.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def thrift_client_class
  @thrift_client_class
end

#thrift_client_optionsObject (readonly)

Returns the value of attribute thrift_client_options.



67
68
69
# File 'lib/cassandra/cassandra.rb', line 67

def thrift_client_options
  @thrift_client_options
end

Class Method Details

.DEFAULT_TRANSPORT_WRAPPERObject



2
3
4
# File 'lib/cassandra/0.6/cassandra.rb', line 2

def self.DEFAULT_TRANSPORT_WRAPPER
  Thrift::BufferedTransport
end

.VERSIONObject



2
3
4
# File 'lib/cassandra/0.6.rb', line 2

def self.VERSION
  "0.6"
end

Instance Method Details

#add_column_family(cf_def) ⇒ Object

Read



73
74
75
76
77
78
79
80
81
# File 'lib/cassandra/0.7/cassandra.rb', line 73

def add_column_family(cf_def)
  begin
    res = client.system_add_column_family(cf_def)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end

#add_keyspace(ks_def) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/cassandra/0.7/cassandra.rb', line 113

def add_keyspace(ks_def)
  begin
    res = client.system_add_keyspace(ks_def)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  @keyspaces = nil
  res
end

#batch(options = {}) ⇒ Object

Open a batch operation and yield self. Inserts and deletes will be queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides the consistency set in the individual commands.



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/cassandra/cassandra.rb', line 267

def batch(options = {})
  _, _, _, options = 
    extract_and_validate_params(schema.keys.first, "", [options], WRITE_DEFAULTS)

  @batch = []
  yield(self)
  compact_mutations!

  @batch.each do |mutation|
    case mutation.first
    when :remove
      _remove(*mutation[1])
    else
      _mutate(*mutation)
    end
  end
ensure
  @batch = nil
end

#clear_column_family!(column_family, options = {}) ⇒ Object

Remove all rows in the column family you request. Supports options :consistency and :timestamp. FIXME May not currently delete all records without multiple calls. Waiting for ranged remove support in Cassandra.



12
13
14
15
16
# File 'lib/cassandra/0.6/cassandra.rb', line 12

def clear_column_family!(column_family, options = {})
  each_key(column_family) do |key|
    remove(column_family, key, options)
  end
end

#clear_keyspace!Object

Remove all rows in the keyspace.



22
23
24
# File 'lib/cassandra/0.6/cassandra.rb', line 22

def clear_keyspace!(options = {})
  schema.keys.each { |column_family| clear_column_family!(column_family, options) }
end

#cluster_nameObject



44
45
46
# File 'lib/cassandra/0.7/cassandra.rb', line 44

def cluster_name
  @cluster_name ||= client.describe_cluster_name()
end

#count_columns(column_family, key, *columns_and_options) ⇒ Object

Count the elements at the column_family:key: path you request. Supports the :consistency option.



157
158
159
160
161
# File 'lib/cassandra/cassandra.rb', line 157

def count_columns(column_family, key, *columns_and_options)
  column_family, super_column, _, options = 
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)      
  _count_columns(column_family, key, super_column, options[:consistency])
end

#count_range(column_family, options = {}) ⇒ Object

Count all rows in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :start, :finish, and :consistency options.



259
260
261
# File 'lib/cassandra/cassandra.rb', line 259

def count_range(column_family, options = {})
  get_range(column_family, options).select{|r| r.columns.length > 0}.compact.length
end

#create_idx_clause(idx_expressions, start = "") ⇒ Object



228
229
230
231
232
# File 'lib/cassandra/0.7/cassandra.rb', line 228

def create_idx_clause(idx_expressions, start = "")
	CassandraThrift::IndexClause.new(
		:start_key => start,
		:expressions => idx_expressions)
end

#create_idx_expr(c_name, value, op) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/cassandra/0.7/cassandra.rb', line 210

def create_idx_expr(c_name, value, op)
	CassandraThrift::IndexExpression.new(
		:column_name => c_name,
		:value => value,
		:op => (case op
							when nil, "EQ", "eq", "=="
								CassandraThrift::IndexOperator::EQ
							when "GTE", "gte", ">="
								CassandraThrift::IndexOperator::GTE
							when "GT", "gt", ">"
								CassandraThrift::IndexOperator::GT
							when "LTE", "lte", "<="
								CassandraThrift::IndexOperator::LTE
							when "LT", "lt", "<"
								CassandraThrift::IndexOperator::LT
						end ))
end

#create_index(ks_name, cf_name, c_name, v_class) ⇒ Object

2ary Indexing



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/cassandra/0.7/cassandra.rb', line 189

def create_index(ks_name, cf_name, c_name, v_class)
  cf_def = client.describe_keyspace(ks_name).cf_defs.find{|x| x.name == cf_name}
  if !cf_def.nil? and !cf_def..find{|x| x.name == c_name}
    c_def  = CassandraThrift::ColumnDef.new do |cd|
      cd.name             = c_name
      cd.validation_class = "org.apache.cassandra.db.marshal."+v_class
      cd.index_type       = CassandraThrift::IndexType::KEYS
    end
    cf_def..push(c_def)
    update_column_family(cf_def)
  end
end

#disable_node_auto_discovery!Object



82
83
84
# File 'lib/cassandra/cassandra.rb', line 82

def disable_node_auto_discovery!
  @auto_discover_nodes = false
end

#disconnect!Object



86
87
88
89
90
91
# File 'lib/cassandra/cassandra.rb', line 86

def disconnect!
  if @client
    @client.disconnect!
    @client = nil
  end
end

#drop_column_family(cf_name) ⇒ Object



83
84
85
86
87
88
89
90
91
# File 'lib/cassandra/0.7/cassandra.rb', line 83

def drop_column_family(cf_name)
  begin
    res = client.system_drop_column_family(cf_name)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end

#drop_index(ks_name, cf_name, c_name) ⇒ Object



202
203
204
205
206
207
208
# File 'lib/cassandra/0.7/cassandra.rb', line 202

def drop_index(ks_name, cf_name, c_name)
  cf_def = client.describe_keyspace(ks_name).cf_defs.find{|x| x.name == cf_name}
  if !cf_def.nil? and cf_def..find{|x| x.name == c_name}
    cf_def..delete_if{|x| x.name == c_name}
    update_column_family(cf_def)
  end
end

#drop_keyspace(ks_name) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/cassandra/0.7/cassandra.rb', line 125

def drop_keyspace(ks_name)
  begin
    res = client.system_drop_keyspace(ks_name)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  keyspace = "system" if ks_name.eql?(@keyspace)
  @keyspaces = nil
  res
end

#exists?(column_family, key, *columns_and_options) ⇒ Boolean

Return true if the column_family:key::[sub_column] path you request exists. Supports the :consistency option.

Returns:

  • (Boolean)


211
212
213
214
215
216
217
218
219
220
221
# File 'lib/cassandra/cassandra.rb', line 211

def exists?(column_family, key, *columns_and_options)
  column_family, column, sub_column, options = 
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  ret = nil
  if column
    ret = _multiget(column_family, [key], column, sub_column, '', '', 1, false, options[:consistency])[key]
  else
    ret = _multiget(column_family, [key], nil, nil, '', '', 1, false, options[:consistency])[key]
  end
  return (!ret.nil? and ret.send(:length) != 0)
end

#get(column_family, key, *columns_and_options) ⇒ Object

Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key::[sub_column] path you request. Supports options :count, :start, :finish, :reversed, and :consistency.



190
191
192
# File 'lib/cassandra/cassandra.rb', line 190

def get(column_family, key, *columns_and_options)
  multi_get(column_family, [key], *columns_and_options)[key]
end

#get_columns(column_family, key, *columns_and_options) ⇒ Object

Return a list of single values for the elements at the column_family:key:column:[sub_columns] path you request. Supports the :consistency option.



173
174
175
176
177
# File 'lib/cassandra/cassandra.rb', line 173

def get_columns(column_family, key, *columns_and_options)
  column_family, columns, sub_columns, options = 
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)      
  _get_columns(column_family, key, columns, sub_columns, options[:consistency])
end

#get_indexed_slices(column_family, idx_clause, *columns_and_options) ⇒ Object

TODO: Supercolumn support.



235
236
237
238
239
240
# File 'lib/cassandra/0.7/cassandra.rb', line 235

def get_indexed_slices(column_family, idx_clause, *columns_and_options)
  column_family, columns, _, options =
    extract_and_validate_params(column_family, [], columns_and_options, READ_DEFAULTS)
  _get_indexed_slices(column_family, idx_clause, columns, options[:count], options[:start],
    options[:finish], options[:reversed], options[:consistency])
end

#get_range(column_family, options = {}) ⇒ Object

Return a list of keys in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :count, :start, :finish, and :consistency options.



227
228
229
230
231
232
# File 'lib/cassandra/cassandra.rb', line 227

def get_range(column_family, options = {})
  column_family, _, _, options = 
    extract_and_validate_params(column_family, "", [options], READ_DEFAULTS)
  _get_range(column_family, options[:start].to_s, options[:finish].to_s,
    options[:count], options[:consistency])
end

#get_range_columns(column_family, *columns_and_options) ⇒ Object

Return a list of keys in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :count, :start, :finish, and :consistency options.



249
250
251
252
253
254
# File 'lib/cassandra/cassandra.rb', line 249

def get_range_columns(column_family, *columns_and_options)
  column_family, columns, sub_columns, options =
    extract_and_validate_params(column_family, "", columns_and_options, READ_DEFAULTS)
  _get_range_columns(column_family, columns, sub_columns, options[:start].to_s,
    options[:finish].to_s, options[:count], options[:consistency])
end

#get_range_hash(column_family, options = {}) ⇒ Object

Return a list of keys in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :count, :start, :finish, and :consistency options.



238
239
240
241
242
243
# File 'lib/cassandra/cassandra.rb', line 238

def get_range_hash(column_family, options = {})
  column_family, _, _, options =
    extract_and_validate_params(column_family, "", [options], READ_DEFAULTS)
  _get_range_hash(column_family, options[:start].to_s, options[:finish].to_s,
    options[:count], options[:consistency])
end

#insert(column_family, key, hash, options = {}) ⇒ Object

Insert a row for a key. Pass a flat hash for a regular column family, and a nested hash for a super column family. Supports the :consistency, :timestamp and :ttl options.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/cassandra/cassandra.rb', line 114

def insert(column_family, key, hash, options = {})
  column_family, _, _, options = extract_and_validate_params(column_family, key, [options], WRITE_DEFAULTS)

  timestamp = options[:timestamp] || Time.stamp
  mutation_map = if is_super(column_family)
    {
      key => {
        column_family => hash.collect{|k,v| _super_insert_mutation(column_family, k, v, timestamp, options[:ttl]) }
      }
    }
  else
    {
      key => {
        column_family => hash.collect{|k,v| _standard_insert_mutation(column_family, k, v, timestamp, options[:ttl])}
      }
    }
  end

  @batch ? @batch << [mutation_map, options[:consistency]] : _mutate(mutation_map, options[:consistency])
end

#inspectObject



103
104
105
106
107
# File 'lib/cassandra/cassandra.rb', line 103

def inspect
  "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{
    schema(false).map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')
  }}, @servers=#{servers.inspect}>"
end

#keyspacesObject



93
94
95
# File 'lib/cassandra/cassandra.rb', line 93

def keyspaces
  @keyspaces ||= client.describe_keyspaces()
end

#login!(username, password) ⇒ Object



97
98
99
100
101
# File 'lib/cassandra/cassandra.rb', line 97

def login!(username, password)
  @auth_request = CassandraThrift::AuthenticationRequest.new
  @auth_request.credentials = {'username' => username, 'password' => password}
  client.(@keyspace, @auth_request)
end

#multi_count_columns(column_family, keys, *options) ⇒ Object

Multi-key version of Cassandra#count_columns. Supports options :count, :start, :finish, :reversed, and :consistency. FIXME Not real multi; needs server support



166
167
168
# File 'lib/cassandra/cassandra.rb', line 166

def multi_count_columns(column_family, keys, *options)
  OrderedHash[*keys.map { |key| [key, count_columns(column_family, key, *options)] }._flatten_once]
end

#multi_get(column_family, keys, *columns_and_options) ⇒ Object

Multi-key version of Cassandra#get. Supports options :count, :start, :finish, :reversed, and :consistency.



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/cassandra/cassandra.rb', line 196

def multi_get(column_family, keys, *columns_and_options)
  column_family, column, sub_column, options = 
    extract_and_validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)

  hash = _multiget(column_family, keys, column, sub_column, 
    options[:start], options[:finish], options[:count], options[:reversed],
    options[:consistency])
  # Restore order
  ordered_hash = OrderedHash.new
  keys.each { |key| ordered_hash[key] = hash[key] || (OrderedHash.new if is_super(column_family) and !sub_column) }
  ordered_hash
end

#multi_get_columns(column_family, keys, *options) ⇒ Object

Multi-key version of Cassandra#get_columns. Supports the :consistency option. FIXME Not real multi; needs to use a Column predicate



182
183
184
# File 'lib/cassandra/cassandra.rb', line 182

def multi_get_columns(column_family, keys, *options)
  OrderedHash[*keys.map { |key| [key, get_columns(column_family, key, *options)] }._flatten_once]
end

#partitionerObject



52
53
54
# File 'lib/cassandra/0.7/cassandra.rb', line 52

def partitioner
  client.describe_partitioner()
end

#remove(column_family, key, *columns_and_options) ⇒ Object

_mutate the element at the column_family:key::[sub_column] path you request. Supports the :consistency and :timestamp options.



141
142
143
144
145
146
147
148
149
150
151
# File 'lib/cassandra/cassandra.rb', line 141

def remove(column_family, key, *columns_and_options)
  column_family, column, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  args = {:column_family => column_family}
  columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
  column_path = CassandraThrift::ColumnPath.new(args.merge(columns))

  mutation = [:remove, [key, column_path, options[:timestamp] || Time.stamp, options[:consistency]]]

  @batch ? @batch << mutation : _remove(*mutation[1])
end

#rename_column_family(old_name, new_name) ⇒ Object



93
94
95
96
97
98
99
100
101
# File 'lib/cassandra/0.7/cassandra.rb', line 93

def rename_column_family(old_name, new_name)
  begin
    res = client.system_rename_column_family(old_name, new_name)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end

#rename_keyspace(old_name, new_name) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/cassandra/0.7/cassandra.rb', line 138

def rename_keyspace(old_name, new_name)
  begin
    res = client.system_rename_keyspace(old_name, new_name)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  keyspace = new_name if old_name.eql?(@keyspace)
  @keyspaces = nil
  res
end

#ringObject



48
49
50
# File 'lib/cassandra/0.7/cassandra.rb', line 48

def ring
  client.describe_ring(@keyspace)
end

#schema_agreement?Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/cassandra/0.7/cassandra.rb', line 36

def schema_agreement?
  client.describe_schema_versions().length == 1
end

#truncate!(column_family) ⇒ Object

Remove all rows in the column family you request.



59
60
61
62
63
64
# File 'lib/cassandra/0.7/cassandra.rb', line 59

def truncate!(column_family)
  #each_key(column_family) do |key|
  #  remove(column_family, key, options)
  #end
  client.truncate(column_family)
end

#update_column_family(cf_def) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/cassandra/0.7/cassandra.rb', line 103

def update_column_family(cf_def)
  begin
    res = client.system_update_column_family(cf_def)
  rescue CassandraThrift::TimedOutException => te
    puts "Timed out: #{te.inspect}"
  end
  @schema = nil
  res
end

#update_keyspace(ks_def) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/cassandra/0.7/cassandra.rb', line 151

def update_keyspace(ks_def)
  begin
    res = client.system_update_keyspace(ks_def)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  @keyspaces = nil
  res
end

#versionObject



40
41
42
# File 'lib/cassandra/0.7/cassandra.rb', line 40

def version
  client.describe_version()
end