Class: CassandraModel::RawConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/cassandra_model/raw_connection.rb

Constant Summary collapse

# Process-wide locks, one per kind of shared connection state. They are
# constants, so every RawConnection instance shares the same five mutexes.
CLUSTER_MUTEX = Mutex.new
SESSION_MUTEX = Mutex.new
CONFIG_MUTEX = Mutex.new
STATEMENT_MUTEX = Mutex.new
REACTOR_MUTEX = Mutex.new

# Baseline settings that #config= merges caller-supplied values over.
# Only the outer hash is frozen; the nested array and hash stay mutable.
DEFAULT_CONFIGURATION = {
  hosts: %w[localhost],
  keyspace: 'default_keyspace',
  keyspace_options: { class: 'SimpleStrategy', replication_factor: 1 },
  port: '9042',
  consistency: :one,
  connection_timeout: 10,
  timeout: 10
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(config_name = nil) ⇒ RawConnection

Returns a new instance of RawConnection.



22
23
24
25
# File 'lib/cassandra_model/raw_connection.rb', line 22

# Records the configuration name and starts with an empty statement cache.
# Nothing here opens a connection or loads configuration.
#
# @param config_name [Object, nil] stored as-is in @config_name
#   (how it selects a configuration is not visible in this method)
def initialize(config_name = nil)
  @config_name = config_name
  @statement_cache = Hash.new
end

Instance Method Details

#cluster ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/cassandra_model/raw_connection.rb', line 35

# Returns the driver cluster, built through safe_getset_variable using
# CLUSTER_MUTEX and the :@cluster slot (presumably a lazy, memoized
# initialisation; the helper is not shown here).
#
# Only the whitelisted connection keys from #config are forwarded to
# Cassandra.cluster, plus the shared logger.
# NOTE(review): DEFAULT_CONFIGURATION carries a :port entry, but :port is not
# among the forwarded keys, so a configured port never reaches the driver from
# this method. Confirm whether that is intentional.
def cluster
  safe_getset_variable(CLUSTER_MUTEX, :@cluster) do
    forwarded_keys = %i[
      hosts compression consistency
      connection_timeout timeout
      username password
      address_resolution
    ]
    driver_options = config.slice(*forwarded_keys).merge(logger: Logging.logger)
    Cassandra.cluster(driver_options)
  end
end

#config ⇒ Object



31
32
33
# File 'lib/cassandra_model/raw_connection.rb', line 31

# Returns the connection configuration.
#
# Delegates to safe_getset_variable with CONFIG_MUTEX and the :@config slot,
# supplying load_config as the block (presumably only run when @config is not
# already set; the helper is not shown here).
def config
  safe_getset_variable(CONFIG_MUTEX, :@config) do
    load_config
  end
end

#config=(value) ⇒ Object



27
28
29
# File 'lib/cassandra_model/raw_connection.rb', line 27

# Replaces the configuration with +value+ layered over DEFAULT_CONFIGURATION.
# The assignment happens while holding CONFIG_MUTEX.
#
# @param value [Hash] overrides; its keys win over the defaults
def config=(value)
  CONFIG_MUTEX.synchronize do
    @config = DEFAULT_CONFIGURATION.merge(value)
  end
end

#counter_batch_reactor ⇒ Object



64
65
66
# File 'lib/cassandra_model/raw_connection.rb', line 64

# Returns the reactor for counter batches.
#
# Delegates to the reactor helper with the :@counter_reactor slot and the
# SingleTokenCounterBatch class (the helper itself is not shown here).
def counter_batch_reactor
  batch_class = SingleTokenCounterBatch
  reactor(:@counter_reactor, batch_class)
end

#keyspace ⇒ Object



52
53
54
# File 'lib/cassandra_model/raw_connection.rb', line 52

# Returns the keyspace object for keyspace_name as reported by the cluster;
# when the cluster returns nil or false for it, falls back to create_keyspace.
def keyspace
  existing = cluster.keyspace(keyspace_name)
  return existing if existing

  create_keyspace
end

#logged_batch_reactor ⇒ Object



60
61
62
# File 'lib/cassandra_model/raw_connection.rb', line 60

# Returns the reactor for logged batches.
#
# Delegates to the reactor helper with the :@logged_reactor slot and the
# SingleTokenLoggedBatch class (the helper itself is not shown here).
def logged_batch_reactor
  batch_class = SingleTokenLoggedBatch
  reactor(:@logged_reactor, batch_class)
end

#session ⇒ Object



48
49
50
# File 'lib/cassandra_model/raw_connection.rb', line 48

# Returns the session obtained by connecting the cluster to the configured
# keyspace (config[:keyspace]).
#
# Goes through safe_getset_variable with SESSION_MUTEX and the :@session slot
# (presumably so the connection is opened once and reused; the helper is not
# shown here).
def session
  safe_getset_variable(SESSION_MUTEX, :@session) do
    # Resolve the cluster before reading the keyspace, as the original did.
    connection = cluster
    connection.connect(config[:keyspace])
  end
end

#shutdown ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/cassandra_model/raw_connection.rb', line 74

# Stops the batch reactors, then closes the session and the cluster.
#
# @shutdown is set first. Each resource is released while holding its own
# mutex, and its instance variable is reset to nil once it has been released.
#
# The session and cluster are closed from ensure clauses: if stopping a
# reactor (or closing the session) raises, the remaining resources are still
# closed and cleared instead of being left open, and the error then
# propagates to the caller.
#
# @return [nil]
def shutdown
  @shutdown = true
  begin
    REACTOR_MUTEX.synchronize do
      %i[@unlogged_reactor @logged_reactor @counter_reactor].each do |ivar|
        reactor = instance_variable_get(ivar)
        # get is called on whatever stop returns (presumably waits for the
        # stop to complete) before the reference is dropped.
        reactor.stop.get if reactor
        instance_variable_set(ivar, nil)
      end
    end
  ensure
    begin
      SESSION_MUTEX.synchronize do
        @session.close if @session
        @session = nil
      end
    ensure
      CLUSTER_MUTEX.synchronize do
        @cluster.close if @cluster
        @cluster = nil
      end
    end
  end
  nil
end

#statement(query) ⇒ Object



68
69
70
71
72
# File 'lib/cassandra_model/raw_connection.rb', line 68

# Returns the prepared statement for +query+, preparing it on first use.
#
# The cache is read without the lock first; on a miss the lookup is repeated
# while holding STATEMENT_MUTEX, and the query is prepared through the
# session only if the cache still has no entry for it.
#
# @param query [Object] the query handed to session.prepare and used as the
#   cache key
def statement(query)
  cached = statement_cache[query]
  return cached if cached

  STATEMENT_MUTEX.synchronize do
    statement_cache[query] ||= session.prepare(query)
  end
end

#unlogged_batch_reactor ⇒ Object



56
57
58
# File 'lib/cassandra_model/raw_connection.rb', line 56

# Returns the reactor for unlogged batches.
#
# Delegates to the reactor helper with the :@unlogged_reactor slot and the
# SingleTokenUnloggedBatch class (the helper itself is not shown here).
def unlogged_batch_reactor
  batch_class = SingleTokenUnloggedBatch
  reactor(:@unlogged_reactor, batch_class)
end