Class: Cassie
- Inherits:
-
Object
- Object
- Cassie
- Defined in:
- lib/cassie.rb
Overview
This class provides a lightweight wrapper around the Cassandra driver. It provides a foundation for maintaining a connection and constructing CQL statements.
Defined Under Namespace
Modules: Model, Testing Classes: Config, Message, Railtie, RecordInvalid, RecordNotFound, Schema, Subscribers
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#consistency ⇒ Object
Returns the value of attribute consistency.
-
#subscribers ⇒ Object
readonly
Returns the value of attribute subscribers.
Class Method Summary collapse
-
.configure!(options) ⇒ Object
Call this method to load the Cassie::Config from the specified file for the specified environment.
-
.consistency(level) ⇒ Object
This method can be used to set a consistency level for all Cassandra queries within a block that don’t explicitly define them.
-
.instance ⇒ Object
A singleton instance that can be shared to communicate with a Cassandra cluster.
-
.logger ⇒ Object
Get a Logger compatible object if it has been set.
-
.logger=(value) ⇒ Object
Set a logger with a Logger compatible object.
Instance Method Summary collapse
-
#batch(options = nil) ⇒ Object
Declare and execute a batch statement.
-
#connect ⇒ Object
Open a connection to the Cassandra cluster.
-
#connected? ⇒ Boolean
Return true if the connection to the Cassandra cluster has been established.
-
#current_consistency ⇒ Object
Return the current consistency level that has been set for statements.
-
#delete(table, key_hash, options = nil) ⇒ Object
Delete a row from a table.
-
#disconnect ⇒ Object
Close the connections to the Cassandra cluster.
-
#execute(cql, values = nil, options = nil) ⇒ Object
Execute an arbitrary CQL statment.
-
#find(cql, values = nil, options = nil) ⇒ Object
Find rows using the CQL statement.
-
#initialize(config) ⇒ Cassie
constructor
A new instance of Cassie.
-
#insert(table, values_hash, options = nil) ⇒ Object
Insert a row from a hash into a table.
-
#prepare(cql) ⇒ Object
Prepare a CQL statement for repeate execution.
-
#reconnect ⇒ Object
Force reconnection.
-
#update(table, values_hash, key_hash, options = nil) ⇒ Object
Update a row in a table.
Constructor Details
#initialize(config) ⇒ Cassie
Returns a new instance of Cassie.
87 88 89 90 91 92 93 94 95 |
# File 'lib/cassie.rb', line 87 def initialize(config) @config = config @monitor = Monitor.new @session = nil @prepared_statements = {} @last_prepare_warning = Time.now @subscribers = Subscribers.new @consistency = ((config.cluster || {})[:consistency] || :local_one) end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
38 39 40 |
# File 'lib/cassie.rb', line 38 def config @config end |
#consistency ⇒ Object
Returns the value of attribute consistency.
39 40 41 |
# File 'lib/cassie.rb', line 39 def consistency @consistency end |
#subscribers ⇒ Object (readonly)
Returns the value of attribute subscribers.
38 39 40 |
# File 'lib/cassie.rb', line 38 def subscribers @subscribers end |
Class Method Details
.configure!(options) ⇒ Object
Call this method to load the Cassie::Config from the specified file for the specified environment.
53 54 55 56 57 58 59 60 |
# File 'lib/cassie.rb', line 53 def configure!() if defined?(@instance) && @instance old_instance = @instance @instance = nil old_instance.disconnect end @config = Cassie::Config.new() end |
.consistency(level) ⇒ Object
This method can be used to set a consistency level for all Cassandra queries within a block that don’t explicitly define them. It can be used where consistency is important (i.e. on validation queries) but where a higher level method doesn’t provide an option to set it.
66 67 68 69 70 71 72 73 74 |
# File 'lib/cassie.rb', line 66 def consistency(level) save_val = Thread.current[:cassie_consistency] begin Thread.current[:cassie_consistency] = level yield ensure Thread.current[:cassie_consistency] = save_val end end |
.instance ⇒ Object
A singleton instance that can be shared to communicate with a Cassandra cluster.
43 44 45 46 47 48 49 |
# File 'lib/cassie.rb', line 43 def instance unless defined?(@instance) && @instance instance = new(@config) @instance = instance end @instance end |
.logger ⇒ Object
Get a Logger compatible object if it has been set.
77 78 79 |
# File 'lib/cassie.rb', line 77 def logger @logger if defined?(@logger) end |
.logger=(value) ⇒ Object
Set a logger with a Logger compatible object.
82 83 84 |
# File 'lib/cassie.rb', line 82 def logger=(value) @logger = value end |
Instance Method Details
#batch(options = nil) ⇒ Object
Declare and execute a batch statement. Any insert, update, or delete calls made within the block will add themselves to the batch which is executed at the end of the block.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/cassie.rb', line 166 def batch( = nil) if Thread.current[:cassie_batch] yield else begin batch = [] Thread.current[:cassie_batch] = batch yield unless batch.empty? batch_statement = session.logged_batch batch.each do |cql, values| if values.blank? batch_statement.add(cql) else statement = prepare(cql) statement = statement.bind(Array(values)) if values.present? batch_statement.add(statement) end end execute(batch_statement, nil, ) end ensure Thread.current[:cassie_batch] = nil end end end |
#connect ⇒ Object
Open a connection to the Cassandra cluster.
98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/cassie.rb', line 98 def connect start_time = Time.now cluster_config = config.cluster cluster_config = cluster_config.merge(:logger => logger) if logger cluster = Cassandra.cluster(cluster_config) logger.info("Cassie.connect with #{config.sanitized_cluster} in #{((Time.now - start_time) * 1000).round}ms") if logger @monitor.synchronize do @session = cluster.connect(config.default_keyspace) @prepared_statements = {} end end |
#connected? ⇒ Boolean
Return true if the connection to the Cassandra cluster has been established.
121 122 123 |
# File 'lib/cassie.rb', line 121 def connected? !!@session end |
#current_consistency ⇒ Object
Return the current consistency level that has been set for statements.
322 323 324 |
# File 'lib/cassie.rb', line 322 def current_consistency Thread.current[:cassie_consistency] || consistency end |
#delete(table, key_hash, options = nil) ⇒ Object
Delete a row from a table. You should pass the primary key value in the key_hash.
If this method is called inside a batch block it will be executed in the batch.
272 273 274 275 276 |
# File 'lib/cassie.rb', line 272 def delete(table, key_hash, = nil) key_cql, key_values = key_clause(key_hash) cql = "DELETE FROM #{table} WHERE #{key_cql}" batch_or_execute(cql, key_values, ) end |
#disconnect ⇒ Object
Close the connections to the Cassandra cluster.
111 112 113 114 115 116 117 118 |
# File 'lib/cassie.rb', line 111 def disconnect logger.info("Cassie.disconnect from #{config.sanitized_cluster}") if logger @monitor.synchronize do @session.close if @session @session = nil @prepared_statements = {} end end |
#execute(cql, values = nil, options = nil) ⇒ Object
Execute an arbitrary CQL statment. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
# File 'lib/cassie.rb', line 280 def execute(cql, values = nil, = nil) start_time = Time.now begin statement = nil if cql.is_a?(String) if values.present? statement = prepare(cql) else statement = Cassandra::Statements::Simple.new(cql) end else statement = cql end if values.present? values = Array(values) = ( ? .merge(:arguments => values) : {:arguments => values}) end # Set a default consistency from a block context if it isn't explicitly set. statement_consistency = current_consistency if statement_consistency if = .merge(:consistency => statement_consistency) if [:consistency].nil? else = {:consistency => statement_consistency} end end session.execute(statement, || {}) rescue Cassandra::Errors::IOError => e disconnect raise e ensure if statement.is_a?(Cassandra::Statement) && !subscribers.empty? payload = Message.new(statement, , Time.now - start_time) subscribers.each{|subscriber| subscriber.call(payload)} end end end |
#find(cql, values = nil, options = nil) ⇒ Object
Find rows using the CQL statement. If the statement is a string and values are provided then the statement will executed as a prepared statement. In general all statements should be executed this way.
If you have a statement without arguments, then you should call prepare before and pass the prepared statement if you plan on executing the same query multiple times.
200 201 202 |
# File 'lib/cassie.rb', line 200 def find(cql, values = nil, = nil) execute(cql, values, ) end |
#insert(table, values_hash, options = nil) ⇒ Object
Insert a row from a hash into a table.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/cassie.rb', line 209 def insert(table, values_hash, = nil) columns = [] values = [] values_hash.each do |column, value| if !value.nil? columns << column values << value end end cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})" if && .include?(:ttl) = .dup ttl = .delete(:ttl) if ttl cql << " USING TTL ?" values << Integer(ttl) end end batch_or_execute(cql, values, ) end |
#prepare(cql) ⇒ Object
Prepare a CQL statement for repeate execution. Prepared statements are cached on the driver until the connection is closed. Calling prepare multiple times with the same CQL string will return the prepared statement from a cache.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/cassie.rb', line 136 def prepare(cql) raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String) statement = @prepared_statements[cql] cache_filled_up = false unless statement @monitor.synchronize do statement = session.prepare(cql) @prepared_statements[cql] = statement if @prepared_statements.size > config.max_prepared_statements # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used, # but that would require additional overhead on each query. This method will eventually # keep the most active queries in the cache and is overall more efficient. @prepared_statements.delete(@prepared_statements.first[0]) cache_filled_up = true end end end if cache_filled_up && logger && Time.now > @last_prepare_warning + 10 # Set a throttle on how often this message is logged so we don't kill performance enven more. @last_prepare_warning = Time.now logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.") end statement end |
#reconnect ⇒ Object
Force reconnection. If you’re using this code in conjunction in a forking server environment like passenger or unicorn you should call this method after forking.
127 128 129 130 |
# File 'lib/cassie.rb', line 127 def reconnect disconnect connect end |
#update(table, values_hash, key_hash, options = nil) ⇒ Object
Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/cassie.rb', line 238 def update(table, values_hash, key_hash, = nil) key_cql, key_values = key_clause(key_hash) update_cql = [] update_values = [] if values_hash.is_a?(String) update_cql << values_hash else values_hash.each do |column, value| update_cql << "#{column} = ?" update_values << value end end values = update_values + key_values cql = "UPDATE #{table}" if && .include?(:ttl) = .dup ttl = .delete(:ttl) if ttl cql << " USING TTL ?" values.unshift(Integer(ttl)) end end cql << " SET #{update_cql.join(', ')} WHERE #{key_cql}" batch_or_execute(cql, values, ) end |