Class: CassandraObject::Adapters::CassandraDriver::Client

Inherits:
ActiveRecord::ConnectionAdapters::AbstractAdapter
  • Object
show all
Defined in:
lib/cassandra_object/adapters/cassandra_driver.rb

Overview

The client class acts like the old cassandra gem

Constant Summary collapse

KEY_FIELD =
'key'
NAME_FIELD =
'column1'
VALUE_FIELD =
'value'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, cluster) ⇒ Client

Returns a new instance of Client.



75
76
77
78
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 75

def initialize(session, cluster)
  @session = session
  @cluster = cluster
end

Instance Attribute Details

#clusterObject (readonly)

Returns the value of attribute cluster.



69
70
71
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 69

def cluster
  @cluster
end

#sessionObject (readonly)

Returns the value of attribute session.



69
70
71
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 69

def session
  @session
end

Instance Method Details

#add(column_family, key, by, fields, opts = nil) ⇒ Object



180
181
182
183
184
185
186
187
188
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 180

def add(column_family, key, by, fields, opts=nil)
  async = opts.try(:[], :async)
  fields = [fields] unless fields.is_a?(Array)

  fields.each do |field|
    query = "UPDATE \"#{column_family}\" SET #{VALUE_FIELD} = #{VALUE_FIELD} + #{by} WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, field)};"
    async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  end
end

#add_column_family(column_family) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 313

def add_column_family(column_family)
  value_type = column_family.default_validation_class == 'CounterColumnType' ? 'counter' : 'text'

  query = <<-CQL
CREATE TABLE "#{column_family.name}" (
  key blob,
  column1 text,
  value #{value_type},
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE
  AND CLUSTERING ORDER BY (column1 ASC)
  AND bloom_filter_fp_chance = 0.1
  AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
  AND comment = ''
  AND compaction = {'sstable_size_in_mb': '160', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
  AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.SnappyCompressor'}
  AND dclocal_read_repair_chance = 0.1
  AND default_time_to_live = 0
  AND gc_grace_seconds = 864000
  AND max_index_interval = 2048
  AND memtable_flush_period_in_ms = 0
  AND min_index_interval = 128
  AND read_repair_chance = 0.0
  AND speculative_retry = 'NONE';
CQL

  self.execute(query)
  self.schema(true)
end

#add_multiple_columns(column_family, key, hash, opts = nil) ⇒ Object



190
191
192
193
194
195
196
197
198
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 190

def add_multiple_columns(column_family, key, hash, opts=nil)
  async = opts.try(:[], :async)
  fields = [fields] unless fields.is_a?(Array)

  hash.each do |field, by|
    query = "UPDATE \"#{column_family}\" SET #{VALUE_FIELD} = #{VALUE_FIELD} + #{by} WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, field)};"
    async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  end
end

#batch(opts = false) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 285

def batch(opts=false)
  if @batched_queries
    yield
  else
    begin
      async = opts.try(:[], :async)
      @batched_queries ||= []
      yield
      query = "BEGIN BATCH\n"
      query << @batched_queries.join("\n")
      query << "\nAPPLY BATCH;"
      async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
    rescue Exception => e
      raise e
    ensure
      @batched_queries = nil
    end
  end
end

#batch_mode?Boolean

Returns:

  • (Boolean)


281
282
283
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 281

def batch_mode?
  !! @batched_queries
end

#clause(column_family, val, fields, operator = '=') ⇒ Object



390
391
392
393
394
395
396
397
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 390

def clause(column_family, val, fields, operator='=')
  if operator == '='
    get_parts(column_family, val, fields).map { |field, val, type| "#{field} #{operator} #{escape(val, type)}" }.join(' AND ')
  else
    field, val, type = get_parts(column_family, val, fields).first
    "#{field} #{operator} #{escape(val, type)}"
  end
end

#closeObject



80
81
82
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 80

def close
  session.close
end

#column_clause(column_family, val, operator = '=') ⇒ Object



386
387
388
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 386

def column_clause(column_family, val, operator='=')
  clause(column_family, val, get_column_fields(column_family), operator)
end

#column_familiesObject



277
278
279
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 277

def column_families
  @column_families ||= self.cluster.keyspace(session.keyspace).tables.inject({}) { |hsh, table| hsh[table.name] = table; hsh }
end

#column_string(row, fields) ⇒ Object



444
445
446
447
448
449
450
451
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 444

def column_string(row, fields)
  if fields.size > 1
    parts = fields.map { |f, t| decode(row[f], t) }
    Cassandra::Composite.new(*parts).to_s
  else
    row[NAME_FIELD]
  end
end

#convert_str_to_hex(str) ⇒ Object

insert a blob



440
441
442
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 440

def convert_str_to_hex(str)
  '0x' << str.unpack('H*').first
end

#convert_str_to_timeuuid(str) ⇒ Object

when the column names are timeuuid



435
436
437
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 435

def convert_str_to_timeuuid(str)
  SimpleUUID::UUID.new(str).to_guid
end

#decode(val, type) ⇒ Object



425
426
427
428
429
430
431
432
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 425

def decode(val, type)
  case type
  when :timeuuid
    SimpleUUID::UUID.new(val.to_s).to_s # binary version
  else
    val
  end
end

#escape(str, type) ⇒ Object



399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 399

def escape(str, type)
  case type
  when :timeuuid
    convert_str_to_timeuuid str
  when :blob
    convert_str_to_hex str
  when :int, :bigint
    str
  else
    "'#{str}'"
  end
end

#execute(query, options = {}) ⇒ Object



84
85
86
87
88
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 84

def execute(query, options={})
  ActiveSupport::Notifications.instrument('query.cassandra', query: query, options: options, async: false) do
    session.execute query, options
  end
end

#execute_async(query, options = {}) ⇒ Object



90
91
92
93
94
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 90

def execute_async(query, options={})
  ActiveSupport::Notifications.instrument('query.cassandra', query: query, options: options, async: true) do
    session.execute_async query, options
  end
end

#execute_options(opts) ⇒ Object



255
256
257
258
259
260
261
262
263
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 255

def execute_options(opts)
  opts.try(:slice,
           :consistency,
           :page_size,
           :trace,
           :timeout,
           :serial_consistency
          ) || {}
end

#get(column_family, key, *columns_options) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 121

def get(column_family, key, *columns_options)
  opts = columns_options.pop if columns_options.last.is_a?(Hash)
  async = opts.try(:[], :async)

  columns = columns_options.flatten.compact
  column_fields = get_column_fields(column_family)

  query =
    if columns.size == 1
      "SELECT #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, columns.first)}"
    else
      "SELECT #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)}"
    end

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  if columns.size == 1
    result.size > 0 ? result.first[VALUE_FIELD] : nil
  else
    data = result.inject({}) { |hsh, row| hsh[column_string(row, column_fields)] = row[VALUE_FIELD]; hsh }
    columns.size > 0 ? data.slice(*columns.map(&:to_s)) : data
  end
end

#get_column_fields(column_family) ⇒ Object



363
364
365
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 363

def get_column_fields(column_family)
  cluster.keyspace(keyspace).table(column_family).columns.select { |c| c.name =~ /^column/ }.map { |c| [c.name, c.type.kind] }
end

#get_columns(column_family, key, columns, opts) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 146

def get_columns(column_family, key, columns, opts)
  async = opts.try(:[], :async)

  name_fields = columns.map { |c| escape(c, name_type(column_family)) }.join(', ')

  query = "SELECT #{NAME_FIELD}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{NAME_FIELD} IN(#{name_fields}) AND #{key_clause(column_family, key)};"

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  data = result
         .inject({}) { |hsh, row| hsh[row[NAME_FIELD]] = row[VALUE_FIELD]; hsh }

  columns.map { |column| data[column.to_s] }
end

#get_columns_as_hash(column_family, key, columns, opts) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 162

def get_columns_as_hash(column_family, key, columns, opts)
  async = opts.try(:[], :async)

  name_fields = columns.map { |c| "'#{c}'" }.join(', ')

  query = "SELECT #{NAME_FIELD}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{NAME_FIELD} IN(#{name_fields}) AND #{key_clause(column_family, key)};"

  result = async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  return result if async

  result
    .inject({}) { |hsh, row| hsh[row[NAME_FIELD]] = row[VALUE_FIELD]; hsh }
end

#get_key_fields(column_family) ⇒ Object



359
360
361
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 359

def get_key_fields(column_family)
  cluster.keyspace(keyspace).table(column_family).columns.select { |c| c.name =~ /^key/ }.map { |c| [c.name, c.type.kind] }
end

#get_parts(column_family, val, fields) ⇒ Object



371
372
373
374
375
376
377
378
379
380
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 371

def get_parts(column_family, val, fields)
  val =
    if fields.size > 1
      Cassandra::Composite.new_from_packed(val.dup).parts
    else
      [val]
    end

  fields.each_with_index.map { |(field, type), idx| [field, normalize_composite_key_part(val[idx], type), type] }
end

#get_range(column_family, opts = {}, &blk) ⇒ Object



214
215
216
217
218
219
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 214

def get_range(column_family, opts={}, &blk)
  key_count = opts[:key_count] || 100
  query = "SELECT #{KEY_FIELD} FROM \"#{column_family}\" LIMIT #{key_count}"
  keys = self.execute(query, execute_options(opts)).map { |result| result[KEY_FIELD] }
  keys.size > 0 ? multi_get(column_family, keys) : {}
end

#get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts = {}) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 233

def get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts={})
  opts[:consistency] = consistency

  column_fields = get_column_fields(column_family)

  query = "SELECT #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD} FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)}"
  query << " AND #{column_clause(column_family, column)}" if column
  query << " AND #{column_clause(column_family, start, '>=')}" unless start.empty?
  query << " AND #{column_clause(column_family, finish, '<=')}" unless finish.empty?
  if reversed
    direction = reverse_comparator(column_family) ? 'ASC' : 'DESC'
    query << " ORDER BY "
    query << column_fields.map { |f, _| "#{f} #{direction}" }.join(', ')
  end
  query << " LIMIT #{count}"

  self.execute(query, execute_options(opts)).inject({}) do |results, row|
    results[column_string(row, column_fields)] = decode(row[VALUE_FIELD], value_type(column_family))
    results
  end
end

#get_type(column_family, field) ⇒ Object



355
356
357
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 355

def get_type(column_family, field)
  cluster.keyspace(keyspace).table(column_family).column(field).type.kind
end

#get_value(column_family, key, column, consistency) ⇒ Object



176
177
178
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 176

def get_value(column_family, key, column, consistency)
  get column_family, key, [column], :consistency => consistency
end

#has_table?(name) ⇒ Boolean

Returns:

  • (Boolean)


269
270
271
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 269

def has_table?(name)
  self.cluster.keyspace(session.keyspace).has_table? name
end

#insert(column_family, key, values, opts = nil) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 96

def insert(column_family, key, values, opts=nil)
  ttl = opts.try(:[], :ttl)
  async = opts.try(:[], :async)

  insert_into_options = ttl ? " USING TTL #{ttl}" : ''

  key_fields = get_key_fields(column_family)
  column_fields = get_column_fields(column_family)

  key_parts = get_parts(column_family, key, key_fields)
  insert_query = values.map do |name, value|
    column_parts = get_parts(column_family, name, column_fields)
    "  INSERT INTO \"#{column_family}\" (#{key_fields.map(&:first).join(', ')}, #{column_fields.map(&:first).join(', ')}, #{VALUE_FIELD}) VALUES (#{key_parts.map { |f, v| escape(v, get_type(column_family, f)) }.join(', ')}, #{column_parts.map { |f, v| escape(v, get_type(column_family, f)) }.join(', ')}, #{escape(value, value_type(column_family))})#{insert_into_options}"
  end.join("\n")

  if batch_mode?
    @batched_queries << insert_query
  else
    query = "BEGIN BATCH\n"
    query << insert_query
    query << "\nAPPLY BATCH;"
    async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
  end
end

#key_clause(column_family, val) ⇒ Object



382
383
384
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 382

def key_clause(column_family, val)
  clause(column_family, val, get_key_fields(column_family))
end

#key_type(column_family) ⇒ Object



343
344
345
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 343

def key_type(column_family)
  get_type(column_family, KEY_FIELD)
end

#keyspaceObject



265
266
267
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 265

def keyspace
  session.keyspace
end

#multi_get(column_family, keys, *args) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 221

def multi_get(column_family, keys, *args)
  opts = args.pop if args.last.is_a?(Hash)
  keys = keys.map { |key| escape(key, key_type(column_family)) }.join(',')
  results = {}
  query = "SELECT * FROM \"#{column_family}\" WHERE #{KEY_FIELD} IN(#{keys})"
  self.execute(query, execute_options(opts)).each do |row|
    results[row[KEY_FIELD]] ||= {}
    results[row[KEY_FIELD]][row[NAME_FIELD]] = row[VALUE_FIELD]
  end
  results
end

#name_type(column_family) ⇒ Object



347
348
349
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 347

def name_type(column_family)
  get_type(column_family, NAME_FIELD)
end

#normalize_composite_key_part(val, type) ⇒ Object



412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 412

def normalize_composite_key_part(val, type)
  case type
  when :timeuuid
    SimpleUUID::UUID.new(val).to_s
  when :int
    val.unpack('N').first
  when :bigint
    Cassandra::Long.new(val).to_i
  else
    val
  end
end

#remove(column_family, key, *args) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 200

def remove(column_family, key, *args)
  opts = args.pop if args.last.is_a?(Hash)
  async = opts.try(:[], :async)

  query =
    if args.first.nil? || args.first.is_a?(Hash)
      "DELETE FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)};"
    else
      "DELETE FROM \"#{column_family}\" WHERE #{key_clause(column_family, key)} AND #{column_clause(column_family, args.first)};"
    end

  async ? self.execute_async(query, execute_options(opts)) : self.execute(query, execute_options(opts))
end

#reverse_comparator(column_family) ⇒ Object



367
368
369
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 367

def reverse_comparator(column_family)
  self.cluster.keyspace(keyspace).table(column_family).send(:clustering_order).first == :desc
end

#schema(reload = false) ⇒ Object



305
306
307
308
309
310
311
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 305

def schema(reload=false)
  if reload
    remove_instance_variable(:@schema_cache) if instance_variable_defined?(:@schema_cache)
    remove_instance_variable(:@column_families) if instance_variable_defined?(:@column_families)
    self.cluster.refresh_schema
  end
end

#schema_cacheObject



273
274
275
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 273

def schema_cache
  @schema_cache ||= SchemaCache.new(self)
end

#value_type(column_family) ⇒ Object



351
352
353
# File 'lib/cassandra_object/adapters/cassandra_driver.rb', line 351

def value_type(column_family)
  get_type(column_family, VALUE_FIELD)
end