Class: Adocca::MemCache

Inherits:
Object
  • Object
show all
Defined in:
lib/am_memcache.rb

Overview

A Ruby client library for memcached. Ripped from the gem.

Defined Under Namespace

Classes: ClientError, InternalError, MemCacheError, Server, ServerError

Constant Summary collapse

HOST_HASH =

a semi-unique id for this ruby instance

Array.new(32).collect do |e| (65 + rand(25)).chr end.join
DEFAULT_OPTIONS =

Default options for the cache object.

{
  :namespace => nil,
  :readonly  => false
}
DEFAULT_PORT =

Default memcached port.

11211
DEFAULT_WEIGHT =

Default memcached server weight.

1
ADD_NOTSTORED =

the response when the key is present

"NOT_STORED\r\n"
ADD_STORED =

the response when the key is NOT present

"STORED\r\n"
TRANS_EXPTIME =

lock expiry time

10
TRANS_MAX_TRIES =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ MemCache

Valid options are:

:namespace
    If specified, all keys will have the given value prepended
    before accessing the cache.  Defaults to nil.

:readonly
    If this is set, any attempt to write to the cache will generate
    an exception.  Defaults to false.


61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/am_memcache.rb', line 61

# Set up a client that has no servers or buckets configured yet.
#
# +opts+ is merged over DEFAULT_OPTIONS, so callers only pass the keys
# (:namespace, :readonly) they want to override.  A real Mutex is created
# only when ActionController::Base.allow_concurrency is truthy; otherwise a
# NullMutex is used in its place.
def initialize(opts = {})
  settings = DEFAULT_OPTIONS.merge(opts)
  @namespace, @readonly = settings[:namespace], settings[:readonly]
  @mutex = ActionController::Base.allow_concurrency ? Mutex.new : NullMutex.new
  @servers = []
  @buckets = []
end

Instance Attribute Details

#request_timeoutObject

The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.



49
50
51
# File 'lib/am_memcache.rb', line 49

# Reader for @request_timeout.  Returns whatever is currently stored in the
# instance variable (nil if it has never been assigned).
def request_timeout
  @request_timeout
end

Instance Method Details

#[](key) ⇒ Object

Shortcut to get a value from the cache.



305
306
307
# File 'lib/am_memcache.rb', line 305

# Hash-style read: cache[key] is the same call as cache.get(key) and
# returns exactly what #get returns.
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object

Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.



311
312
313
# File 'lib/am_memcache.rb', line 311

# Hash-style write: forwards to #set with only key and value, so #set's
# default expiry applies.
def []=(key, value)
  set(key, value)
end

#active?Boolean

Returns whether there is at least one active server for the object.

Returns:

  • (Boolean)


81
82
83
# File 'lib/am_memcache.rb', line 81

# True when at least one server has been configured (i.e. @servers is not
# empty).  Only the list's emptiness is checked here, nothing else.
def active?
  !@servers.empty?
end

#add(key, value, expiry = 0) ⇒ Object

Add an entry to the cache only if the key is not already present. Returns true if the entry was stored (i.e. it did not already exist), false otherwise.



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/am_memcache.rb', line 189

# Store +value+ under +key+ using the "add" command.
#
# Integers are sent as their decimal string; every other value is sent as
# its Marshal dump.  Returns true when the reply equals ADD_STORED, false
# for any other reply.
def add(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  payload = Integer === value ? value.to_s : Marshal.dump(value)
  @mutex.synchronize do
    header = "add #{cache_key} 0 #{expiry} #{payload.size}\r\n"
    send_command(cache_key, header + payload) == ADD_STORED
  end
end

#check(key) ⇒ Object

Check whether a value is set for the key. The stored value is read from the socket but not unmarshaled; only a boolean is returned.



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/am_memcache.rb', line 235

# Report whether +key+ currently has a value, without unmarshaling it.
#
# Issues a "get" and returns false when the first reply line starts with
# "END"; otherwise reads and discards the value and returns true.  Socket
# level errors close the server and are re-raised as MemCacheError.
def check(key)
  cache_key = make_cache_key(key)
  @mutex.synchronize do
    begin
      header, sock, server = sock_send_command(cache_key, "get #{cache_key}")
      return false if header =~ /^END/

      # The reply header is space separated; its fourth field is the number
      # of payload bytes to read.
      payload_size = header.split(/ /)[3].to_i
      sock.read(payload_size)
      # Two more lines are consumed after the payload -- presumably the
      # payload's line terminator and the closing END line.
      sock.gets
      sock.gets
      true
    rescue SystemCallError, IOError => err
      server.close
      raise MemCacheError, err.message
    end
  end
end

#dec(key, amount = 1) ⇒ Object

Decrement an entry in the cache



172
173
174
175
176
177
178
179
180
# File 'lib/am_memcache.rb', line 172

# Send "decr" for +key+ with the given +amount+.
#
# Returns the reply converted to an Integer when it starts with digits;
# any other reply (including nil) is returned unchanged.
def dec(key, amount = 1)
  cache_key = make_cache_key(key)
  reply = @mutex.synchronize do
    send_command(cache_key, "decr #{cache_key} #{amount}")
  end
  if reply =~ /^\d+/
    reply.strip.to_i
  else
    reply
  end
end

#delete(key, expiry = 0) ⇒ Object

Remove an entry from the cache. Returns the server's response line as-is (a String), not a boolean.



218
219
220
221
222
223
# File 'lib/am_memcache.rb', line 218

# Send "delete" for +key+, passing +expiry+ as the command's second
# argument.
#
# The return value is whatever send_command returns (the raw reply line);
# it is not converted to a boolean here.
def delete(key, expiry = 0)
  cache_key = make_cache_key(key)
  @mutex.synchronize { send_command(cache_key, "delete #{cache_key} #{expiry}") }
end

#flushObject

Flush cache



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/am_memcache.rb', line 258

# Send "flush_all" to every configured server, in order, and read one reply
# line from each.
#
# Raises MemCacheError when there are no servers, when a server has no
# socket, or when a socket error occurs (the failing server is closed
# first).  Servers after a failing one are not contacted.
def flush
  @mutex.synchronize do
    raise MemCacheError, "No active servers" unless active?

    @servers.each do |srv|
      conn = srv.socket
      raise MemCacheError, "No connection to server" if conn.nil?

      begin
        conn.write "flush_all\r\n"
        conn.gets
      rescue SystemCallError, IOError => failure
        srv.close
        raise MemCacheError, failure.message
      end
    end
  end
end

#get(key) ⇒ Object

get an entry from the cache



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/am_memcache.rb', line 118

# Fetch the entry stored under +key+.
#
# Returns:
# * :MemCache_no_such_entry when the first reply line starts with "END";
# * an Integer when the stored bytes are nothing but an optionally negative
#   decimal number (this is how #set and #add store Integers);
# * otherwise the result of Marshal.load on the stored bytes.
#
# Raises MemCacheError on socket errors (the server is closed first) and
# when the stored bytes cannot be unmarshaled.
def get(key)
  cache_key = make_cache_key(key)
  raw = nil
  @mutex.synchronize do
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}")
      return :MemCache_no_such_entry if response =~ /^END/

      # The reply header is space separated; its fourth field is the number
      # of payload bytes that follow.
      byte_count = response.split(/ /)[3].to_i
      raw = sock.read(byte_count)

      # Consume the two lines that follow the payload.
      sock.gets
      sock.gets
    rescue SystemCallError, IOError => err
      server.close
      raise MemCacheError, err.message
    end
  end

  # Anchor on the whole string (\A..\z), not on individual lines: a
  # marshaled value that merely contains an all-digit line must not be
  # mistaken for a raw integer.  A leading minus sign is accepted so that
  # negative Integers written by #set/#add can be read back.
  text = raw.strip
  return text.to_i if text =~ /\A-?\d+\z/

  begin
    # NOTE(review): Marshal.load trusts whatever bytes the server returns;
    # confirm the cache contents cannot be written by untrusted parties.
    Marshal.load(raw)
  rescue ArgumentError, TypeError => err
    raise MemCacheError, err.message
  end
end

#get_server_for_key(key) ⇒ Object

Pick a server to handle the request based on a hash of the key.

Raises:



531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/am_memcache.rb', line 531

# Pick the server that should handle +key+.
#
# +key+ must interpolate to a hexadecimal string (it is parsed with
# Integer("0x...")).  With exactly one configured server that server is
# returned directly and the key is not parsed.  Otherwise up to 20
# consecutive buckets, starting at the key's bucket, are probed and the
# first server reporting alive? is returned.
#
# Raises MemCacheError when there are no buckets or when none of the probed
# servers is alive.
def get_server_for_key(key)
  # Easy enough if there is only one server.
  return @servers.first if @servers.length == 1

  # Guard the modulo below against an empty bucket list.
  bucket_count = @buckets.size
  raise MemCacheError, "No servers available" if bucket_count.zero?

  # Hash the value of the key to select the bucket.
  hkey = Integer("0x#{key}")

  # Probe successive buckets, returning as soon as a live server is found
  # so that a dead server is never handed back to the caller.
  20.times do |tries|
    candidate = @buckets[(hkey + tries) % bucket_count]
    return candidate if candidate.alive?
  end

  raise MemCacheError, "No servers available"
end

#inc(key, amount = 1) ⇒ Object

Increment an entry in the cache



159
160
161
162
163
164
165
166
167
# File 'lib/am_memcache.rb', line 159

# Send "incr" for +key+ with the given +amount+.
#
# Returns the reply converted to an Integer when it starts with digits;
# any other reply (including nil) is returned unchanged.
def inc(key, amount = 1)
  cache_key = make_cache_key(key)
  reply = @mutex.synchronize do
    send_command(cache_key, "incr #{cache_key} #{amount}")
  end
  if reply =~ /^\d+/
    reply.strip.to_i
  else
    reply
  end
end

#inspectObject

Return a string representation of the cache object.



75
76
77
78
# File 'lib/am_memcache.rb', line 75

# Return a short string describing the cache object: the number of servers
# and buckets, the namespace and the readonly flag.
#
# Array#size is used for the counts; Array#nitems no longer exists on
# Ruby 1.9 and later.
def inspect
  sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>",
          @servers.size, @buckets.size, @namespace, @readonly)
end

#invalidate_namespace(ns) ⇒ Object

Invalidate a namespace in the cache.



152
153
154
# File 'lib/am_memcache.rb', line 152

# Invalidate namespace +ns+ by incrementing the key that
# make_namespace_key returns as its last element.  Returns what #inc
# returns.
def invalidate_namespace(ns)
  invalidation_key = make_namespace_key(ns).last
  inc(invalidation_key)
end

#make_cache_key(o) ⇒ Object

Create a key for the cache, incorporating the namespace qualifier if requested.



514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
# File 'lib/am_memcache.rb', line 514

# Create the key actually sent to the server: a SHA1 digest object over the
# namespaced key string (its to_s / string interpolation is the hex
# digest).
#
# * For an Array the last element is the key proper and the preceding
#   elements are namespace parts, expanded through make_namespace_key; the
#   hashed string is "<@namespace>:<expanded namespace>:<key>".
# * For anything else the hashed string is "<@namespace>:<o>".
#
# The caller's array is never modified.
def make_cache_key(o)
  plain_key =
    if Array === o
      # dup (not clone) so that a frozen argument can still be taken apart.
      parts = o.dup
      key = parts.pop
      namespace_key = parts.empty? ? "" : ":#{make_namespace_key(parts).first}"
      "#{@namespace}#{namespace_key}:#{key}"
    else
      "#{@namespace}:#{o}"
    end

  # Digest::SHA1.new takes no argument on current Ruby versions; feed the
  # string through #update, which returns the digest object itself.
  Digest::SHA1.new.update(plain_key)
end

#make_namespace_key(ns, appendage = nil) ⇒ Object

Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace



489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# File 'lib/am_memcache.rb', line 489

# Build the salted name for namespace +ns+ and the cache key to increment
# in order to invalidate it.
#
# +ns+ may be a single value or an Array of nested namespace parts;
# +appendage+ carries the already-processed parent name during recursion.
# For every level a salt entry "adocca:memcache:namespace:<name>" is added
# to the cache (with a random initial value, if not already present) and
# then read back.
#
# Returns a two element Array:
#   [0] the name with each level followed by its salt, joined with ":"
#   [1] the salt key of the innermost level
#
# The caller's array is not modified.
def make_namespace_key(ns, appendage = nil)
  # Wrap scalars in an Array and copy real Arrays so that the shift below
  # never alters the caller's object.
  remaining = Array(ns).dup
  # take the first part of the namespace
  next_part = remaining.shift
  # qualify it with its parent's name, if any
  next_part = [appendage, next_part].join(":") if appendage
  # create a unique memcache key of the namespace name
  next_key = "adocca:memcache:namespace:#{next_part}"
  # make sure it exists in memcache
  add(next_key, rand(1 << 31))
  # get its value in memcache
  salt = get(next_key)
  if remaining.empty?
    # innermost level: return it with its salt, plus its key for invalidation
    ["#{next_part}:#{salt}", next_key]
  else
    # otherwise resolve the deeper levels, passing this level's name along
    last_part = make_namespace_key(remaining, next_part)
    ["#{next_part}:#{salt}:#{last_part.first}", last_part.last]
  end
end

#readonly?Boolean

Returns whether the cache was created read only.

Returns:

  • (Boolean)


86
87
88
# File 'lib/am_memcache.rb', line 86

# Returns the value of @readonly as stored (it is not coerced to true or
# false here).
def readonly?
  @readonly
end

#resetObject

Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.



228
229
230
231
232
# File 'lib/am_memcache.rb', line 228

# Call close on every configured server while holding @mutex.
def reset
  @mutex.synchronize do
    @servers.each(&:close)
  end
end

#send_command(key, command) ⇒ Object

Will send a command to the proper server for this key and return the response



455
456
457
# File 'lib/am_memcache.rb', line 455

# Send +command+ to the server responsible for +key+ and return only the
# first element of what sock_send_command returns (the reply line).
def send_command(key, command)
  sock_send_command(key, command).first
end

#servers=(servers) ⇒ Object

Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/am_memcache.rb', line 93

# Replace the server list and rebuild the weighted bucket list.
#
# Each entry is either a Server, used as is, or a String of the form
# "host", "host:port" or "host:port:weight"; missing parts fall back to
# DEFAULT_PORT and DEFAULT_WEIGHT.  Parts taken from a String are passed to
# Server::new as Strings.
#
# Every server then appears in @buckets once per unit of its weight.
#
# Raises TypeError for an entry that is neither a String nor a Server.
def servers=(servers)
  # Create the server objects.
  @servers = servers.collect do |server|
    case server
    when String
      host, port, weight = server.split(/:/, 3)
      port ||= DEFAULT_PORT
      weight ||= DEFAULT_WEIGHT
      Server::new(host, port, weight)
    when Server
      server
    else
      # Name the class of the offending entry in the message.
      raise TypeError, "Cannot convert %s to MemCache::Server" %
        server.class.name
    end
  end

  # Create an array of server buckets for weight selection of servers.
  @buckets = []
  @servers.each do |server|
    server.weight.times { @buckets.push(server) }
  end
end

#set(key, value, expiry = 0) ⇒ Object

Add an entry to the cache.



203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/am_memcache.rb', line 203

# Store +value+ under +key+ using the "set" command.
#
# Integers are sent as their decimal string; every other value is sent as
# its Marshal dump.  Returns whatever send_command returns (the raw reply
# line).
def set(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  payload = Integer === value ? value.to_s : Marshal.dump(value)
  @mutex.synchronize do
    header = "set #{cache_key} 0 #{expiry} #{payload.size}\r\n"
    send_command(cache_key, header + payload)
  end
end

#sock_send_command(key, command) ⇒ Object

Will send a command to the proper server for this key and return the response, socket and server

Raises:



463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/am_memcache.rb', line 463

# Send +command+ (a line terminator is appended) to the server selected for
# +key+ and return [first reply line, socket, server].
#
# Raises MemCacheError when
# * no servers are configured,
# * the cache is readonly and +command+ is anything other than a "get",
# * the selected server has no socket, or
# * a socket error occurs (the server is closed first).
def sock_send_command(key, command)
  raise MemCacheError, "No active servers" unless active?
  # A readonly cache must still be readable: only commands other than
  # "get" count as updates.
  if @readonly && command !~ /\Aget /
    raise MemCacheError, "Update of readonly cache"
  end

  server = get_server_for_key(key)
  sock = server.socket
  raise MemCacheError, "No connection to server" if sock.nil?

  begin
    sock.write "#{command}\r\n"
    [sock.gets, sock, server]
  rescue SystemCallError, IOError => err
    server.close
    raise MemCacheError, err.message
  end
end

#synchronize(key) ⇒ Object

memcache-driven transaction mechanism



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/am_memcache.rb', line 284

# Run the given block while holding a cache-backed lock named by +key+.
#
# The lock is an entry, added with TRANS_EXPTIME as its expiry, whose value
# identifies this process (HOST_HASH) and the current thread.  If the entry
# already holds this thread's value the lock counts as already held: the
# block runs, but the entry is left in place for the outer holder to
# remove.  Otherwise acquisition is retried once per second, giving up
# after TRANS_MAX_TRIES sleeps.
#
# A lock acquired by this call is deleted when the block finishes, even if
# the block raises.  Returns the block's value.
#
# Raises MemCacheError when the lock could not be obtained.
def synchronize(key)
  cache_key = make_cache_key(key)
  tries = 0
  we_locked_now = false
  we_locked_earlier = false
  ok_value = "#{HOST_HASH}:#{Thread.current.object_id}"
  while (!(we_locked_now = add(cache_key, ok_value, TRANS_EXPTIME)) &&
         !(we_locked_earlier = (get(cache_key) == ok_value)) &&
         tries < TRANS_MAX_TRIES)
    sleep 1
    tries += 1
  end
  raise MemCacheError, "Couldn't obtain lock" unless we_locked_now || we_locked_earlier

  begin
    yield
  ensure
    # Release only a lock taken by this call, and do so even when the block
    # raised, so a failure does not leave the lock held until it expires.
    delete(cache_key) if we_locked_now
  end
end