Class: NoSE::Backend::CassandraBackend

Inherits:
Backend show all
Includes:
Subtype
Defined in:
lib/nose/backend/cassandra.rb

Overview

A backend which communicates with Cassandra via CQL

Defined Under Namespace

Classes: DeleteStatementStep, IndexLookupStatementStep, InsertStatementStep

Instance Method Summary collapse

Methods included from Subtype

included

Methods inherited from Backend

#by_id_graph, #indexes_sample, #prepare, #prepare_query, #prepare_update, #query, #update

Methods included from Supertype

included

Methods included from Listing

included

Constructor Details

#initialize(model, indexes, plans, update_plans, config) ⇒ CassandraBackend

Returns a new instance of CassandraBackend.



13
14
15
16
17
18
19
20
# File 'lib/nose/backend/cassandra.rb', line 13

# Set up the backend and remember the Cassandra connection settings
#
# All five arguments are forwarded unchanged to the superclass initializer.
# @param config connection options; the :hosts, :port and :keyspace
#   entries are read here
def initialize(model, indexes, plans, update_plans, config)
  super(model, indexes, plans, update_plans, config)

  # Connection settings are only stored here, nothing is connected yet
  @hosts = config[:hosts]
  @port = config[:port]
  @keyspace = config[:keyspace]

  # Source of the identifiers handed out by generate_id
  @generator = Cassandra::Uuid::Generator.new
end

Instance Method Details

#drop_index(index) ⇒ Object

Drop the table for the given index from the target database



86
87
88
# File 'lib/nose/backend/cassandra.rb', line 86

# Drop the table named after the key of the given index
#
# @param index [#key] the index whose table should be removed
# @return the result of executing the DROP TABLE statement
def drop_index(index)
  client.execute(%(DROP TABLE "#{index.key}"))
end

#generate_id ⇒ Object

Generate a random UUID



23
24
25
# File 'lib/nose/backend/cassandra.rb', line 23

# Generate a new identifier by asking the UUID generator created in the
# constructor (a Cassandra::Uuid::Generator)
#
# @return the value produced by the generator's uuid method
#   (presumably a Cassandra::Uuid - confirm against the driver)
def generate_id
  @generator.uuid
end

#index_empty?(index) ⇒ Boolean

Check if the given index is empty

Returns:

  • (Boolean)


74
75
76
77
# File 'lib/nose/backend/cassandra.rb', line 74

# Check if the given index is empty
#
# Runs a COUNT(*) query against the table named after the index key and
# tests the first value of the first returned row against zero.
# @param index [#key] the index to inspect
# @return [Boolean]
def index_empty?(index)
  cql = %Q[SELECT COUNT(*) FROM "#{index.key}" LIMIT 1]
  count_row = client.execute(cql).first
  count_row.values.first.zero?
end

#index_exists?(index) ⇒ Boolean

Check if a given index exists in the target database

Returns:

  • (Boolean)


80
81
82
83
# File 'lib/nose/backend/cassandra.rb', line 80

# Check if a table for the given index exists in the configured keyspace
#
# @param index [#key] the index to look for
# @return [Boolean]
def index_exists?(index)
  # The result is discarded; this call is made only for its side effect
  # (presumably it connects and populates @cluster - confirm in client)
  client

  keyspace = @cluster.keyspace @keyspace
  keyspace.has_table? index.key
end

#index_insert_chunk(index, chunk) ⇒ Array<Array<Cassandra::Uuid>>

Insert a chunk of rows into an index

Returns:

  • (Array<Array<Cassandra::Uuid>>)



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/nose/backend/cassandra.rb', line 52

# Insert a chunk of rows into an index
#
# All rows are added to a single batch using one prepared INSERT statement
# and the batch is then executed.
# @param index the index to insert into; its key names the table and its
#   all_fields, hash_fields and order_fields describe the columns
# @param chunk [Enumerable] the rows to insert
# @return [Array<Array>] for each row, the values of the hash fields
#   followed by the values of the order fields
def index_insert_chunk(index, chunk)
  fields = index.all_fields.to_a
  placeholders = (['?'] * fields.length).join ', '
  insert_cql = "INSERT INTO \"#{index.key}\" (" \
               "#{field_names fields}" \
               ") VALUES (#{placeholders})"
  prepared = client.prepare insert_cql

  # The positions of the key columns are the same for every row,
  # so look them up once instead of once per row
  key_fields = index.hash_fields.to_a + index.order_fields
  key_positions = key_fields.map { |field| fields.index field }

  ids = []
  batch = client.batch do |statements|
    chunk.each do |row|
      values = index_row(row, fields)
      ids << values.values_at(*key_positions)
      statements.add prepared, arguments: values
    end
  end
  client.execute batch

  ids
end

#index_sample(index, count) ⇒ Object

Sample a number of values from the given index



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/nose/backend/cassandra.rb', line 91

# Sample a number of values from the given index
#
# Selects every field of the index (by quoted field id) from the table
# named after the index key, limited to count rows.
# @param index the index to sample from
# @param count the maximum number of rows to request
# @return the rows of the query result
def index_sample(index, count)
  columns = index.all_fields.map { |field| %("#{field.id}") }.join ', '
  cql = "SELECT #{columns} FROM \"#{index.key}\" LIMIT #{count}"
  sampled = client.execute(cql).rows

  # XXX Ignore null values for now
  # fail if sampled.any? { |row| row.values.any?(&:nil?) }

  sampled
end

#indexes_ddl(execute = false, skip_existing = false, drop_existing = false) ⇒ Object

Produce the DDL necessary for column families for the given indexes and optionally execute them against the server



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nose/backend/cassandra.rb', line 29

# Produce the DDL necessary for column families for the given indexes
# and optionally execute them against the server
#
# The work is done inside the returned enumerator, so nothing is dropped
# or executed until the enumerator is iterated. Each DDL statement is
# yielded before it is (optionally) executed.
# @param execute [Boolean] execute each DDL statement on the server
# @param skip_existing [Boolean] continue with the next index instead of
#   raising when the server reports that the table already exists
# @param drop_existing [Boolean] drop the table of an index which already
#   exists before the DDL is (optionally) executed
# @return [Enumerator] yields the DDL statement for each index
# @raise [IndexAlreadyExists] while iterating, if the server reports an
#   existing table and skip_existing is false
def indexes_ddl(execute = false, skip_existing = false,
                drop_existing = false)
  Enumerator.new do |enum|
    # Iterating purely for side effects, so no result array is built
    @indexes.each do |index|
      ddl = index_cql index
      enum.yield ddl

      begin
        drop_index(index) if drop_existing && index_exists?(index)
        client.execute(ddl) if execute
      rescue Cassandra::Errors::AlreadyExistsError => exc
        next if skip_existing

        # Re-raise as IndexAlreadyExists, keeping the original message
        # and backtrace
        raise IndexAlreadyExists, exc.message, exc.backtrace
      end
    end
  end
end