Class: NoSE::Backend::CassandraBackend

Inherits:
Backend show all
Includes:
Subtype
Defined in:
lib/nose/backend/cassandra.rb

Overview

A backend which communicates with Cassandra via CQL

Defined Under Namespace

Classes: DeleteStatementStep, IndexLookupStatementStep, InsertStatementStep

Instance Method Summary collapse

Methods included from Subtype

included

Methods inherited from Backend

#by_id_graph, #indexes_sample, #prepare, #prepare_query, #prepare_update, #query, #update

Methods included from Supertype

included

Methods included from Listing

included

Constructor Details

#initialize(model, indexes, plans, update_plans, config) ⇒ CassandraBackend

Returns a new instance of CassandraBackend.



12
13
14
15
16
17
18
19
# File 'lib/nose/backend/cassandra.rb', line 12

def initialize(model, indexes, plans, update_plans, config)
  super

  @hosts = config[:hosts]
  @port = config[:port]
  @keyspace = config[:keyspace]
  @generator = Cassandra::Uuid::Generator.new
end

Instance Method Details

#drop_index(index) ⇒ Object

Check if a given index exists in the target database



85
86
87
# File 'lib/nose/backend/cassandra.rb', line 85

def drop_index(index)
  client.execute "DROP TABLE \"#{index.key}\""
end

#generate_idObject

Generate a random UUID



22
23
24
# File 'lib/nose/backend/cassandra.rb', line 22

def generate_id
  @generator.uuid
end

#index_empty?(index) ⇒ Boolean

Check if the given index is empty

Returns:

  • (Boolean)


73
74
75
76
# File 'lib/nose/backend/cassandra.rb', line 73

def index_empty?(index)
  query = "SELECT COUNT(*) FROM \"#{index.key}\" LIMIT 1"
  client.execute(query).first.values.first.zero?
end

#index_exists?(index) ⇒ Boolean

Check if a given index exists in the target database

Returns:

  • (Boolean)


79
80
81
82
# File 'lib/nose/backend/cassandra.rb', line 79

def index_exists?(index)
  client
  @cluster.keyspace(@keyspace).has_table? index.key
end

#index_insert_chunk(index, chunk) ⇒ Array<Array<Cassandra::Uuid>>

Insert a chunk of rows into an index

Returns:



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/nose/backend/cassandra.rb', line 51

def index_insert_chunk(index, chunk)
  fields = index.all_fields.to_a
  prepared = "INSERT INTO \"#{index.key}\" (" \
             "#{field_names fields}" \
             ") VALUES (#{(['?'] * fields.length).join ', '})"
  prepared = client.prepare prepared

  ids = []
  client.execute(client.batch do |batch|
    chunk.each do |row|
      index_row = index_row(row, fields)
      ids << (index.hash_fields.to_a + index.order_fields).map do |field|
        index_row[fields.index field]
      end
      batch.add prepared, arguments: index_row
    end
  end)

  ids
end

#index_sample(index, count) ⇒ Object

Sample a number of values from the given index



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/nose/backend/cassandra.rb', line 90

def index_sample(index, count)
  field_list = index.all_fields.map { |f| "\"#{f.id}\"" }
  query = "SELECT #{field_list.join ', '} " \
          "FROM \"#{index.key}\" LIMIT #{count}"
  rows = client.execute(query).rows

  # XXX Ignore null values for now
  # fail if rows.any? { |row| row.values.any?(&:nil?) }

  rows
end

#indexes_ddl(execute = false, skip_existing = false, drop_existing = false) ⇒ Object

Produce the DDL necessary for column families for the given indexes and optionally execute them against the server



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/nose/backend/cassandra.rb', line 28

def indexes_ddl(execute = false, skip_existing = false,
                drop_existing = false)
  Enumerator.new do |enum|
    @indexes.map do |index|
      ddl = index_cql index
      enum.yield ddl

      begin
        drop_index(index) if drop_existing && index_exists?(index)
        client.execute(ddl) if execute
      rescue Cassandra::Errors::AlreadyExistsError => exc
        next if skip_existing

        new_exc = IndexAlreadyExists.new exc.message
        new_exc.set_backtrace exc.backtrace
        raise new_exc
      end
    end
  end
end