Class: Cassie
- Inherits:
-
Object
- Object
- Cassie
- Defined in:
- lib/cassie.rb
Overview
This class provides a lightweight wrapper around the Cassandra driver. It provides a foundation for maintaining a connection and constructing CQL statements.
Defined Under Namespace
Modules: Model, Testing Classes: Config, Railtie, RecordInvalid, RecordNotFound, Schema
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Class Method Summary collapse
-
.configure!(options) ⇒ Object
Call this method to load the Cassie::Config from the specified file for the specified environment.
-
.consistency(level) ⇒ Object
This method can be used to set a consistency level for all Cassandra queries within a block that don’t explicitly define them.
-
.instance ⇒ Object
A singleton instance that can be shared to communicate with a Cassandra cluster.
-
.logger ⇒ Object
Get a Logger compatible object if it has been set.
-
.logger=(value) ⇒ Object
Set a logger with a Logger compatible object.
Instance Method Summary collapse
-
#batch(options = nil) ⇒ Object
Declare and execute a batch statement.
-
#connect ⇒ Object
Open a connection to the Cassandra cluster.
-
#connected? ⇒ Boolean
Return true if the connection to the Cassandra cluster has been established.
-
#delete(table, key_hash, options = nil) ⇒ Object
Delete a row from a table.
-
#disconnect ⇒ Object
Close the connections to the Cassandra cluster.
-
#execute(cql, values = nil, options = nil) ⇒ Object
Execute an arbitrary CQL statment.
-
#find(cql, values = nil, options = nil) ⇒ Object
Find rows using the CQL statement.
-
#initialize(config) ⇒ Cassie
constructor
A new instance of Cassie.
-
#insert(table, values_hash, options = nil) ⇒ Object
Insert a row from a hash into a table.
-
#prepare(cql) ⇒ Object
Prepare a CQL statement for repeate execution.
-
#reconnect ⇒ Object
Force reconnection.
-
#update(table, values_hash, key_hash, options = nil) ⇒ Object
Update a row in a table.
Constructor Details
#initialize(config) ⇒ Cassie
Returns a new instance of Cassie.
72 73 74 75 76 77 78 |
# File 'lib/cassie.rb', line 72 def initialize(config) @config = config @monitor = Monitor.new @session = nil @prepared_statements = {} @last_prepare_warning = Time.now end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
24 25 26 |
# File 'lib/cassie.rb', line 24 def config @config end |
Class Method Details
.configure!(options) ⇒ Object
Call this method to load the Cassie::Config from the specified file for the specified environment.
38 39 40 41 42 43 44 45 |
# File 'lib/cassie.rb', line 38 def configure!() if defined?(@instance) && @instance old_instance = @instance @instance = nil old_instance.disconnect end @config = Cassie::Config.new() end |
.consistency(level) ⇒ Object
This method can be used to set a consistency level for all Cassandra queries within a block that don’t explicitly define them. It can be used where consistency is important (i.e. on validation queries) but where a higher level method doesn’t provide an option to set it.
51 52 53 54 55 56 57 58 59 |
# File 'lib/cassie.rb', line 51 def consistency(level) save_val = Thread.current[:cassie_consistency] begin Thread.current[:cassie_consistency] = level yield ensure Thread.current[:cassie_consistency] = save_val end end |
.instance ⇒ Object
A singleton instance that can be shared to communicate with a Cassandra cluster.
28 29 30 31 32 33 34 |
# File 'lib/cassie.rb', line 28 def instance unless defined?(@instance) && @instance instance = new(@config) @instance = instance end @instance end |
.logger ⇒ Object
Get a Logger compatible object if it has been set.
62 63 64 |
# File 'lib/cassie.rb', line 62 def logger @logger if defined?(@logger) end |
.logger=(value) ⇒ Object
Set a logger with a Logger compatible object.
67 68 69 |
# File 'lib/cassie.rb', line 67 def logger=(value) @logger = value end |
Instance Method Details
#batch(options = nil) ⇒ Object
Declare and execute a batch statement. Any insert, update, or delete calls made within the block will add themselves to the batch which is executed at the end of the block.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/cassie.rb', line 146 def batch( = nil) if Thread.current[:cassie_batch] yield else begin batch = [] Thread.current[:cassie_batch] = batch yield unless batch.empty? batch_statement = session.logged_batch batch.each do |cql, values| if values.blank? batch_statement.add(cql) else statement = prepare(cql) statement = statement.bind(Array(values)) if values.present? batch_statement.add(statement) end end execute(batch_statement) end ensure Thread.current[:cassie_batch] = nil end end end |
#connect ⇒ Object
Open a connection to the Cassandra cluster.
81 82 83 84 85 86 87 88 |
# File 'lib/cassie.rb', line 81 def connect cluster = Cassandra.cluster(config.cluster) logger.info("Cassie.connect with #{config.sanitized_cluster}") if logger @monitor.synchronize do @session = cluster.connect(config.default_keyspace) @prepared_statements = {} end end |
#connected? ⇒ Boolean
Return true if the connection to the Cassandra cluster has been established.
101 102 103 |
# File 'lib/cassie.rb', line 101 def connected? !!@session end |
#delete(table, key_hash, options = nil) ⇒ Object
Delete a row from a table. You should pass the primary key value in the key_hash.
If this method is called inside a batch block it will be executed in the batch.
240 241 242 243 244 |
# File 'lib/cassie.rb', line 240 def delete(table, key_hash, = nil) key_cql, key_values = key_clause(key_hash) cql = "DELETE FROM #{table} WHERE #{key_cql}" batch_or_execute(cql, key_values, ) end |
#disconnect ⇒ Object
Close the connections to the Cassandra cluster.
91 92 93 94 95 96 97 98 |
# File 'lib/cassie.rb', line 91 def disconnect logger.info("Cassie.disconnect from #{config.sanitized_cluster}") if logger @monitor.synchronize do @session.close if @session @session = nil @prepared_statements = {} end end |
#execute(cql, values = nil, options = nil) ⇒ Object
Execute an arbitrary CQL statment. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/cassie.rb', line 248 def execute(cql, values = nil, = nil) start_time = Time.now begin statement = nil if cql.is_a?(String) && values.present? statement = prepare(cql) else statement = cql end if values.present? values = Array(values) = ( ? .merge(:arguments => values) : {:arguments => values}) end # Set a default consistency from a block context if it isn't explicitly set. default_consistency = Thread.current[:cassie_consistency] if default_consistency = ( ? .reverse_merge(:consistency => default_consistency) : {:consistency => default_consistency}) end session.execute(statement, || {}) rescue Cassandra::Errors::IOError => e disconnect raise e ensure elapsed = Time.now - start_time if elapsed >= 0.5 && logger logger.warn("Slow CQL Query (#{(elapsed * 1000).round}ms): #{cql}#{' with ' + values.inspect unless values.blank?}") end end end |
#find(cql, values = nil, options = nil) ⇒ Object
Find rows using the CQL statement. If the statement is a string and values are provided then the statement will executed as a prepared statement. In general all statements should be executed this way.
If you have a statement without arguments, then you should call prepare before and pass the prepared statement if you plan on executing the same query multiple times.
180 181 182 |
# File 'lib/cassie.rb', line 180 def find(cql, values = nil, = nil) execute(cql, values, ) end |
#insert(table, values_hash, options = nil) ⇒ Object
Insert a row from a hash into a table.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/cassie.rb', line 189 def insert(table, values_hash, = nil) columns = [] values = [] values_hash.each do |column, value| if !value.nil? columns << column values << value end end cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})" ttl = [:ttl] if if ttl cql << " USING TTL ?" values << ttl end batch_or_execute(cql, values, ) end |
#prepare(cql) ⇒ Object
Prepare a CQL statement for repeate execution. Prepared statements are cached on the driver until the connection is closed. Calling prepare multiple times with the same CQL string will return the prepared statement from a cache.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/cassie.rb', line 116 def prepare(cql) raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String) statement = @prepared_statements[cql] cache_filled_up = false unless statement @monitor.synchronize do statement = session.prepare(cql) @prepared_statements[cql] = statement if @prepared_statements.size > config.max_prepared_statements # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used, # but that would require additional overhead on each query. This method will eventually # keep the most active queries in the cache and is overall more efficient. @prepared_statements.delete(@prepared_statements.first[0]) cache_filled_up = true end end end if cache_filled_up && logger && Time.now > @last_prepare_warning + 10 # Set a throttle on how often this message is logged so we don't kill performance enven more. @last_prepare_warning = Time.now logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.") end statement end |
#reconnect ⇒ Object
Force reconnection. If you’re using this code in conjunction in a forking server environment like passenger or unicorn you should call this method after forking.
107 108 109 110 |
# File 'lib/cassie.rb', line 107 def reconnect disconnect connect end |
#update(table, values_hash, key_hash, options = nil) ⇒ Object
Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/cassie.rb', line 215 def update(table, values_hash, key_hash, = nil) key_cql, key_values = key_clause(key_hash) update_cql = [] update_values = [] values_hash.each do |column, value| update_cql << "#{column} = ?" update_values << value end values = update_values + key_values cql = "UPDATE #{table}" ttl = [:ttl] if if ttl cql << " USING TTL ?" values.unshift(ttl) end cql << " SET #{update_cql.join(', ')} WHERE #{key_cql}" batch_or_execute(cql, values, ) end |