Class: Cassie

Inherits: Object

Defined in: lib/cassie.rb

Overview

This class is a lightweight wrapper around the Cassandra driver. It provides a foundation for maintaining a connection and constructing CQL statements.

Defined Under Namespace

Modules: Model, Testing

Classes: Config, Message, Railtie, RecordInvalid, RecordNotFound, Schema, Subscribers

Constructor Details

#initialize(config) ⇒ Cassie

Returns a new instance of Cassie.



# File 'lib/cassie.rb', line 86

def initialize(config)
  @config = config
  @monitor = Monitor.new
  @session = nil
  @prepared_statements = {}
  @last_prepare_warning = Time.now
  @subscribers = Subscribers.new
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



# File 'lib/cassie.rb', line 38

def config
  @config
end

#subscribers ⇒ Object (readonly)

Returns the value of attribute subscribers.



# File 'lib/cassie.rb', line 38

def subscribers
  @subscribers
end

Class Method Details

.configure!(options) ⇒ Object

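Call this method to build the Cassie::Config from the supplied options. If a singleton instance already exists, it is discarded and disconnected, so the next call to instance picks up the new configuration.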



# File 'lib/cassie.rb', line 52

def configure!(options)
  if defined?(@instance) && @instance
    old_instance = @instance
    @instance = nil
    old_instance.disconnect
  end
  @config = Cassie::Config.new(options)
end

.consistency(level) ⇒ Object

Sets a default consistency level for every Cassandra query within the block that does not explicitly define one. Use it where consistency is important (e.g., on validation queries) but a higher-level method provides no option to set it.



# File 'lib/cassie.rb', line 65

def consistency(level)
  save_val = Thread.current[:cassie_consistency]
  begin
    Thread.current[:cassie_consistency] = level
    yield
  ensure
    Thread.current[:cassie_consistency] = save_val
  end
end
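
Because the listing saves the previous value and restores it in an ensure clause, nested blocks behave like a stack. The following is a minimal standalone sketch of that behavior, not the library itself: ConsistencySketch is a hypothetical module that copies the method body so it can run without a Cassandra cluster.

```ruby
# Hypothetical stand-in that copies the body of Cassie.consistency so the
# save-and-restore behavior can be observed without a Cassandra connection.
module ConsistencySketch
  def self.consistency(level)
    save_val = Thread.current[:cassie_consistency]
    begin
      Thread.current[:cassie_consistency] = level
      yield
    ensure
      Thread.current[:cassie_consistency] = save_val
    end
  end

  # Convenience reader for the thread-local default (an addition for this sketch).
  def self.current
    Thread.current[:cassie_consistency]
  end
end

observed = []
ConsistencySketch.consistency(:quorum) do
  observed << ConsistencySketch.current      # inside the outer block
  ConsistencySketch.consistency(:one) do
    observed << ConsistencySketch.current    # inside the inner block
  end
  observed << ConsistencySketch.current      # outer level restored
end
observed << ConsistencySketch.current        # no default outside any block

puts observed.inspect
```

The inner block overrides the outer level only for its own duration, and the default disappears once the outermost block ends, even if the block raises.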

.instance ⇒ Object

A singleton instance that can be shared to communicate with a Cassandra cluster.



# File 'lib/cassie.rb', line 42

def instance
  unless defined?(@instance) && @instance
    instance = new(@config)
    @instance = instance
  end
  @instance
end
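
The interaction between configure! and instance can be sketched in isolation. This is an illustrative sketch under stated assumptions: SingletonSketch and FakeClient are hypothetical stand-ins, the option keys are made up, and the raw options hash takes the place of Cassie::Config.

```ruby
# Hypothetical replacement for a connected Cassie object; it only records
# whether disconnect was called.
class FakeClient
  attr_reader :config, :disconnected

  def initialize(config)
    @config = config
    @disconnected = false
  end

  def disconnect
    @disconnected = true
  end
end

# Hypothetical stand-in copying the class-level logic of configure! and instance.
class SingletonSketch
  class << self
    def configure!(options)
      if defined?(@instance) && @instance
        old_instance = @instance
        @instance = nil
        old_instance.disconnect
      end
      @config = options # the real method wraps this in Cassie::Config.new
    end

    def instance
      @instance = FakeClient.new(@config) unless defined?(@instance) && @instance
      @instance
    end
  end
end

SingletonSketch.configure!({ keyspace: "dev" })   # option key is illustrative
first = SingletonSketch.instance
same = SingletonSketch.instance                   # memoized: the same object
SingletonSketch.configure!({ keyspace: "test" })  # drops and disconnects first
second = SingletonSketch.instance                 # built from the new options
```

Reconfiguring disconnects the old object and forces a fresh one, so code that held on to the earlier instance is left with a disconnected client.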

.logger ⇒ Object

Returns the Logger-compatible object, or nil if none has been set.



# File 'lib/cassie.rb', line 76

def logger
  @logger if defined?(@logger)
end

.logger=(value) ⇒ Object

Sets the logger. Any Logger-compatible object is accepted.



# File 'lib/cassie.rb', line 81

def logger=(value)
  @logger = value
end

Instance Method Details

#batch(options = nil) ⇒ Object

Declare and execute a batch statement. Any insert, update, or delete calls made within the block are added to the batch, which is executed as a logged batch when the block ends. A nested batch call joins the enclosing batch instead of starting a new one.



# File 'lib/cassie.rb', line 164

def batch(options = nil)
  if Thread.current[:cassie_batch]
    yield
  else
    begin
      batch = []
      Thread.current[:cassie_batch] = batch
      yield
      unless batch.empty?
        batch_statement = session.logged_batch
        batch.each do |cql, values|
          if values.blank?
            batch_statement.add(cql)
          else
            statement = prepare(cql)
            statement = statement.bind(Array(values)) if values.present?
            batch_statement.add(statement)
          end
        end
        execute(batch_statement)
      end
    ensure
      Thread.current[:cassie_batch] = nil
    end
  end
end
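
The queueing behavior of the listing can be sketched without a session. The class below is a hypothetical stand-in: it uses its own thread-local key, records flushed batches in an array instead of calling the driver, and its write method reflects an assumption about the private batch_or_execute helper, which is not shown on this page.

```ruby
# Hypothetical stand-in that queues [cql, values] pairs the way the batch
# listing does, but records flushed batches instead of using a session.
class BatchSketch
  attr_reader :flushed

  def initialize
    @flushed = []
  end

  def batch
    if Thread.current[:sketch_batch]
      yield # nested call: statements join the enclosing batch
    else
      begin
        queue = []
        Thread.current[:sketch_batch] = queue
        yield
        @flushed << queue unless queue.empty?
      ensure
        Thread.current[:sketch_batch] = nil
      end
    end
  end

  # Assumed behavior of batch_or_execute: queue inside a batch block,
  # otherwise run the statement on its own right away.
  def write(cql, values = nil)
    queue = Thread.current[:sketch_batch]
    if queue
      queue << [cql, values]
    else
      @flushed << [[cql, values]]
    end
  end
end

client = BatchSketch.new
client.batch do
  client.write("INSERT INTO things (id) VALUES (?)", [1])
  client.batch do
    client.write("DELETE FROM things WHERE id = ?", [2])
  end
end
```

Both statements end up in a single flushed batch, and the thread-local queue is cleared afterwards even if the block raises.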

#connect ⇒ Object

Open a connection to the Cassandra cluster.



# File 'lib/cassie.rb', line 96

def connect
  start_time = Time.now
  cluster_config = config.cluster
  cluster_config = cluster_config.merge(:logger => logger) if logger
  cluster = Cassandra.cluster(cluster_config)
  logger.info("Cassie.connect with #{config.sanitized_cluster} in #{((Time.now - start_time) * 1000).round}ms") if logger
  @monitor.synchronize do
    @session = cluster.connect(config.default_keyspace)
    @prepared_statements = {}
  end
end

#connected? ⇒ Boolean

Return true if the connection to the Cassandra cluster has been established.

Returns:

  • (Boolean)


# File 'lib/cassie.rb', line 119

def connected?
  !!@session
end

#delete(table, key_hash, options = nil) ⇒ Object

Delete a row from a table. You should pass the primary key value in the key_hash.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 262

def delete(table, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  cql = "DELETE FROM #{table} WHERE #{key_cql}"
  batch_or_execute(cql, key_values, options)
end
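
To make the generated statement concrete, here is a minimal sketch that builds the DELETE text and bind values without executing anything. The private key_clause helper is not shown on this page, so key_clause_sketch is an assumption about its shape (one "column = ?" term per key, joined with AND); the table and column names are made up.

```ruby
# Assumed shape of the private key_clause helper: one "column = ?" term per
# key, joined with AND, plus the bind values in the same order.
def key_clause_sketch(key_hash)
  cql = key_hash.keys.map { |column| "#{column} = ?" }.join(" AND ")
  [cql, key_hash.values]
end

# Builds the same DELETE text as the listing, returning it instead of executing.
def delete_cql_sketch(table, key_hash)
  key_cql, key_values = key_clause_sketch(key_hash)
  ["DELETE FROM #{table} WHERE #{key_cql}", key_values]
end

cql, values = delete_cql_sketch("things", { owner: 1, id: 2 })
puts cql
puts values.inspect
```

Under that assumption the statement is "DELETE FROM things WHERE owner = ? AND id = ?" with the values [1, 2]. The table name is interpolated directly into the CQL, so it should never come from untrusted input.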

#disconnect ⇒ Object

Close the connections to the Cassandra cluster.



# File 'lib/cassie.rb', line 109

def disconnect
  logger.info("Cassie.disconnect from #{config.sanitized_cluster}") if logger
  @monitor.synchronize do
    @session.close if @session
    @session = nil
    @prepared_statements = {}
  end
end

#execute(cql, values = nil, options = nil) ⇒ Object

Execute an arbitrary CQL statement. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.



# File 'lib/cassie.rb', line 270

def execute(cql, values = nil, options = nil)
  start_time = Time.now
  begin
    statement = nil
    if cql.is_a?(String)
      if values.present?
        statement = prepare(cql)
      else
        statement = Cassandra::Statements::Simple.new(cql)
      end
    else
      statement = cql
    end
  
    if values.present?
      values = Array(values)
      options = (options ? options.merge(:arguments => values) : {:arguments => values})
    end
  
    # Set a default consistency from a block context if it isn't explicitly set.
    default_consistency = Thread.current[:cassie_consistency]
    if default_consistency
      options = (options ? options.reverse_merge(:consistency => default_consistency) : {:consistency => default_consistency})
    end
    
    session.execute(statement, options || {})
  rescue Cassandra::Errors::IOError => e
    disconnect
    raise e
  ensure
    if statement.is_a?(Cassandra::Statement) && !subscribers.empty?
      payload = Message.new(statement, options, Time.now - start_time)
      subscribers.each{|subscriber| subscriber.call(payload)}
    end
  end
end
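
The option handling in the listing decides which consistency level wins. The sketch below isolates that logic in plain Ruby: execute_options_sketch is a hypothetical helper, Hash#reverse_merge (from ActiveSupport) is replaced by the equivalent defaults.merge(options), and the blank check only approximates ActiveSupport's present?.

```ruby
# Standalone sketch of how execute assembles the options passed to the session.
def execute_options_sketch(values, options, default_consistency)
  values = Array(values) unless values.nil?
  if values && !values.empty?
    # Bind values travel to the driver under the :arguments key.
    options = options ? options.merge(arguments: values) : { arguments: values }
  end
  if default_consistency
    # Explicit options win over the block-level default.
    defaults = { consistency: default_consistency }
    options = options ? defaults.merge(options) : defaults
  end
  options || {}
end

explicit = execute_options_sketch([1], { consistency: :one }, :quorum)
inherited = execute_options_sketch([1], nil, :quorum)
plain = execute_options_sketch(nil, nil, nil)
```

A consistency level passed directly to the call is kept, while the level from an enclosing consistency block only fills the gap when none is given.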

#find(cql, values = nil, options = nil) ⇒ Object

Find rows using the CQL statement. If the statement is a string and values are provided, the statement will be executed as a prepared statement. In general, all statements should be executed this way.

If you have a statement without arguments and plan to execute the same query multiple times, call prepare first and pass the prepared statement instead.



# File 'lib/cassie.rb', line 198

def find(cql, values = nil, options = nil)
  execute(cql, values, options)
end

#insert(table, values_hash, options = nil) ⇒ Object

Insert a row from a hash into a table.

You can specify a ttl for the created row by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 207

def insert(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    if !value.nil?
      columns << column
      values << value
    end
  end
  cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})"
  
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values << ttl
  end
  
  batch_or_execute(cql, values, options)
end
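
The statement text that the listing assembles can be sketched as follows. This is an illustration only: the private question_marks helper is not shown on this page, so question_marks_sketch is an assumption about its output, and the table and column names are made up.

```ruby
# Assumed behavior of the private question_marks helper: a comma-separated
# list of "?" placeholders.
def question_marks_sketch(size)
  (["?"] * size).join(", ")
end

# Builds the INSERT text and bind values the way the listing does, returning
# them instead of executing.
def insert_cql_sketch(table, values_hash, options = nil)
  pairs = values_hash.reject { |_column, value| value.nil? } # nil columns are skipped
  cql = +"INSERT INTO #{table} (#{pairs.keys.join(', ')}) VALUES (#{question_marks_sketch(pairs.size)})"
  values = pairs.values
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values << ttl # the TTL is bound last, matching the trailing placeholder
  end
  [cql, values]
end

cql, values = insert_cql_sketch("things", { id: 1, name: "a", note: nil }, { ttl: 60 })
puts cql
puts values.inspect
```

Columns with nil values are left out of the statement entirely rather than written as nulls, which in Cassandra avoids creating tombstones for them.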

#prepare(cql) ⇒ Object

Prepare a CQL statement for repeated execution. Prepared statements are cached until the connection is closed or until the cache grows past config.max_prepared_statements, at which point the oldest entry is evicted. Calling prepare multiple times with the same CQL string returns the prepared statement from the cache.

Raises:

  • (ArgumentError)


# File 'lib/cassie.rb', line 134

def prepare(cql)
  raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String)
  statement = @prepared_statements[cql]
  cache_filled_up = false
  unless statement
    @monitor.synchronize do
      statement = session.prepare(cql)
      @prepared_statements[cql] = statement
      if @prepared_statements.size > config.max_prepared_statements
        # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
        # but that would require additional overhead on each query. This method will eventually
        # keep the most active queries in the cache and is overall more efficient.
        @prepared_statements.delete(@prepared_statements.first[0])
        cache_filled_up = true
      end
    end
  end
  
  if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
    # Set a throttle on how often this message is logged so we don't kill performance even more.
    @last_prepare_warning = Time.now
    logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
  end
  
  statement
end
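
The eviction rule in the listing is first-in, first-out rather than least recently used: a cache hit does not move an entry back to the end. The sketch below isolates that rule; PreparedCacheSketch is a hypothetical class, and the block stands in for session.prepare.

```ruby
# Standalone sketch of the eviction rule in prepare. Ruby hashes keep insertion
# order, so Hash#first is the oldest entry still in the cache.
class PreparedCacheSketch
  def initialize(max_size)
    @max_size = max_size
    @cache = {}
  end

  # Returns the cached value for cql, building it with the block on a miss.
  def fetch(cql)
    statement = @cache[cql]
    unless statement
      statement = yield(cql)
      @cache[cql] = statement
      # Over capacity: drop the oldest inserted entry.
      @cache.delete(@cache.first[0]) if @cache.size > @max_size
    end
    statement
  end

  def keys
    @cache.keys
  end
end

cache = PreparedCacheSketch.new(2)
prepared = 0
%w[q1 q2 q1 q3 q1].each do |cql|
  cache.fetch(cql) { |text| prepared += 1; "prepared:#{text}" }
end
puts cache.keys.inspect
puts prepared
```

With a capacity of two, q1 is evicted when q3 arrives even though it was just used, so the final q1 lookup prepares it again: four prepares in total, with q3 and q1 left in the cache.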

#reconnect ⇒ Object

Force reconnection. If you're using this code in a forking server environment like Passenger or Unicorn, you should call this method after forking.



# File 'lib/cassie.rb', line 125

def reconnect
  disconnect
  connect
end
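
As an illustration of the post-fork call, the fragment below hooks into Phusion Passenger's starting_worker_process event. It is a sketch that assumes Cassie.configure! has already run in the parent process; where the fragment lives (for example a Rails initializer) is also an assumption. Unicorn offers a comparable after_fork hook in its own configuration file.

```ruby
# Runs only under Phusion Passenger; elsewhere the guard makes this a no-op.
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    # forked is true in smart spawning mode, where the worker inherits the
    # parent's connection and needs a fresh one of its own.
    Cassie.instance.reconnect if forked
  end
end
```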

#update(table, values_hash, key_hash, options = nil) ⇒ Object

Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.

You can specify a ttl for the updated values by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 233

def update(table, values_hash, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  update_cql = []
  update_values = []
  if values_hash.is_a?(String)
    update_cql << values_hash
  else
    values_hash.each do |column, value|
      update_cql << "#{column} = ?"
      update_values << value
    end
  end
  values = update_values + key_values
  
  cql = "UPDATE #{table}"
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values.unshift(ttl)
  end
  cql << " SET #{update_cql.join(', ')} WHERE #{key_cql}"
  
  batch_or_execute(cql, values, options)
end
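
The ordering of bind values matters here because the TTL placeholder appears before the SET clause. The following sketch builds the statement for the hash form of values_hash only; the key clause shape is an assumption about the private key_clause helper, which is not shown on this page, and the table and column names are made up.

```ruby
# Builds the UPDATE text and bind values the way the listing does, returning
# them instead of executing.
def update_cql_sketch(table, values_hash, key_hash, options = nil)
  # Assumed shape of key_clause: "column = ?" terms joined with AND.
  key_cql = key_hash.keys.map { |column| "#{column} = ?" }.join(" AND ")
  set_cql = values_hash.keys.map { |column| "#{column} = ?" }.join(", ")
  values = values_hash.values + key_hash.values
  cql = +"UPDATE #{table}"
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values.unshift(ttl) # the TTL placeholder comes first, so its value does too
  end
  cql << " SET #{set_cql} WHERE #{key_cql}"
  [cql, values]
end

cql, values = update_cql_sketch("things", { name: "b" }, { id: 1 }, { ttl: 60 })
puts cql
puts values.inspect
```

Under those assumptions the result is "UPDATE things USING TTL ? SET name = ? WHERE id = ?" with the values [60, "b", 1]. Unlike insert, update does not skip nil values, so a nil in values_hash is bound as-is.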