Module: Ilios::Cassandra
- Defined in:
- lib/ilios.rb,
ext/ilios/ilios.c
Defined Under Namespace
Classes: ConnectError, ExecutionError, Future, Result, Session, Statement, StatementError
Constant Summary collapse
- @@config =
The default options.
{ keyspace: 'ilios', hosts: ['127.0.0.1'], timeout_ms: 5_000, constant_delay_ms: 15_000, max_speculative_executions: 2, page_size: 10_000 }
Class Method Summary collapse
-
.config=(config) ⇒ Object
Configures the option for connection or execution.
-
.connect ⇒ Cassandra::Session
Connects a session to the keyspace specified in
config
method. -
.session ⇒ Cassandra::Session
Connects a session to the keyspace specified in
config
method.
Class Method Details
.config=(config) ⇒ Object
Configures the option for connection or execution. The default value will be overridden by the specified option.
24 25 26 |
# File 'lib/ilios.rb', line 24 def self.config=(config) @@config = @@config.merge(config) end |
.connect ⇒ Cassandra::Session
Connects a session to the keyspace specified in config
method.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'ext/ilios/cassandra.c', line 10
static VALUE cassandra_connect(VALUE self)
{
CassandraSession *cassandra_session;
VALUE cassandra_session_obj;
VALUE config;
VALUE hosts;
VALUE keyspace;
char last_error[4096] = { 0 };
cassandra_session_obj = CREATE_SESSION(cassandra_session);
cassandra_session->cluster = cass_cluster_new();
cass_cluster_set_protocol_version(cassandra_session->cluster, CASS_PROTOCOL_VERSION_V4);
config = rb_cvar_get(self, id_cvar_config);
cass_cluster_set_request_timeout(cassandra_session->cluster, NUM2UINT(rb_hash_aref(config, sym_timeout_ms)));
cass_cluster_set_constant_speculative_execution_policy(cassandra_session->cluster, NUM2LONG(rb_hash_aref(config, sym_constant_delay_ms)), NUM2INT(rb_hash_aref(config, sym_max_speculative_executions)));
keyspace = rb_hash_aref(config, sym_keyspace);
hosts = rb_hash_aref(config, sym_hosts);
Check_Type(hosts, T_ARRAY);
if (RARRAY_LEN(hosts) == 0) {
rb_raise(rb_eRuntimeError, "No hosts configured");
}
if (RARRAY_LEN(hosts) > 1) {
// To distribute connections
hosts = rb_funcall(hosts, id_shuffle, 0);
}
for (int i = 0; i < RARRAY_LEN(hosts); i++) {
VALUE host = RARRAY_AREF(hosts, i);
cass_cluster_set_contact_points(cassandra_session->cluster, ""); // Clear previous contact points
cass_cluster_set_contact_points(cassandra_session->cluster, StringValueCStr(host));
cassandra_session->session = cass_session_new();
cassandra_session->connect_future = cass_session_connect_keyspace(cassandra_session->session, cassandra_session->cluster, StringValueCStr(keyspace));
nogvl_future_wait(cassandra_session->connect_future);
if (cass_future_error_code(cassandra_session->connect_future) == CASS_OK) {
return cassandra_session_obj;
}
strncpy(last_error, cass_error_desc(cass_future_error_code(cassandra_session->connect_future)), sizeof(last_error) - 1);
cass_future_free(cassandra_session->connect_future);
cass_session_free(cassandra_session->session);
cassandra_session->connect_future = NULL;
cassandra_session->session = NULL;
}
cass_cluster_free(cassandra_session->cluster);
cassandra_session->cluster = NULL;
rb_raise(eConnectError, "Unable to connect: %s", last_error);
return Qnil;
}
|
.session ⇒ Cassandra::Session
Connects a session to the keyspace specified in config
method. The session object will be memorized by each threads.
36 37 38 |
# File 'lib/ilios.rb', line 36 def self.session Thread.current[:ilios_cassandra_session] ||= connect end |