Class: Moneta::Adapters::Cassandra

Inherits:
Object
  • Object
Includes:
Defaults, ExpiresSupport
Defined in:
lib/moneta/adapters/cassandra.rb

Overview

Cassandra backend

Instance Attribute Summary

Attributes included from ExpiresSupport

#default_expires

Instance Method Summary

Methods included from ExpiresSupport

included

Methods included from Defaults

#[], #[]=, #create, #decrement, #features, #fetch, included, #increment, #supports?, #update

Methods included from OptionSupport

#expires, #prefix, #raw, #with

Constructor Details

#initialize(options = {}) ⇒ Cassandra

Returns a new instance of Cassandra.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :keyspace (String) — default: 'moneta'

    Cassandra keyspace

  • :table (String) — default: 'moneta'

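    Cassandra table. Note that the constructor source shown on this page reads this value from the :column_family key, not :table.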

  • :host (String) — default: '127.0.0.1'

    Server host name

  • :port (Integer) — default: 9160

    Server port

  • :expires (Integer)

    Default expiration time

  • :key_column (String) — default: 'key'

    Name of the key column

  • :value_column (String) — default: 'value'

    Name of the value column

  • :updated_column (String) — default: 'updated_at'

    Name of the column used to track last update

  • :expired_column (String) — default: 'expired'

    Name of the column used to track expiry

  • :read_consistency (Symbol) — default: :all

    Default read consistency

  • :write_consistency (Symbol) — default: :all

    Default write consistency

  • :create_keyspace (Proc, Boolean, Hash)

    Provide a proc for creating the keyspace, or a Hash of options to use when creating it, or set to false to disable. The Proc will only be called if the keyspace does not already exist.

  • :cluster (::Cassandra::Cluster)

    Existing cluster to use

  • :backend (::Cassandra::Session)

    Existing session to use

  • Other (Object)

    Remaining options, passed through to ::Cassandra.cluster



# File 'lib/moneta/adapters/cassandra.rb', line 39

def initialize(options = {})
  self.default_expires = options.delete(:expires)
  keyspace = options.delete(:keyspace) || 'moneta'

  @table = (options.delete(:column_family) || 'moneta').to_sym
  @key_column = options.delete(:key_column) || 'key'
  @value_column = options.delete(:value_column) || 'value'
  @updated_column = options.delete(:updated_column) || 'updated_at'
  @expired_column = options.delete(:expired_column) || 'expired'
  @read_consistency = options.delete(:read_consistency) || :all
  @write_consistency = options.delete(:write_consistency) || :all
  @create_keyspace = options.delete(:create_keyspace)

  unless @backend = options.delete(:backend)
    cluster = options.delete(:cluster) || (@own_cluster = ::Cassandra.cluster(options))
    begin
      @backend = cluster.connect(keyspace)
    rescue ::Cassandra::Errors::InvalidError
      @backend = cluster.connect
      create_keyspace(keyspace)
      @backend.execute("USE " + keyspace)
      @backend
    end
  end

  @backend.execute <<-CQL
    CREATE TABLE IF NOT EXISTS #{@table} (
      #{@key_column} blob,
      #{@value_column} blob,
      #{@updated_column} timeuuid,
      #{@expired_column} boolean,
      PRIMARY KEY (#{@key_column}, #{@updated_column})
    )
  CQL

  prepare_statements
end
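
The constructor consumes the options it recognises with Hash#delete and forwards whatever remains to ::Cassandra.cluster. The sketch below mirrors only that pattern in plain Ruby. split_options is a hypothetical helper, not part of Moneta, and it opens no connection; :hosts stands in for any driver-level option.

```ruby
# Hypothetical helper mirroring the option handling in #initialize.
# It is not part of Moneta and never contacts a Cassandra server.
def split_options(options = {})
  options = options.dup # the real constructor mutates the hash it is given
  adapter = {
    keyspace: options.delete(:keyspace) || 'moneta',
    # The source reads :column_family, although the option list names :table.
    table: (options.delete(:column_family) || 'moneta').to_sym,
    key_column: options.delete(:key_column) || 'key',
    value_column: options.delete(:value_column) || 'value',
    read_consistency: options.delete(:read_consistency) || :all,
    write_consistency: options.delete(:write_consistency) || :all
  }
  # Whatever is left over would be handed to ::Cassandra.cluster.
  [adapter, options]
end

adapter, driver = split_options(keyspace: 'cache',
                                hosts: ['10.0.0.1'],
                                read_consistency: :quorum)
```

Here adapter holds the values the adapter keeps for itself, and driver holds only the pass-through options.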

Instance Attribute Details

#backend ⇒ Object (readonly)



# File 'lib/moneta/adapters/cassandra.rb', line 11

def backend
  @backend
end

Instance Method Details

#clear(options = {}) ⇒ void

This method returns an undefined value.

Clear all keys in this store

Parameters:

  • options (Hash) (defaults to: {})


# File 'lib/moneta/adapters/cassandra.rb', line 143

def clear(options = {})
  @backend.execute(@clear)
  self
end

#close ⇒ Object

Explicitly close the store

Returns:

  • nil



# File 'lib/moneta/adapters/cassandra.rb', line 149

def close
  @backend.close_async
  @backend = nil
  if @own_cluster
    @own_cluster.close_async
    @own_cluster = nil
  end
  nil
end

#delete(key, options = {}) ⇒ Object

Delete the key from the store and return the current value

Parameters:

  • key (Object)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :raw (Boolean)

    Raw access without value transformation (See Transformer)

  • :prefix (String)

    Prefix key (See Transformer)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • (Object)

    current value



# File 'lib/moneta/adapters/cassandra.rb', line 133

def delete(key, options = {})
  rc, wc = consistency(options)
  result = @backend.execute(@delete_value, options.merge(consistency: rc, arguments: [key]))
  if row = result.first and row[@expired_column] != nil
    @backend.execute(@delete, options.merge(consistency: wc, arguments: [timestamp, key, row[@updated_column]]))
    row[@value_column]
  end
end

#each_key ⇒ Enumerator #each_key {|key| ... } ⇒ self

Note:

Not every Moneta store implements this method. A NotImplementedError is raised if it is not supported.

Calls block once for each key in store, passing the key as a parameter. If no block is given, an enumerator is returned instead.

Overloads:

  • #each_key ⇒ Enumerator

    Returns an enumerator over all keys.

    Returns:

    • (Enumerator)

      An enumerator over all keys

  • #each_key {|key| ... } ⇒ self

    Yield Parameters:

    • key (Object)

      Each key is yielded to the supplied block

    Returns:

    • (self)


# File 'lib/moneta/adapters/cassandra.rb', line 160

def each_key
  rc, = consistency
  return enum_for(:each_key) unless block_given?
  result = @backend.execute(@each_key, consistency: rc, page_size: 100)
  loop do
    result.each do |row|
      next if row[@expired_column] == nil
      yield row[@key_column]
    end

    break if result.last_page?
    result = result.next_page
  end
  self
end
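
The paging loop in #each_key can be tried without a cluster. The following is a minimal sketch, assuming a result object that responds to each, last_page? and next_page as used in the source. FakePage, each_live_key and the 'key' and 'expired' column names are hypothetical stand-ins for illustration.

```ruby
# Hypothetical stand-in for a paged driver result. It offers only the three
# methods the loop in #each_key relies on: each, last_page? and next_page.
class FakePage
  def initialize(pages, index = 0)
    @pages = pages
    @index = index
  end

  def each(&block)
    @pages[@index].each(&block)
  end

  def last_page?
    @index == @pages.size - 1
  end

  def next_page
    FakePage.new(@pages, @index + 1)
  end
end

# Same control flow as #each_key: skip rows whose expired column is nil,
# yield the key, then advance until the last page has been processed.
def each_live_key(result)
  loop do
    result.each do |row|
      next if row['expired'].nil?
      yield row['key']
    end
    break if result.last_page?
    result = result.next_page
  end
end

pages = [
  [{ 'key' => 'a', 'expired' => false }, { 'key' => 'b', 'expired' => nil }],
  [{ 'key' => 'c', 'expired' => false }]
]
keys = []
each_live_key(FakePage.new(pages)) { |key| keys << key }
```

In this sketch keys ends up holding 'a' and 'c'. The row for 'b' is skipped because its expired column is nil.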

#fetch_values(*keys, **options) ⇒ Object #fetch_values(*keys, **options) {|key| ... } ⇒ Array<Object, nil>

Note:

Some adapters may implement this method atomically. The default implementation uses #values_at.

Behaves identically to #values_at except that it accepts an optional block. When supplied, the block will be called successively with each supplied key that is not present in the store. The return value of the block call will be used in place of nil in the returned array of values.

Overloads:

  • #fetch_values(*keys, **options) {|key| ... } ⇒ Array<Object, nil>

    Returns Array containing the values requested, or where keys are missing, the return values from the corresponding block calls.

    Yield Parameters:

    • key (Object)

      Each key that is not found in the store

    Yield Returns:

    • (Object, nil)

      The value to substitute for the missing one

    Returns:

    • (Array<Object, nil>)

      Array containing the values requested, or where keys are missing, the return values from the corresponding block calls



# File 'lib/moneta/adapters/cassandra.rb', line 212

def fetch_values(*keys, **options)
  return values_at(*keys, **options) unless block_given?
  hash = Hash[slice(*keys, **options)]
  keys.map do |key|
    if hash.key?(key)
      hash[key]
    else
      yield key
    end
  end
end
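
The relationship between #slice, #values_at and #fetch_values can be illustrated with a small in-memory stand-in. TinyStore is hypothetical and backed by a Hash; only the method bodies of values_at and fetch_values follow the source shown on this page.

```ruby
# Hypothetical Hash-backed stand-in. slice returns pairs only for keys that
# are present, which is the contract the other two methods build on.
class TinyStore
  def initialize(data)
    @data = data
  end

  def slice(*keys, **_options)
    keys.select { |key| @data.key?(key) }.map { |key| [key, @data[key]] }
  end

  # Values in the order of the supplied keys, nil for missing keys.
  def values_at(*keys, **options)
    hash = Hash[slice(*keys, **options)]
    keys.map { |key| hash[key] }
  end

  # Like values_at, but the block supplies a value for each missing key.
  def fetch_values(*keys, **options)
    return values_at(*keys, **options) unless block_given?
    hash = Hash[slice(*keys, **options)]
    keys.map { |key| hash.key?(key) ? hash[key] : yield(key) }
  end
end

store = TinyStore.new('a' => 1, 'b' => 2)
without_block = store.fetch_values('a', 'x', 'b')
with_block = store.fetch_values('a', 'x', 'b') { |key| "no #{key}" }
```

Without a block the missing key yields nil in its position. With a block the block's return value takes that position instead.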

#key?(key, options = {}) ⇒ Boolean

Check whether a value exists for the key

Parameters:

  • key (Object)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :expires (Integer)

    Update expiration time (See Expires)

  • :prefix (String)

    Prefix key (See Transformer)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • (Boolean)


# File 'lib/moneta/adapters/cassandra.rb', line 78

def key?(key, options = {})
  rc, wc = consistency(options)
  if (expires = expires_value(options, nil)) != nil
    # Because Cassandra expires each value in a column, rather than the
    # whole column, when we want to update the expiry we load the value
    # and then re-set it in order to update the TTL.
    return false unless
      row = @backend.execute(@load, options.merge(consistency: rc, arguments: [key])).first and
        row[@expired_column] != nil
    @backend.execute(@update_expires,
                     options.merge(consistency: wc,
                                   arguments: [(expires || 0).to_i,
                                               timestamp,
                                               row[@value_column],
                                               key,
                                               row[@updated_column]]))
    true
  elsif row = @backend.execute(@key, options.merge(consistency: rc, arguments: [key])).first
    row[@expired_column] != nil
  else
    false
  end
end

#load(key, options = {}) ⇒ Object

Fetch value with key. Return nil if the key doesn’t exist

Parameters:

  • key (Object)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :expires (Integer)

    Update expiration time (See Expires)

  • :raw (Boolean)

    Raw access without value transformation (See Transformer)

  • :prefix (String)

    Prefix key (See Transformer)

  • :sync (Boolean)

    Synchronized load (Cache reloads from adapter, Daybreak syncs with file)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • (Object)

    value



# File 'lib/moneta/adapters/cassandra.rb', line 103

def load(key, options = {})
  rc, wc = consistency(options)
  if row = @backend.execute(@load, options.merge(consistency: rc, arguments: [key])).first and row[@expired_column] != nil
    if (expires = expires_value(options, nil)) != nil
      @backend.execute(@update_expires,
                       options.merge(consistency: wc,
                                     arguments: [(expires || 0).to_i,
                                                 timestamp,
                                                 row[@value_column],
                                                 key,
                                                 row[@updated_column]]))
    end
    row[@value_column]
  end
end
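
Passing :expires to #load refreshes the expiry of a live entry before its value is returned. The sketch below models that behaviour in memory with an explicit clock so the outcome is deterministic. ExpiringStore and its now: parameter are hypothetical and do not reflect how the adapter tracks time.

```ruby
# Hypothetical in-memory model of expiry refresh on read. The clock is
# passed in explicitly; none of this is Moneta code.
class ExpiringStore
  Entry = Struct.new(:value, :expires_at)

  def initialize
    @entries = {}
  end

  def store(key, value, now:, expires: nil)
    @entries[key] = Entry.new(value, expires && now + expires)
    value
  end

  # Returns nil for missing or expired keys. When :expires is given, a live
  # entry receives a fresh deadline before its value is returned.
  def load(key, now:, expires: nil)
    entry = @entries[key]
    return nil if entry.nil? || (entry.expires_at && entry.expires_at <= now)
    entry.expires_at = now + expires if expires
    entry.value
  end
end

store = ExpiringStore.new
store.store('session', 'abc', now: 0, expires: 10)
first = store.load('session', now: 5, expires: 10) # deadline moves to 15
second = store.load('session', now: 12)            # alive because of the refresh
third = store.load('session', now: 20)             # past the deadline
```

Without the refresh at time 5, the read at time 12 would already find the entry expired.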

#merge!(pairs, options = {}) ⇒ self #merge!(pairs, options = {}) {|key, old_value, new_value| ... } ⇒ self

Note:

Some adapters may implement this method atomically, or in two passes when a block is provided. The default implementation uses #key?, #load and #store.

Stores the pairs in the key-value store, and returns itself. When a block is provided, it will be called before overwriting any existing values with the key, old value and supplied value, and the return value of the block will be used in place of the supplied value.

Overloads:

  • #merge!(pairs, options = {}) ⇒ self

    Parameters:

    • pairs (<(Object, Object)>)

      A collection of key-value pairs to store

    • options (Hash) (defaults to: {})

    Returns:

    • (self)
  • #merge!(pairs, options = {}) {|key, old_value, new_value| ... } ⇒ self

    Parameters:

    • pairs (<(Object, Object)>)

      A collection of key-value pairs to store

    • options (Hash) (defaults to: {})

    Yield Parameters:

    • key (Object)

      A key whose value is being overwritten

    • old_value (Object)

      The existing value which is being overwritten

    • new_value (Object)

      The value supplied in the method call

    Yield Returns:

    • (Object)

      The value to use for overwriting

    Returns:

    • (self)


# File 'lib/moneta/adapters/cassandra.rb', line 225

def merge!(pairs, options = {})
  keys = pairs.map { |k, _| k }.to_a
  return self if keys.empty?

  if block_given?
    existing = Hash[slice(*keys, **options)]
    pairs = pairs.map do |key, new_value|
      if existing.key?(key)
        [key, yield(key, existing[key], new_value)]
      else
        [key, new_value]
      end
    end
  end

  _rc, wc = consistency(options)
  expires = expires_value(options)
  t = timestamp
  batch = @backend.batch do |batch|
    batch.add(@merge_delete, arguments: [t, keys])
    pairs.each do |key, value|
      batch.add(@store, arguments: [key, value, (expires || 0).to_i, t + 1])
    end
  end
  @backend.execute(batch, options.merge(consistency: wc))

  self
end
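
The block form of #merge! resolves conflicts before anything is written: existing values are read once, and the block is consulted only for keys that already have a value. A minimal sketch of that step on a plain Hash follows. merge_pairs! is a hypothetical helper, and the batched Cassandra write is replaced by simple assignment.

```ruby
# Hypothetical Hash-backed version of the conflict-resolution step in
# #merge!. The batched write is replaced by plain assignment.
def merge_pairs!(data, pairs)
  if block_given?
    # Read the existing values once, as the source does with slice.
    existing = data.slice(*pairs.map { |key, _| key })
    pairs = pairs.map do |key, new_value|
      if existing.key?(key)
        [key, yield(key, existing[key], new_value)]
      else
        [key, new_value]
      end
    end
  end
  pairs.each { |key, value| data[key] = value }
  data
end

counts = { 'hits' => 3 }
merge_pairs!(counts, { 'hits' => 2, 'misses' => 1 }) { |_key, old, new| old + new }
```

The block runs only for 'hits', which already had a value. 'misses' is stored as supplied.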

#slice(*keys, **options) ⇒ <(Object, Object)>

Note:

The keys in the return value may be the same objects that were supplied (i.e. Object#equal?), or may simply be equal (i.e. Object#==).

Note:

Some adapters may implement this method atomically. The default implementation uses #values_at.

Returns a collection of key-value pairs corresponding to those supplied keys which are present in the key-value store, and their associated values. Only those keys present in the store will have pairs in the return value. The return value can be any enumerable object that yields pairs, so it could be a hash, but needn’t be.

Parameters:

  • keys (<Object>)

    The keys for the values to fetch

  • options (Hash)

Options Hash (**options):

  • :expires (Integer)

    Update expiration time (See Expires)

  • :raw (Boolean)

    Raw access without value transformation (See Transformer)

  • :prefix (String)

    Prefix key (See Transformer)

  • :sync (Boolean)

    Synchronized load (Cache reloads from adapter, Daybreak syncs with file)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • (<(Object, Object)>)

    A collection of key-value pairs



# File 'lib/moneta/adapters/cassandra.rb', line 177

def slice(*keys, **options)
  rc, wc = consistency(options)
  result = @backend.execute(@slice, options.merge(consistency: rc, arguments: [keys]))
  expires = expires_value(options, nil)
  updated = [] if expires != nil
  pairs = result.map do |row|
    next if row[@expired_column] == nil
    if expires != nil
      updated << [row[@key_column], row[@value_column], row[@updated_column]]
    end
    [row[@key_column], row[@value_column]]
  end.compact

  if expires != nil && !updated.empty?
    ttl = (expires || 0).to_i
    t = timestamp
    batch = @backend.batch do |batch|
      updated.each do |key, value, updated|
        batch.add(@update_expires, arguments: [ttl, t, value, key, updated])
      end
    end

    @backend.execute(batch, options.merge(consistency: wc))
  end

  pairs
end

#store(key, value, options = {}) ⇒ Object

Store value with key

Parameters:

  • key (Object)
  • value (Object)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :expires (Integer)

    Set expiration time (See Expires)

  • :raw (Boolean)

    Raw access without value transformation (See Transformer)

  • :prefix (String)

    Prefix key (See Transformer)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • value



# File 'lib/moneta/adapters/cassandra.rb', line 120

def store(key, value, options = {})
  _, wc = consistency(options)
  expires = expires_value(options)
  t = timestamp
  batch = @backend.batch do |batch|
    batch.add(@store_delete, arguments: [t, key])
    batch.add(@store, arguments: [key, value, (expires || 0).to_i, t + 1])
  end
  @backend.execute(batch, options.merge(consistency: wc))
  value
end
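
#store batches a delete at timestamp t with a write at t + 1. The prepared statements are not shown on this page, so the following rests on an assumption: @store_delete removes earlier rows for the key as of t, and @store writes the new row at t + 1. Under Cassandra's timestamp-based conflict resolution, a delete hides writes whose timestamp is not newer than its own, which would explain the offset. Partition is a hypothetical model of that rule, not driver code.

```ruby
# Hypothetical model of timestamp-based conflict resolution: a tombstone
# hides every write whose timestamp is less than or equal to its own.
class Partition
  def initialize
    @writes = []      # [timestamp, value] pairs
    @tombstone = nil  # timestamp of the newest delete, if any
  end

  def delete(timestamp)
    @tombstone = [@tombstone || timestamp, timestamp].max
  end

  def insert(timestamp, value)
    @writes << [timestamp, value]
  end

  # Value of the newest write that the tombstone does not hide.
  def read
    live = @writes.select { |ts, _| @tombstone.nil? || ts > @tombstone }
    newest = live.max_by { |ts, _| ts }
    newest && newest.last
  end
end

t = 1_000
partition = Partition.new
partition.insert(t - 50, 'old')
# Application order does not matter in this model, only the timestamps do.
partition.insert(t + 1, 'new')
partition.delete(t)
current = partition.read

same_stamp = Partition.new
same_stamp.insert(t, 'new')
same_stamp.delete(t)
hidden = same_stamp.read
```

In this model the write at t + 1 survives the delete, while a write stamped with the same t as the delete is hidden.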

#values_at(*keys, **options) ⇒ Array<Object, nil>

Note:

Some adapters may implement this method atomically, but the default implementation simply makes repeated calls to #load.

Returns an array containing the values associated with the given keys, in the same order as the supplied keys. If a key is not present in the key-value-store, nil is returned in its place.

Parameters:

  • keys (<Object>)

    The keys for the values to fetch

  • options (Hash)

Options Hash (**options):

  • :expires (Integer)

    Update expiration time (See Expires)

  • :raw (Boolean)

    Raw access without value transformation (See Transformer)

  • :prefix (String)

    Prefix key (See Transformer)

  • :sync (Boolean)

    Synchronized load (Cache reloads from adapter, Daybreak syncs with file)

  • Other (Object)

    options as defined by the adapters or middleware

Returns:

  • (Array<Object, nil>)

    Array containing the values requested, with nil for missing values



# File 'lib/moneta/adapters/cassandra.rb', line 206

def values_at(*keys, **options)
  hash = Hash[slice(*keys, **options)]
  keys.map { |key| hash[key] }
end