Class: Adocca::MemCache
- Inherits: Object (hierarchy: Object, Adocca::MemCache)
- Defined in: lib/am_memcache.rb
Overview
A Ruby client library for memcached. Ripped from the gem.
Defined Under Namespace
Classes: ClientError, InternalError, MemCacheError, Server, ServerError
Constant Summary
- HOST_HASH =
A semi-unique id for this Ruby instance.
Array.new(32).collect do |e| (65 + rand(25)).chr end.join
- DEFAULT_OPTIONS =
Default options for the cache object.
{ :namespace => nil, :readonly => false }
- DEFAULT_PORT =
Default memcached port.
11211
- DEFAULT_WEIGHT =
Default memcached server weight.
1
- ADD_NOTSTORED =
The response to add when the key is already present.
"NOT_STORED\r\n"
- ADD_STORED =
The response to add when the key is not yet present.
"STORED\r\n"
- TRANS_EXPTIME =
Lock expiry time, in seconds.
10
- TRANS_MAX_TRIES =
Maximum number of attempts to obtain the lock in #synchronize.
10
Instance Attribute Summary
- #request_timeout ⇒ Object
  The amount of time to wait for a response from a memcached server.
Instance Method Summary
- #[](key) ⇒ Object
  Shortcut to get a value from the cache.
- #[]=(key, value) ⇒ Object
  Shortcut to save a value in the cache.
- #active? ⇒ Boolean
  Returns whether there is at least one active server for the object.
- #add(key, value, expiry = 0) ⇒ Object
  Add an entry to the cache. Returns true if the entry did not already exist.
- #check(key) ⇒ Object
  Check whether a value is set without unmarshaling it; returns only a boolean.
- #dec(key, amount = 1) ⇒ Object
  Decrement an entry in the cache.
- #delete(key, expiry = 0) ⇒ Object
  Remove an entry from the cache.
- #flush ⇒ Object
  Flush the cache.
- #get(key) ⇒ Object
  Get an entry from the cache.
- #get_server_for_key(key) ⇒ Object
  Pick a server to handle the request based on a hash of the key.
- #inc(key, amount = 1) ⇒ Object
  Increment an entry in the cache.
- #initialize(opts = {}) ⇒ MemCache
  Constructor. Valid options are :namespace and :readonly.
- #inspect ⇒ Object
  Return a string representation of the cache object.
- #invalidate_namespace(ns) ⇒ Object
  Invalidate a namespace in the cache.
- #make_cache_key(o) ⇒ Object
  Create a key for the cache, incorporating the namespace qualifier if requested.
- #make_namespace_key(ns, appendage = nil) ⇒ Object
  Create a key for this namespace and return it along with the key to delete or increment to invalidate this specific namespace.
- #readonly? ⇒ Boolean
  Returns whether the cache was created read only.
- #reset ⇒ Object
  Reset the connection to all memcache servers.
- #send_command(key, command) ⇒ Object
  Send a command to the proper server for this key and return the response.
- #servers=(servers) ⇒ Object
  Set the servers that the requests will be distributed between.
- #set(key, value, expiry = 0) ⇒ Object
  Store an entry in the cache, whether or not the key already exists.
- #sock_send_command(key, command) ⇒ Object
  Send a command to the proper server for this key and return the response, socket and server.
- #synchronize(key) ⇒ Object
  Memcache-driven transaction mechanism.
Constructor Details
#initialize(opts = {}) ⇒ MemCache
Valid options are:
- :namespace
  If specified, all keys will have the given value prepended before accessing the cache. Defaults to nil.
- :readonly
  If this is set, any attempt to write to the cache will generate an exception. Defaults to false.
# File 'lib/am_memcache.rb', line 61
def initialize(opts = {})
  opts = DEFAULT_OPTIONS.merge(opts)
  @namespace = opts[:namespace]
  @readonly = opts[:readonly]
  if ActionController::Base.allow_concurrency
    @mutex = Mutex.new
  else
    @mutex = NullMutex.new
  end
  @servers = []
  @buckets = []
end
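The option handling in the constructor relies only on Hash#merge: caller-supplied keys override the defaults and missing keys fall back to them. A minimal standalone sketch follows; DEMO_DEFAULTS and resolve_options are hypothetical names used for illustration and are not part of the library.

```ruby
# Hypothetical stand-in for DEFAULT_OPTIONS; same shape as the constant above.
DEMO_DEFAULTS = { :namespace => nil, :readonly => false }.freeze

# Caller-supplied options win; any key left out keeps its default value.
def resolve_options(opts = {})
  DEMO_DEFAULTS.merge(opts)
end

p resolve_options(:namespace => "blog")
```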
Instance Attribute Details
#request_timeout ⇒ Object
The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.
# File 'lib/am_memcache.rb', line 49
def request_timeout
  @request_timeout
end
Instance Method Details
#[](key) ⇒ Object
Shortcut to get a value from the cache.
# File 'lib/am_memcache.rb', line 305
def [](key)
  self.get(key)
end
#[]=(key, value) ⇒ Object
Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.
# File 'lib/am_memcache.rb', line 311
def []=(key, value)
  self.set(key, value)
end
#active? ⇒ Boolean
Returns whether there is at least one active server for the object.
# File 'lib/am_memcache.rb', line 81
def active?
  not @servers.empty?
end
#add(key, value, expiry = 0) ⇒ Object
Add an entry to the cache. Returns true if the entry did not already exist.
# File 'lib/am_memcache.rb', line 189
def add(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  marshaled_value = nil
  if Integer === value
    marshaled_value = value.to_s
  else
    marshaled_value = Marshal.dump(value)
  end
  @mutex.synchronize do
    send_command(cache_key, "add #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value) == ADD_STORED
  end
end
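The command text built by #add and #set follows the memcached storage format: verb, key, flags, expiry and byte count on one line, then the payload. A standalone sketch of that formatting is shown here; storage_command is a hypothetical helper, and it uses bytesize for the length, which is an assumption made for current Ruby versions rather than what the listing does. The trailing "\r\n" after the payload is added later by #sock_send_command.

```ruby
# Build the text of a memcached storage command ("set" or "add").
# Integers are sent as plain digits; everything else is marshaled.
def storage_command(verb, cache_key, value, expiry = 0)
  payload = value.is_a?(Integer) ? value.to_s : Marshal.dump(value)
  # The flags field is always 0 here, as in the listing above.
  "#{verb} #{cache_key} 0 #{expiry} #{payload.bytesize}\r\n" + payload
end

p storage_command("add", "abc123", 7, 10)
```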
#check(key) ⇒ Object
Checks whether a value is set without unmarshaling it; returns only a boolean.
# File 'lib/am_memcache.rb', line 235
def check(key)
  cache_key = make_cache_key(key)
  @mutex.synchronize do
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}")
      return false if response =~ /^END/

      v, key, flags, bytes = response.split(/ /)
      value = sock.read(bytes.to_i)
      sock.gets
      sock.gets
    rescue SystemCallError, IOError => err
      server.close
      raise MemCacheError, err.message
    end
    true
  end
end
#dec(key, amount = 1) ⇒ Object
Decrement an entry in the cache.
# File 'lib/am_memcache.rb', line 172
def dec(key, amount = 1)
  cache_key = make_cache_key(key)
  rval = nil
  @mutex.synchronize do
    rval = send_command(cache_key, "decr #{cache_key} #{amount}")
  end
  rval = rval.strip.to_i if rval =~ /^\d+/
  rval
end
#delete(key, expiry = 0) ⇒ Object
Remove an entry from the cache. Returns the server's raw response line; the code below does not convert it to a boolean.
# File 'lib/am_memcache.rb', line 218
def delete(key, expiry = 0)
  cache_key = make_cache_key(key)
  @mutex.synchronize do
    send_command(cache_key, "delete #{cache_key} #{expiry}")
  end
end
#flush ⇒ Object
Flush the cache on every configured server.
# File 'lib/am_memcache.rb', line 258
def flush
  @mutex.synchronize do
    raise MemCacheError, "No active servers" unless self.active?

    @servers.each do |server|
      sock = server.socket
      if sock.nil?
        raise MemCacheError, "No connection to server"
      end

      begin
        sock.write "flush_all\r\n"
        sock.gets
      rescue SystemCallError, IOError => err
        server.close
        raise MemCacheError, err.message
      end
    end
  end
end
#get(key) ⇒ Object
Get an entry from the cache. Returns :MemCache_no_such_entry when the key is not present.
# File 'lib/am_memcache.rb', line 118
def get(key)
  cache_key = make_cache_key(key)
  rval = nil
  @mutex.synchronize do
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}")
      return :MemCache_no_such_entry if response =~ /^END/

      v, cache_key, flags, bytes = response.split(/ /)
      rval = sock.read(bytes.to_i)
      sock.gets
      sock.gets
    rescue SystemCallError, IOError => err
      server.close
      raise MemCacheError, err.message
    end
  end

  if rval.strip.match(/^\d+$/)
    rval.strip.to_i
  else
    # Return the unmarshaled value.
    begin
      Marshal.load(rval)
    rescue ArgumentError, TypeError => err
      raise MemCacheError, err.message
    end
  end
end
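The reply to a memcached get is a header line ("VALUE key flags bytes"), the payload, a line terminator, and a closing "END" line; a miss is just "END". The sketch below mirrors the read sequence in #get against a StringIO instead of a socket. read_get_reply is a hypothetical helper, and returning nil on a miss is a simplification of the :MemCache_no_such_entry sentinel.

```ruby
require "stringio"

# Read one reply to a memcached "get" from an IO-like object.
# Returns the raw payload string, or nil on a cache miss.
def read_get_reply(io)
  header = io.gets
  return nil if header =~ /^END/

  _tag, _key, _flags, bytes = header.split(/ /)
  payload = io.read(bytes.to_i)
  io.gets # consume the "\r\n" that terminates the data block
  io.gets # consume the closing "END\r\n"
  payload
end

payload = Marshal.dump(["a", 1])
reply = StringIO.new("VALUE somekey 0 #{payload.bytesize}\r\n#{payload}\r\nEND\r\n")
p Marshal.load(read_get_reply(reply))
```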
#get_server_for_key(key) ⇒ Object
Pick a server to handle the request based on a hash of the key.
# File 'lib/am_memcache.rb', line 531
def get_server_for_key(key)
  # Easy enough if there is only one server.
  return @servers.first if @servers.length == 1

  # Hash the value of the key to select the bucket.
  hkey = Integer("0x#{key}")

  # Fetch a server for the given key, retrying if that server is
  # offline.
  server = nil
  20.times do |tries|
    server = @buckets[(hkey + tries) % @buckets.nitems]
    break if server.alive?
  end

  raise MemCacheError, "No servers available" unless server
  server
end
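Bucket selection treats the hex key as an integer, takes it modulo the bucket count, and steps to the next bucket when the chosen server is offline. A standalone sketch of that idea follows; DemoServer and pick_server are hypothetical names. One deliberate difference: the sketch raises when no live server is found within the allowed tries, whereas the method above appears to return the last server tried.

```ruby
require "digest/sha1"

# Hypothetical server record for the sketch: a name and a liveness flag.
DemoServer = Struct.new(:name, :alive) do
  def alive?
    alive
  end
end

# Pick a bucket from the hex digest, moving on to the next bucket
# whenever the candidate server is not alive.
def pick_server(buckets, hex_key, max_tries = 20)
  hkey = Integer("0x#{hex_key}")
  max_tries.times do |tries|
    server = buckets[(hkey + tries) % buckets.length]
    return server if server.alive?
  end
  raise "No servers available"
end

servers = [DemoServer.new("cache1", false), DemoServer.new("cache2", true)]
puts pick_server(servers, Digest::SHA1.hexdigest("ns:some-key")).name
```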
#inc(key, amount = 1) ⇒ Object
Increment an entry in the cache.
# File 'lib/am_memcache.rb', line 159
def inc(key, amount = 1)
  cache_key = make_cache_key(key)
  rval = nil
  @mutex.synchronize do
    rval = send_command(cache_key, "incr #{cache_key} #{amount}")
  end
  rval = rval.strip.to_i if rval =~ /^\d+/
  rval
end
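Both #inc and #dec convert the reply to an Integer only when it starts with digits; any other reply (for example a not-found line) is handed back unchanged. A small sketch of that rule, with parse_counter_reply as a hypothetical helper name:

```ruby
# Turn a numeric reply line into an Integer; leave anything else untouched.
def parse_counter_reply(reply)
  reply =~ /^\d+/ ? reply.strip.to_i : reply
end

p parse_counter_reply("42\r\n")
p parse_counter_reply("NOT_FOUND\r\n")
```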
#inspect ⇒ Object
Return a string representation of the cache object.
# File 'lib/am_memcache.rb', line 75
def inspect
  sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>",
          @servers.nitems, @buckets.nitems, @namespace, @readonly)
end
#invalidate_namespace(ns) ⇒ Object
Invalidate a namespace in the cache.
# File 'lib/am_memcache.rb', line 152
def invalidate_namespace(ns)
  inc(make_namespace_key(ns).last)
end
#make_cache_key(o) ⇒ Object
Create a key for the cache, incorporating the namespace qualifier if requested.
# File 'lib/am_memcache.rb', line 514
def make_cache_key(o)
  if Array === o
    o = o.clone
    key = o.pop
    namespace_key = ""
    namespace_key << ":#{make_namespace_key(o).first}" unless o.empty?

    key = "#{@namespace}#{namespace_key}:#{key}"
    Digest::SHA1.new(key)
  else
    key = "#{@namespace}:#{o}"
    Digest::SHA1.new(key)
  end
end
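Hashing the composed key yields a fixed-length hex string, which contains no spaces and can be parsed as an integer by #get_server_for_key. The sketch below shows the simple (non-Array) case. The Digest::SHA1.new(key) form in the source dates from older Ruby versions; the sketch assumes Digest::SHA1.hexdigest instead, and demo_cache_key is a hypothetical name.

```ruby
require "digest/sha1"

# Compose "namespace:key" and reduce it to a 40-character hex digest.
def demo_cache_key(namespace, key)
  Digest::SHA1.hexdigest("#{namespace}:#{key}")
end

puts demo_cache_key("blog", "post/1")
```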
#make_namespace_key(ns, appendage = nil) ⇒ Object
Create a key for this namespace and return it along with the key to delete or increment to invalidate this specific namespace.
# File 'lib/am_memcache.rb', line 489
def make_namespace_key(ns, appendage = nil)
  # ensure that this is an Array
  ns = ns.to_a
  # get the first part of the Array
  next_part = ns.shift
  # create a unique namespace name of it
  next_part = [appendage, next_part].join(":") if appendage
  # create a unique memcache key of the namespace name
  next_key = "adocca:memcache:namespace:#{next_part}"
  # make sure it exists in memcache
  add(next_key, rand(1 << 31))
  # get its value in memcache
  salt = get(next_key)
  if ns.empty?
    # if this is the last part of the key return it + its secret salt along with itself (for invalidation purposes)
    ["#{next_part}:#{salt}", next_key]
  else
    # otherwise, fetch the rest along with the
    last_part = make_namespace_key(ns, next_part)
    ["#{next_part}:#{salt}:#{last_part.first}", last_part.last]
  end
end
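Namespace invalidation works by embedding a per-namespace salt in every key: incrementing the stored salt changes all keys built afterwards, so older entries are simply never looked up again. The sketch below shows the single-level case against an in-memory stand-in. DemoStore, salted_key and invalidate are hypothetical names, and the "demo:namespace:" prefix is illustrative.

```ruby
# In-memory stand-in for memcached, only for this sketch.
class DemoStore
  def initialize
    @data = {}
  end

  # Store only if the key is absent, like memcached "add".
  def add(key, value)
    return false if @data.key?(key)
    @data[key] = value
    true
  end

  def get(key)
    @data[key]
  end

  # Increment an existing counter; missing keys are left alone.
  def incr(key)
    @data[key] += 1 if @data.key?(key)
  end
end

# Build a namespaced key whose text embeds the namespace's current salt.
def salted_key(store, ns, key)
  salt_key = "demo:namespace:#{ns}"
  store.add(salt_key, rand(1 << 31)) # no effect if the salt already exists
  "#{ns}:#{store.get(salt_key)}:#{key}"
end

# Bumping the salt orphans every key built with the old salt.
def invalidate(store, ns)
  store.incr("demo:namespace:#{ns}")
end

store = DemoStore.new
before = salted_key(store, "users", 42)
invalidate(store, "users")
puts before == salted_key(store, "users", 42)
```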
#readonly? ⇒ Boolean
Returns whether the cache was created read only.
# File 'lib/am_memcache.rb', line 86
def readonly?
  @readonly
end
#reset ⇒ Object
Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.
# File 'lib/am_memcache.rb', line 228
def reset
  @mutex.synchronize do
    @servers.each { |server| server.close }
  end
end
#send_command(key, command) ⇒ Object
Sends a command to the proper server for this key and returns the response.
# File 'lib/am_memcache.rb', line 455
def send_command(key, command)
  sock_send_command(key, command)[0]
end
#servers=(servers) ⇒ Object
Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.
# File 'lib/am_memcache.rb', line 93
def servers=(servers)
  # Create the server objects.
  @servers = servers.collect do |server|
    case server
    when String
      host, port, weight = server.split(/:/, 3)
      port ||= DEFAULT_PORT
      weight ||= DEFAULT_WEIGHT
      Server::new(host, port, weight)
    when Server
      server
    else
      raise TypeError, "Cannot convert %s to MemCache::Server" %
        server.class.name
    end
  end

  # Create an array of server buckets for weight selection of servers.
  @buckets = []
  @servers.each do |server|
    server.weight.times { @buckets.push(server) }
  end
end
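Server strings are split on ":" into host, port and weight, with defaults for the missing parts, and a server of weight n is pushed into the bucket list n times so that it receives roughly n shares of the keys. A standalone sketch follows; parse_server, build_buckets and the DEMO_ constants are hypothetical, and plain arrays stand in for Server objects.

```ruby
DEMO_PORT = 11211 # same value as DEFAULT_PORT above
DEMO_WEIGHT = 1   # same value as DEFAULT_WEIGHT above

# Split "host", "host:port" or "host:port:weight" into [host, port, weight].
def parse_server(spec)
  host, port, weight = spec.split(/:/, 3)
  [host, (port || DEMO_PORT).to_i, (weight || DEMO_WEIGHT).to_i]
end

# Repeat each server once per unit of weight.
def build_buckets(specs)
  specs.flat_map do |spec|
    server = parse_server(spec)
    Array.new(server[2]) { server }
  end
end

p build_buckets(["cache1", "cache2:11212:3"]).map(&:first)
```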
#set(key, value, expiry = 0) ⇒ Object
Store an entry in the cache, whether or not the key already exists.
# File 'lib/am_memcache.rb', line 203
def set(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  marshaled_value = nil
  if Integer === value
    marshaled_value = value.to_s
  else
    marshaled_value = Marshal.dump(value)
  end
  @mutex.synchronize do
    send_command(cache_key, "set #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value)
  end
end
#sock_send_command(key, command) ⇒ Object
Sends a command to the proper server for this key and returns the response, socket and server.
# File 'lib/am_memcache.rb', line 463
def sock_send_command(key, command)
  raise MemCacheError, "No active servers" unless self.active?
  raise MemCacheError, "Update of readonly cache" if @readonly
  server = get_server_for_key(key)

  sock = server.socket
  if sock.nil?
    raise MemCacheError, "No connection to server"
  end

  response = nil

  begin
    sock.write "#{command}\r\n"
    [sock.gets, sock, server]
  rescue SystemCallError, IOError => err
    server.close
    raise MemCacheError, err.message
  end
end
#synchronize(key) ⇒ Object
Memcache-driven transaction mechanism. Uses #add as a lock on the key: it retries up to TRANS_MAX_TRIES times, one second apart, and the lock entry expires after TRANS_EXPTIME seconds. Raises MemCacheError if the lock cannot be obtained.
# File 'lib/am_memcache.rb', line 284
def synchronize(key)
  cache_key = make_cache_key(key)
  tries = 0
  we_locked_now = false
  we_locked_earlier = false
  ok_value = "#{HOST_HASH}:#{Thread.current.object_id}"
  while (!(we_locked_now = add(cache_key, ok_value, TRANS_EXPTIME)) &&
         !(we_locked_earlier = (get(cache_key) == ok_value)) &&
         tries < TRANS_MAX_TRIES)
    sleep 1
    tries += 1
  end
  if we_locked_now || we_locked_earlier
    yield
    self.delete(cache_key) if we_locked_now
  else
    raise MemCacheError, "Couldn't obtain lock"
  end
end
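The locking idea is that add succeeds for exactly one caller, so whoever stores its owner id first holds the lock; a caller that finds its own id already stored is re-entering and must not release it. The sketch below reproduces that logic against an in-memory stand-in. DemoLockStore and with_lock are hypothetical names, the owner id is passed in rather than derived from HOST_HASH and the thread, and the ensure clause is an addition not present in the method above.

```ruby
# In-memory stand-in for the cache; "add" succeeds only for absent keys.
class DemoLockStore
  def initialize
    @data = {}
  end

  def add(key, value)
    return false if @data.key?(key)
    @data[key] = value
    true
  end

  def get(key)
    @data[key]
  end

  def delete(key)
    @data.delete(key)
  end
end

# Run the block while holding the lock named by key.
def with_lock(store, key, owner, max_tries = 3, wait = 0.01)
  tries = 0
  locked_now = false
  locked_earlier = false
  until (locked_now = store.add(key, owner)) ||
        (locked_earlier = (store.get(key) == owner)) ||
        tries >= max_tries
    sleep wait
    tries += 1
  end
  raise "Couldn't obtain lock" unless locked_now || locked_earlier

  begin
    yield
  ensure
    # Only the call that created the lock entry removes it.
    store.delete(key) if locked_now
  end
end

store = DemoLockStore.new
with_lock(store, "report", "worker-1") { puts "holding the lock" }
```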