Class: Cassie
- Inherits: Object
  - Object
    - Cassie
- Defined in: lib/cassie.rb
Overview
This class provides a lightweight wrapper around the Cassandra driver. It provides a foundation for maintaining a connection and constructing CQL statements.
Defined Under Namespace
Modules: Model, Testing
Classes: Config, Message, Railtie, RecordInvalid, RecordNotFound, Schema, Subscribers
Instance Attribute Summary
- #config ⇒ Object (readonly)
  Returns the value of attribute config.
- #subscribers ⇒ Object (readonly)
  Returns the value of attribute subscribers.
Class Method Summary
- .configure!(options) ⇒ Object
  Call this method to load the Cassie::Config from the specified file for the specified environment.
- .consistency(level) ⇒ Object
  This method can be used to set a consistency level for all Cassandra queries within a block that don't explicitly define one.
- .instance ⇒ Object
  A singleton instance that can be shared to communicate with a Cassandra cluster.
- .logger ⇒ Object
  Get a Logger compatible object if it has been set.
- .logger=(value) ⇒ Object
  Set a logger with a Logger compatible object.
Instance Method Summary
- #batch(options = nil) ⇒ Object
  Declare and execute a batch statement.
- #connect ⇒ Object
  Open a connection to the Cassandra cluster.
- #connected? ⇒ Boolean
  Return true if the connection to the Cassandra cluster has been established.
- #delete(table, key_hash, options = nil) ⇒ Object
  Delete a row from a table.
- #disconnect ⇒ Object
  Close the connections to the Cassandra cluster.
- #execute(cql, values = nil, options = nil) ⇒ Object
  Execute an arbitrary CQL statement.
- #find(cql, values = nil, options = nil) ⇒ Object
  Find rows using the CQL statement.
- #initialize(config) ⇒ Cassie (constructor)
  A new instance of Cassie.
- #insert(table, values_hash, options = nil) ⇒ Object
  Insert a row from a hash into a table.
- #prepare(cql) ⇒ Object
  Prepare a CQL statement for repeated execution.
- #reconnect ⇒ Object
  Force reconnection.
- #update(table, values_hash, key_hash, options = nil) ⇒ Object
  Update a row in a table.
Constructor Details
#initialize(config) ⇒ Cassie
Returns a new instance of Cassie.
    # File 'lib/cassie.rb', line 86
    def initialize(config)
      @config = config
      @monitor = Monitor.new
      @session = nil
      @prepared_statements = {}
      @last_prepare_warning = Time.now
      @subscribers = Subscribers.new
    end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
    # File 'lib/cassie.rb', line 38
    def config
      @config
    end
#subscribers ⇒ Object (readonly)
Returns the value of attribute subscribers.
    # File 'lib/cassie.rb', line 38
    def subscribers
      @subscribers
    end
Class Method Details
.configure!(options) ⇒ Object
Call this method to load the Cassie::Config from the specified file for the specified environment.
    # File 'lib/cassie.rb', line 52
    def configure!(options)
      if defined?(@instance) && @instance
        old_instance = @instance
        @instance = nil
        old_instance.disconnect
      end
      @config = Cassie::Config.new(options)
    end
.consistency(level) ⇒ Object
This method can be used to set a consistency level for all Cassandra queries within a block that don't explicitly define one. It is useful where consistency is important (e.g. on validation queries) but a higher level method doesn't provide an option to set it.
    # File 'lib/cassie.rb', line 65
    def consistency(level)
      save_val = Thread.current[:cassie_consistency]
      begin
        Thread.current[:cassie_consistency] = level
        yield
      ensure
        Thread.current[:cassie_consistency] = save_val
      end
    end
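The save-and-restore pattern used by .consistency can be tried without a cluster. The following is a minimal sketch, not Cassie code: ConsistencyScope and its :demo_consistency key are hypothetical names that copy the pattern to show how nested blocks restore the outer level.

```ruby
# Hypothetical stand-in that mirrors the thread-local pattern of Cassie.consistency.
module ConsistencyScope
  KEY = :demo_consistency # illustrative key; Cassie itself uses :cassie_consistency

  # Set the default level for the duration of the block, then restore the old one.
  def self.with(level)
    saved = Thread.current[KEY]
    begin
      Thread.current[KEY] = level
      yield
    ensure
      Thread.current[KEY] = saved
    end
  end

  # The level that a query without an explicit consistency would pick up.
  def self.current
    Thread.current[KEY]
  end
end

ConsistencyScope.with(:quorum) do
  puts ConsistencyScope.current.inspect     # level set by the outer block
  ConsistencyScope.with(:local_one) do
    puts ConsistencyScope.current.inspect   # level set by the inner block
  end
  puts ConsistencyScope.current.inspect     # outer level restored
end
puts ConsistencyScope.current.inspect       # nothing set outside the blocks
```

Because the restore happens in an ensure clause, the previous level also comes back when the block raises.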
.instance ⇒ Object
A singleton instance that can be shared to communicate with a Cassandra cluster.
    # File 'lib/cassie.rb', line 42
    def instance
      unless defined?(@instance) && @instance
        instance = new(@config)
        @instance = instance
      end
      @instance
    end
.logger ⇒ Object
Get a Logger compatible object if it has been set.
    # File 'lib/cassie.rb', line 76
    def logger
      @logger if defined?(@logger)
    end
.logger=(value) ⇒ Object
Set a logger with a Logger compatible object.
    # File 'lib/cassie.rb', line 81
    def logger=(value)
      @logger = value
    end
Instance Method Details
#batch(options = nil) ⇒ Object
Declare and execute a batch statement. Any insert, update, or delete calls made within the block will add themselves to the batch which is executed at the end of the block.
    # File 'lib/cassie.rb', line 164
    def batch(options = nil)
      if Thread.current[:cassie_batch]
        yield
      else
        begin
          batch = []
          Thread.current[:cassie_batch] = batch
          yield
          unless batch.empty?
            batch_statement = session.logged_batch
            batch.each do |cql, values|
              if values.blank?
                batch_statement.add(cql)
              else
                statement = prepare(cql)
                statement = statement.bind(Array(values)) if values.present?
                batch_statement.add(statement)
              end
            end
            execute(batch_statement)
          end
        ensure
          Thread.current[:cassie_batch] = nil
        end
      end
    end
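The queueing behaviour described above can be sketched without the Cassandra driver. BatchDemo below is a hypothetical stand-in, and its write method is only an assumption about what the private batch_or_execute helper does (queue inside a batch block, run immediately otherwise); that helper's source is not shown on this page.

```ruby
# Hypothetical stand-in for the batching flow; no Cassandra driver is involved.
class BatchDemo
  attr_reader :executed

  def initialize
    @executed = [] # records what would have been sent to the cluster
  end

  # Mirrors #batch: the outermost block owns the queue, nested blocks just yield.
  def batch
    if Thread.current[:demo_batch]
      yield
    else
      begin
        queue = []
        Thread.current[:demo_batch] = queue
        yield
        @executed << [:batch, queue] unless queue.empty?
      ensure
        Thread.current[:demo_batch] = nil
      end
    end
  end

  # Assumed behaviour of batch_or_execute: queue inside a batch block,
  # otherwise run the statement on its own.
  def write(cql, values = nil)
    queue = Thread.current[:demo_batch]
    if queue
      queue << [cql, values]
    else
      @executed << [:single, [[cql, values]]]
    end
  end
end

demo = BatchDemo.new
demo.write("DELETE FROM things WHERE id = ?", [1])
demo.batch do
  demo.write("INSERT INTO things (id) VALUES (?)", [2])
  demo.batch { demo.write("INSERT INTO things (id) VALUES (?)", [3]) }
end
demo.executed.each { |kind, statements| puts "#{kind}: #{statements.size} statement(s)" }
```

In this sketch the nested batch call adds to the outer queue rather than starting a second batch, which matches the early yield in #batch.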
#connect ⇒ Object
Open a connection to the Cassandra cluster.
    # File 'lib/cassie.rb', line 96
    def connect
      start_time = Time.now
      cluster_config = config.cluster
      cluster_config = cluster_config.merge(:logger => logger) if logger
      cluster = Cassandra.cluster(cluster_config)
      logger.info("Cassie.connect with #{config.sanitized_cluster} in #{((Time.now - start_time) * 1000).round}ms") if logger
      @monitor.synchronize do
        @session = cluster.connect(config.default_keyspace)
        @prepared_statements = {}
      end
    end
#connected? ⇒ Boolean
Return true if the connection to the Cassandra cluster has been established.
    # File 'lib/cassie.rb', line 119
    def connected?
      !!@session
    end
#delete(table, key_hash, options = nil) ⇒ Object
Delete a row from a table. You should pass the primary key value in the key_hash.
If this method is called inside a batch block it will be executed in the batch.
    # File 'lib/cassie.rb', line 262
    def delete(table, key_hash, options = nil)
      key_cql, key_values = key_clause(key_hash)
      cql = "DELETE FROM #{table} WHERE #{key_cql}"
      batch_or_execute(cql, key_values, options)
    end
#disconnect ⇒ Object
Close the connections to the Cassandra cluster.
    # File 'lib/cassie.rb', line 109
    def disconnect
      logger.info("Cassie.disconnect from #{config.sanitized_cluster}") if logger
      @monitor.synchronize do
        @session.close if @session
        @session = nil
        @prepared_statements = {}
      end
    end
#execute(cql, values = nil, options = nil) ⇒ Object
Execute an arbitrary CQL statement. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.
    # File 'lib/cassie.rb', line 270
    def execute(cql, values = nil, options = nil)
      start_time = Time.now
      begin
        statement = nil
        if cql.is_a?(String)
          if values.present?
            statement = prepare(cql)
          else
            statement = Cassandra::Statements::Simple.new(cql)
          end
        else
          statement = cql
        end

        if values.present?
          values = Array(values)
          options = (options ? options.merge(:arguments => values) : {:arguments => values})
        end

        # Set a default consistency from a block context if it isn't explicitly set.
        default_consistency = Thread.current[:cassie_consistency]
        if default_consistency
          options = (options ? options.reverse_merge(:consistency => default_consistency) : {:consistency => default_consistency})
        end

        session.execute(statement, options || {})
      rescue Cassandra::Errors::IOError => e
        disconnect
        raise e
      ensure
        if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
          payload = Message.new(statement, options, Time.now - start_time)
          subscribers.each{|subscriber| subscriber.call(payload)}
        end
      end
    end
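The way #execute assembles its options can be illustrated in plain Ruby. This is a sketch under assumptions: resolve_options is a hypothetical helper, not part of Cassie, and it replaces ActiveSupport's reverse_merge with the equivalent defaults.merge(options). It shows that bound values always end up in :arguments, while the block-level consistency only fills in when the caller gave none.

```ruby
# Plain-Ruby sketch of the option handling in #execute (hypothetical helper).
def resolve_options(options, values, default_consistency)
  values = Array(values) unless values.nil?
  if values && !values.empty?
    # Bound values are always passed to the driver as :arguments.
    options = (options || {}).merge(:arguments => values)
  end
  if default_consistency
    # Equivalent of reverse_merge: an explicit :consistency wins over the default.
    options = { :consistency => default_consistency }.merge(options || {})
  end
  options || {}
end

p resolve_options(nil, [1], nil)
p resolve_options({ :consistency => :one }, [1], :quorum)
p resolve_options(nil, nil, :quorum)
```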
#find(cql, values = nil, options = nil) ⇒ Object
Find rows using the CQL statement. If the statement is a string and values are provided, the statement will be executed as a prepared statement. In general all statements should be executed this way.
If you have a statement without arguments and plan on executing the same query multiple times, call prepare first and pass the prepared statement.
    # File 'lib/cassie.rb', line 198
    def find(cql, values = nil, options = nil)
      execute(cql, values, options)
    end
#insert(table, values_hash, options = nil) ⇒ Object
Insert a row from a hash into a table.
You can specify a ttl for the created row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
    # File 'lib/cassie.rb', line 207
    def insert(table, values_hash, options = nil)
      columns = []
      values = []
      values_hash.each do |column, value|
        if !value.nil?
          columns << column
          values << value
        end
      end
      cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})"

      ttl = options[:ttl] if options
      if ttl
        cql << " USING TTL ?"
        values << ttl
      end

      batch_or_execute(cql, values, options)
    end
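To see what statement #insert produces for a given hash, the CQL assembly can be reproduced on its own. The sketch below is illustrative only: build_insert is a hypothetical name, and question_marks is an assumed implementation of a private helper whose source is not shown on this page.

```ruby
# Assumed implementation of the private question_marks helper.
def question_marks(size)
  (["?"] * size).join(", ")
end

# Hypothetical helper that returns the CQL and bound values #insert would build.
def build_insert(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    next if value.nil? # nil columns are skipped, as in #insert
    columns << column
    values << value
  end
  cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})"
  ttl = options[:ttl] if options
  if ttl
    cql += " USING TTL ?"
    values << ttl # the TTL marker is last in the CQL, so its value is appended
  end
  [cql, values]
end

cql, values = build_insert("things", { :id => 1, :name => "thing", :note => nil }, :ttl => 60)
puts cql
p values
```

The :note column is left out of the statement entirely because its value is nil.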
#prepare(cql) ⇒ Object
Prepare a CQL statement for repeated execution. Prepared statements are cached on the driver until the connection is closed. Calling prepare multiple times with the same CQL string will return the prepared statement from a cache.
    # File 'lib/cassie.rb', line 134
    def prepare(cql)
      raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String)
      statement = @prepared_statements[cql]
      cache_filled_up = false
      unless statement
        @monitor.synchronize do
          statement = session.prepare(cql)
          @prepared_statements[cql] = statement
          if @prepared_statements.size > config.max_prepared_statements
            # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
            # but that would require additional overhead on each query. This method will eventually
            # keep the most active queries in the cache and is overall more efficient.
            @prepared_statements.delete(@prepared_statements.first[0])
            cache_filled_up = true
          end
        end
      end

      if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
        # Set a throttle on how often this message is logged so we don't kill performance even more.
        @last_prepare_warning = Time.now
        logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
      end

      statement
    end
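The eviction rule in #prepare drops the oldest inserted entry, not the least recently used one. The following sketch isolates that rule in a hypothetical BoundedCache class (not part of Cassie), relying on Ruby hashes preserving insertion order.

```ruby
# Hypothetical bounded cache that evicts the oldest inserted entry.
class BoundedCache
  def initialize(max_size)
    @max_size = max_size
    @entries = {}
  end

  # Return the cached value for key, or build it with the block and store it.
  def fetch(key)
    return @entries[key] if @entries.key?(key)
    value = yield(key)
    @entries[key] = value
    # Hashes keep insertion order, so first is the oldest inserted entry.
    @entries.delete(@entries.first[0]) if @entries.size > @max_size
    value
  end

  def keys
    @entries.keys
  end
end

cache = BoundedCache.new(2)
%w[a b a c].each { |cql| cache.fetch(cql) { |k| "prepared(#{k})" } }
p cache.keys
```

Reading "a" a second time does not move it to the back of the hash, so it is still the entry removed when "c" arrives. A frequently used statement that gets evicted is simply prepared and inserted again on its next use.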
#reconnect ⇒ Object
Force reconnection. If you're using this code in a forking server environment such as Passenger or Unicorn, you should call this method after forking.
    # File 'lib/cassie.rb', line 125
    def reconnect
      disconnect
      connect
    end
#update(table, values_hash, key_hash, options = nil) ⇒ Object
Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.
You can specify a ttl for the updated row by supplying a :ttl option.
If this method is called inside a batch block it will be executed in the batch.
    # File 'lib/cassie.rb', line 233
    def update(table, values_hash, key_hash, options = nil)
      key_cql, key_values = key_clause(key_hash)
      update_cql = []
      update_values = []
      if values_hash.is_a?(String)
        update_cql << values_hash
      else
        values_hash.each do |column, value|
          update_cql << "#{column} = ?"
          update_values << value
        end
      end
      values = update_values + key_values

      cql = "UPDATE #{table}"
      ttl = options[:ttl] if options
      if ttl
        cql << " USING TTL ?"
        values.unshift(ttl)
      end
      cql << " SET #{update_cql.join(', ')} WHERE #{key_cql}"

      batch_or_execute(cql, values, options)
    end
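The ordering of bound values in #update is easy to get wrong when a TTL is involved, so a standalone sketch may help. build_update is a hypothetical name, and key_clause here is an assumed implementation of a private helper whose source is not shown on this page (columns joined with AND). The sketch covers only the hash form of values_hash, not the string form.

```ruby
# Assumed implementation of the private key_clause helper.
def key_clause(key_hash)
  cql = key_hash.keys.map { |column| "#{column} = ?" }.join(" AND ")
  [cql, key_hash.values]
end

# Hypothetical helper that returns the CQL and bound values #update would build.
def build_update(table, values_hash, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  assignments = values_hash.keys.map { |column| "#{column} = ?" }
  values = values_hash.values + key_values
  cql = "UPDATE #{table}"
  ttl = options[:ttl] if options
  if ttl
    cql += " USING TTL ?"
    values.unshift(ttl) # the TTL marker comes first in the CQL, so its value does too
  end
  cql += " SET #{assignments.join(', ')} WHERE #{key_cql}"
  [cql, values]
end

cql, values = build_update("things", { :name => "new" }, { :id => 1 }, :ttl => 60)
puts cql
p values
```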