Module: Ilios::Cassandra

Defined in:
lib/ilios.rb,
ext/ilios/ilios.c

Defined Under Namespace

Classes: ConnectError, ExecutionError, Future, Result, Session, Statement, StatementError

Constant Summary collapse

@@config =

The default options.

{
  keyspace: 'ilios',
  hosts: ['127.0.0.1'],
  timeout_ms: 5_000,
  constant_delay_ms: 15_000,
  max_speculative_executions: 2,
  page_size: 10_000
}

Class Method Summary collapse

Class Method Details

.config=(config) ⇒ Object

Configures the options for connection and execution. Default values are overridden by the options specified here.

Parameters:

  • config (Hash)

    A hash object containing the options.



24
25
26
# File 'lib/ilios.rb', line 24

# Overlays the given options on the current configuration.
#
# Keys present in +config+ replace the stored values; keys that are absent
# keep whatever value is currently stored.
#
# @param config [Hash] the options to apply on top of the current ones
def self.config=(config)
  merged_options = @@config.merge(config)
  @@config = merged_options
end

.connect ⇒ Cassandra::Session

Connects a session to the keyspace specified in the config method.

Returns:

Raises:

  • (RuntimeError)

    If no host to connect to is specified in the config method.

  • (Cassandra::ConnectError)

    If the connection fails for any reason.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'ext/ilios/cassandra.c', line 10

/*
 * Builds a session object and connects it to the configured keyspace.
 *
 * The configuration hash is read from the receiver's class variable. Each
 * configured host is tried on its own, one after another, and the first
 * successful connection is returned.
 *
 * Raises RuntimeError when the configured host list is empty, and
 * eConnectError (carrying the description of the last failure) when no host
 * could be reached.
 */
static VALUE cassandra_connect(VALUE self)
{
    CassandraSession *cassandra_session;
    VALUE session_obj;
    VALUE options;
    VALUE contact_hosts;
    VALUE keyspace_name;
    char failure_desc[4096] = { 0 };
    int host_idx;

    session_obj = CREATE_SESSION(cassandra_session);

    cassandra_session->cluster = cass_cluster_new();
    cass_cluster_set_protocol_version(cassandra_session->cluster, CASS_PROTOCOL_VERSION_V4);

    /* Apply the timeout and speculative-execution settings from the config hash. */
    options = rb_cvar_get(self, id_cvar_config);
    cass_cluster_set_request_timeout(
        cassandra_session->cluster,
        NUM2UINT(rb_hash_aref(options, sym_timeout_ms)));
    cass_cluster_set_constant_speculative_execution_policy(
        cassandra_session->cluster,
        NUM2LONG(rb_hash_aref(options, sym_constant_delay_ms)),
        NUM2INT(rb_hash_aref(options, sym_max_speculative_executions)));

    keyspace_name = rb_hash_aref(options, sym_keyspace);
    contact_hosts = rb_hash_aref(options, sym_hosts);

    Check_Type(contact_hosts, T_ARRAY);
    if (RARRAY_LEN(contact_hosts) == 0) {
        rb_raise(rb_eRuntimeError, "No hosts configured");
    } else if (RARRAY_LEN(contact_hosts) > 1) {
        /* To distribute connections */
        contact_hosts = rb_funcall(contact_hosts, id_shuffle, 0);
    }

    host_idx = 0;
    while (host_idx < RARRAY_LEN(contact_hosts)) {
        VALUE candidate = RARRAY_AREF(contact_hosts, host_idx);
        CassError rc;

        /* Clear previous contact points so only the current host is tried. */
        cass_cluster_set_contact_points(cassandra_session->cluster, "");
        cass_cluster_set_contact_points(cassandra_session->cluster, StringValueCStr(candidate));

        cassandra_session->session = cass_session_new();
        cassandra_session->connect_future = cass_session_connect_keyspace(
            cassandra_session->session,
            cassandra_session->cluster,
            StringValueCStr(keyspace_name));
        /* NOTE(review): presumably blocks without holding the GVL; confirm in its definition. */
        nogvl_future_wait(cassandra_session->connect_future);

        rc = cass_future_error_code(cassandra_session->connect_future);
        if (rc == CASS_OK) {
            return session_obj;
        }

        /*
         * Keep the reason for this failure. Copying at most size - 1 bytes
         * leaves the zero-initialised last byte in place as the terminator.
         */
        strncpy(failure_desc, cass_error_desc(rc), sizeof(failure_desc) - 1);

        /* Release this attempt's future and session before trying the next host. */
        cass_future_free(cassandra_session->connect_future);
        cass_session_free(cassandra_session->session);
        cassandra_session->connect_future = NULL;
        cassandra_session->session = NULL;

        host_idx++;
    }

    /* Every host failed: drop the cluster and report the last failure. */
    cass_cluster_free(cassandra_session->cluster);
    cassandra_session->cluster = NULL;

    rb_raise(eConnectError, "Unable to connect: %s", failure_desc);
    return Qnil;
}

.session ⇒ Cassandra::Session

Connects a session to the keyspace specified in the config method. The session object is memoized per thread.

Returns:

Raises:

  • (RuntimeError)

    If no host to connect to is specified in the config method.

  • (Cassandra::ConnectError)

    If the connection fails for any reason.



36
37
38
# File 'lib/ilios.rb', line 36

# Returns the session cached under +Thread.current[:ilios_cassandra_session]+,
# calling +connect+ to create and cache one when the slot is empty.
#
# NOTE(review): Ruby documents +Thread#[]+ as fiber-local storage, so code
# running in a separate Fiber would get its own session - confirm that this
# is the intended scope.
def self.session
  slot_owner = Thread.current
  slot_owner[:ilios_cassandra_session] = connect unless slot_owner[:ilios_cassandra_session]
  slot_owner[:ilios_cassandra_session]
end