Class: Cassie

Inherits:
Object
Defined in:
lib/cassie.rb

Overview

This class is a lightweight wrapper around the Cassandra driver. It maintains a connection to the cluster and provides helpers for constructing and executing CQL statements.

Defined Under Namespace

Modules: Model, Testing
Classes: Config, Railtie, RecordInvalid, RecordNotFound, Schema

Constructor Details

#initialize(config) ⇒ Cassie

Returns a new instance of Cassie.



# File 'lib/cassie.rb', line 72

def initialize(config)
  @config = config
  @monitor = Monitor.new
  @session = nil
  @prepared_statements = {}
  @last_prepare_warning = Time.now
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



# File 'lib/cassie.rb', line 24

def config
  @config
end

Class Method Details

.configure!(options) ⇒ Object

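Call this method to configure Cassie from the given options, which are passed to Cassie::Config. If a shared instance already exists, it is disconnected and discarded so that the next call to instance uses the new configuration.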



# File 'lib/cassie.rb', line 38

def configure!(options)
  if defined?(@instance) && @instance
    old_instance = @instance
    @instance = nil
    old_instance.disconnect
  end
  @config = Cassie::Config.new(options)
end

.consistency(level) ⇒ Object

This method sets a default consistency level for all Cassandra queries within the block that don’t explicitly define one. It is useful where consistency is important (e.g. on validation queries) but a higher-level method doesn’t provide an option to set it.



# File 'lib/cassie.rb', line 51

def consistency(level)
  save_val = Thread.current[:cassie_consistency]
  begin
    Thread.current[:cassie_consistency] = level
    yield
  ensure
    Thread.current[:cassie_consistency] = save_val
  end
end
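
The save-and-restore pattern above can be tried in isolation. The following is a minimal sketch, not part of Cassie: the ConsistencyScope module and the :sketch_consistency key are hypothetical names used only for illustration. It shows that nested blocks see the innermost level and that the outer value comes back when each block ends.

```ruby
# Hypothetical stand-in for the pattern used by Cassie.consistency.
module ConsistencyScope
  KEY = :sketch_consistency # assumed key name, not the one Cassie uses

  # Set a default level for the duration of the block.
  def self.with(level)
    saved = Thread.current[KEY]
    begin
      Thread.current[KEY] = level
      yield
    ensure
      # Restore the outer value even if the block raises.
      Thread.current[KEY] = saved
    end
  end

  # The level that a query without an explicit consistency would pick up.
  def self.current
    Thread.current[KEY]
  end
end

seen = []
ConsistencyScope.with(:quorum) do
  seen << ConsistencyScope.current
  ConsistencyScope.with(:one) { seen << ConsistencyScope.current }
  seen << ConsistencyScope.current
end
seen << ConsistencyScope.current
# seen is now [:quorum, :one, :quorum, nil]
```

Because the value is restored in an ensure clause, an exception raised inside the block does not leak the temporary level into later queries.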

.instance ⇒ Object

A singleton instance that can be shared to communicate with a Cassandra cluster.



# File 'lib/cassie.rb', line 28

def instance
  unless defined?(@instance) && @instance
    instance = new(@config)
    @instance = instance
  end
  @instance
end

.logger ⇒ Object

Get the Logger-compatible object, or nil if none has been set.



# File 'lib/cassie.rb', line 62

def logger
  @logger if defined?(@logger)
end

.logger=(value) ⇒ Object

Set the logger to a Logger-compatible object.



# File 'lib/cassie.rb', line 67

def logger=(value)
  @logger = value
end

Instance Method Details

#batch(options = nil) ⇒ Object

Declare and execute a batch statement. Any insert, update, or delete calls made within the block are added to the batch, which is executed at the end of the block. A batch call nested inside another batch block joins the enclosing batch.



# File 'lib/cassie.rb', line 146

def batch(options = nil)
  if Thread.current[:cassie_batch]
    yield
  else
    begin
      batch = []
      Thread.current[:cassie_batch] = batch
      yield
      unless batch.empty?
        batch_statement = session.logged_batch
        batch.each do |cql, values|
          if values.blank?
            batch_statement.add(cql)
          else
            statement = prepare(cql)
            statement = statement.bind(Array(values)) if values.present?
            batch_statement.add(statement)
          end
        end
        execute(batch_statement)
      end
    ensure
      Thread.current[:cassie_batch] = nil
    end
  end
end
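
The collection behavior can be sketched without a Cassandra connection. The BatchSketch class below is hypothetical: it records statements in an array instead of sending them, and its write method is an assumed stand-in for the batch_or_execute helper, whose source is not shown on this page.

```ruby
# Hypothetical collector; it records statements instead of talking to Cassandra.
class BatchSketch
  attr_reader :executed # each entry represents one round trip

  def initialize
    @executed = []
  end

  def batch
    if Thread.current[:sketch_batch]
      yield # nested call: statements join the enclosing batch
    else
      begin
        pending = []
        Thread.current[:sketch_batch] = pending
        yield
        @executed << pending unless pending.empty?
      ensure
        Thread.current[:sketch_batch] = nil
      end
    end
  end

  # Assumed behavior of batch_or_execute: queue inside a batch, run otherwise.
  def write(cql, values = nil)
    pending = Thread.current[:sketch_batch]
    if pending
      pending << [cql, values]
    else
      @executed << [[cql, values]]
    end
  end
end

db = BatchSketch.new
db.batch do
  db.write("INSERT INTO things (id) VALUES (?)", [1])
  db.batch { db.write("DELETE FROM things WHERE id = ?", [2]) }
end
db.write("DELETE FROM things WHERE id = ?", [3])
# db.executed holds two entries: one batch of two statements, then a single statement.
```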

#connect ⇒ Object

Open a connection to the Cassandra cluster.



# File 'lib/cassie.rb', line 81

def connect
  cluster = Cassandra.cluster(config.cluster)
  logger.info("Cassie.connect with #{config.sanitized_cluster}") if logger
  @monitor.synchronize do
    @session = cluster.connect(config.default_keyspace)
    @prepared_statements = {}
  end
end

#connected? ⇒ Boolean

Return true if the connection to the Cassandra cluster has been established.

Returns:

  • (Boolean)


# File 'lib/cassie.rb', line 101

def connected?
  !!@session
end

#delete(table, key_hash, options = nil) ⇒ Object

Delete a row from a table. You should pass the primary key value in the key_hash.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 240

def delete(table, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  cql = "DELETE FROM #{table} WHERE #{key_cql}"
  batch_or_execute(cql, key_values, options)
end

#disconnect ⇒ Object

Close the connections to the Cassandra cluster.



# File 'lib/cassie.rb', line 91

def disconnect
  logger.info("Cassie.disconnect from #{config.sanitized_cluster}") if logger
  @monitor.synchronize do
    @session.close if @session
    @session = nil
    @prepared_statements = {}
  end
end

#execute(cql, values = nil, options = nil) ⇒ Object

Execute an arbitrary CQL statement. If values are passed and the statement is a string, it will be prepared and executed as a prepared statement.



# File 'lib/cassie.rb', line 248

def execute(cql, values = nil, options = nil)
  start_time = Time.now
  begin
    statement = nil
    if cql.is_a?(String) && values.present?
      statement = prepare(cql)
    else
      statement = cql
    end
  
    if values.present?
      values = Array(values)
      options = (options ? options.merge(:arguments => values) : {:arguments => values})
    end
  
    # Set a default consistency from a block context if it isn't explicitly set.
    default_consistency = Thread.current[:cassie_consistency]
    if default_consistency
      options = (options ? options.reverse_merge(:consistency => default_consistency) : {:consistency => default_consistency})
    end
  
    session.execute(statement, options || {})
  rescue Cassandra::Errors::IOError => e
    disconnect
    raise e
  ensure
    elapsed = Time.now - start_time
    if elapsed >= 0.5 && logger
      logger.warn("Slow CQL Query (#{(elapsed * 1000).round}ms): #{cql}#{' with ' + values.inspect unless values.blank?}")
    end
  end
end
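
The way execute assembles its options can be isolated in a small function. This is a sketch in plain Ruby, with a hypothetical effective_options helper. It replaces ActiveSupport's reverse_merge with an equivalent Hash#merge in the opposite direction, so an explicitly passed :consistency always wins over the block default.

```ruby
# Hypothetical helper mirroring how #execute builds the options hash.
def effective_options(options, values, default_consistency)
  arguments = Array(values)
  unless arguments.empty?
    options = options ? options.merge(arguments: arguments) : { arguments: arguments }
  end
  if default_consistency
    # Plain-Ruby equivalent of reverse_merge: keys already in options win.
    options = { consistency: default_consistency }.merge(options || {})
  end
  options || {}
end

with_default = effective_options(nil, [1], :quorum)
explicit     = effective_options({ consistency: :one }, nil, :quorum)
untouched    = effective_options(nil, nil, nil)
# with_default is { consistency: :quorum, arguments: [1] }
# explicit     is { consistency: :one }
# untouched    is {}
```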

#find(cql, values = nil, options = nil) ⇒ Object

Find rows using the CQL statement. If the statement is a string and values are provided, the statement will be executed as a prepared statement. In general, all statements should be executed this way.

If you have a statement without arguments, then you should call prepare before and pass the prepared statement if you plan on executing the same query multiple times.



# File 'lib/cassie.rb', line 180

def find(cql, values = nil, options = nil)
  execute(cql, values, options)
end

#insert(table, values_hash, options = nil) ⇒ Object

Insert a row from a hash into a table.

You can specify a ttl for the created row by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 189

def insert(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    if !value.nil?
      columns << column
      values << value
    end
  end
  cql = "INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{question_marks(columns.size)})"
  
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values << ttl
  end
  
  batch_or_execute(cql, values, options)
end
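
To see the statement that insert produces, the string construction can be reproduced on its own. The build_insert_cql function below is a hypothetical helper written for this example. It assumes that question_marks(n), whose source is not shown here, returns n comma-separated "?" placeholders.

```ruby
# Hypothetical helper mirroring the CQL construction in #insert.
def build_insert_cql(table, values_hash, options = nil)
  columns = []
  values = []
  values_hash.each do |column, value|
    next if value.nil? # nil values are left out of the statement entirely
    columns << column
    values << value
  end
  placeholders = (["?"] * columns.size).join(", ") # assumed output of question_marks
  cql = +"INSERT INTO #{table} (#{columns.join(', ')}) VALUES (#{placeholders})"

  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values << ttl # the TTL marker is the last one in the statement
  end
  [cql, values]
end

cql, values = build_insert_cql(:things, { id: 1, name: "a", note: nil }, { ttl: 60 })
# cql    is "INSERT INTO things (id, name) VALUES (?, ?) USING TTL ?"
# values is [1, "a", 60]
```

Note that columns with nil values are skipped rather than written as nulls.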

#prepare(cql) ⇒ Object

Prepare a CQL statement for repeated execution. Prepared statements are cached on the driver until the connection is closed. Calling prepare multiple times with the same CQL string will return the prepared statement from a cache.

Raises:

  • (ArgumentError)


# File 'lib/cassie.rb', line 116

def prepare(cql)
  raise ArgumentError.new("CQL must be a string") unless cql.is_a?(String)
  statement = @prepared_statements[cql]
  cache_filled_up = false
  unless statement
    @monitor.synchronize do
      statement = session.prepare(cql)
      @prepared_statements[cql] = statement
      if @prepared_statements.size > config.max_prepared_statements
        # Cache is full. Clear out the oldest values. Ideally we'd remove the least recently used,
        # but that would require additional overhead on each query. This method will eventually
        # keep the most active queries in the cache and is overall more efficient.
        @prepared_statements.delete(@prepared_statements.first[0])
        cache_filled_up = true
      end
    end
  end
  
  if cache_filled_up && logger && Time.now > @last_prepare_warning + 10
    # Set a throttle on how often this message is logged so we don't kill performance even more.
    @last_prepare_warning = Time.now
    logger.warn("Cassie.prepare cache filled up. Consider increasing the size from #{config.max_prepared_statements}.")
  end
  
  statement
end
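
The eviction policy described in the comments (drop the oldest entry rather than the least recently used one) relies on Ruby hashes preserving insertion order. The following is a minimal sketch of that policy in isolation. BoundedStatementCache is a hypothetical class, and the block passed to fetch stands in for session.prepare.

```ruby
require "monitor"

# Hypothetical cache illustrating oldest-first eviction; not part of Cassie.
class BoundedStatementCache
  def initialize(max_size)
    @max_size = max_size
    @entries = {}
    @monitor = Monitor.new
  end

  # Return the cached value for cql, or build it with the block and cache it.
  def fetch(cql)
    cached = @entries[cql]
    return cached if cached

    @monitor.synchronize do
      value = yield(cql)
      @entries[cql] = value
      # Hashes keep insertion order, so the first key is the oldest entry.
      @entries.delete(@entries.first[0]) if @entries.size > @max_size
      value
    end
  end

  def keys
    @entries.keys
  end
end

cache = BoundedStatementCache.new(2)
prepared = 0
%w[a b a c].each do |cql|
  cache.fetch(cql) do |text|
    prepared += 1
    "stmt:#{text}"
  end
end
# prepared is 3 and cache.keys is ["b", "c"]
```

In this run "a" is evicted even though it was read just before "c" was added, which is the trade-off the source comment accepts to avoid bookkeeping on every query.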

#reconnect ⇒ Object

Force reconnection. If you’re using this code in a forking server environment such as Passenger or Unicorn, you should call this method after forking.



# File 'lib/cassie.rb', line 107

def reconnect
  disconnect
  connect
end
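
As an illustration of calling reconnect after forking, the fragment below uses the worker-start event that Passenger exposes to applications. It is a sketch of an application initializer, not code from Cassie, and where it lives in your app is an assumption. The guard makes it a no-op when Passenger is not loaded.

```ruby
# Sketch of an initializer; only takes effect when running under Passenger.
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    # forked is true when the worker was created by forking a preloaded app,
    # in which case the parent's Cassandra connection must not be reused.
    Cassie.instance.reconnect if forked
  end
end
```

Under Unicorn the same call would typically go inside the after_fork block of the server configuration file.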

#update(table, values_hash, key_hash, options = nil) ⇒ Object

Update a row in a table. The values to update should be passed in the values_hash while the primary key should be passed in the key_hash.

You can specify a TTL for the updated values by supplying a :ttl option.

If this method is called inside a batch block it will be executed in the batch.



# File 'lib/cassie.rb', line 215

def update(table, values_hash, key_hash, options = nil)
  key_cql, key_values = key_clause(key_hash)
  update_cql = []
  update_values = []
  values_hash.each do |column, value|
    update_cql << "#{column} = ?"
    update_values << value
  end
  values = update_values + key_values
  
  cql = "UPDATE #{table}"
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values.unshift(ttl)
  end
  cql << " SET #{update_cql.join(', ')} WHERE #{key_cql}"
  
  batch_or_execute(cql, values, options)
end
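
The ordering of bind values matters here because the TTL marker comes before the SET clause in the generated CQL. This can be checked with a standalone sketch. The build_update_cql function is hypothetical, and it assumes that key_clause, whose source is not shown on this page, joins "column = ?" terms with AND and returns the key values in the same order.

```ruby
# Hypothetical helper mirroring the CQL construction in #update.
def build_update_cql(table, values_hash, key_hash, options = nil)
  # Assumed behavior of key_clause.
  key_cql = key_hash.keys.map { |column| "#{column} = ?" }.join(" AND ")
  key_values = key_hash.values

  set_cql = values_hash.keys.map { |column| "#{column} = ?" }.join(", ")
  values = values_hash.values + key_values

  cql = +"UPDATE #{table}"
  ttl = options[:ttl] if options
  if ttl
    cql << " USING TTL ?"
    values.unshift(ttl) # the TTL marker is the first one in the statement
  end
  cql << " SET #{set_cql} WHERE #{key_cql}"
  [cql, values]
end

cql, values = build_update_cql(:things, { name: "b" }, { owner: 1, id: 2 }, { ttl: 30 })
# cql    is "UPDATE things USING TTL ? SET name = ? WHERE owner = ? AND id = ?"
# values is [30, "b", 1, 2]
```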