Class: Adocca::MemCache
- Inherits: Object
  - Object
  - Adocca::MemCache
- Defined in:
- lib/am_memcache.rb
Overview
A Ruby client library for memcached. Ripped from the gem.
Defined Under Namespace
Classes: ClientError, InternalError, MemCacheError, Server, ServerError
Constant Summary
- HOST_HASH =
A semi-unique ID for this Ruby instance.
Array.new(32).collect do |e| (65 + rand(25)).chr end.join
- DEFAULT_OPTIONS =
Default options for the cache object.
{ :namespace => nil, :readonly => false }
- DEFAULT_PORT =
Default memcached port.
11211
- DEFAULT_WEIGHT =
Default memcached server weight.
1
- LIVELINESS_TIMEOUT =
The maximum amount of time we are willing to spend talking to servers before accepting that something horrible has happened.
5
- ADD_NOTSTORED =
Response to an add command when the key is already present.
"NOT_STORED\r\n"
- ADD_STORED =
Response to an add command when the key is NOT already present.
"STORED\r\n"
- TRANS_EXPTIME =
Lock expiry time used by synchronize.
10
- TRANS_MAX_TRIES =
Value passed as the lock :timeout by synchronize.
10
Instance Attribute Summary
-
#request_timeout ⇒ Object
The amount of time to wait for a response from a memcached server.
Class Method Summary
-
.log_error(e) ⇒ Object
Log an exception and its backtrace.
Instance Method Summary
-
#[](key) ⇒ Object
Shortcut to get a value from the cache.
-
#[]=(key, value) ⇒ Object
Shortcut to save a value in the cache.
-
#active? ⇒ Boolean
Returns whether there is at least one active server for the object.
-
#add(key, value, expiry = 0) ⇒ Object
Add an entry to the cache. Returns true if the entry didn't already exist.
-
#check(key) ⇒ Object
Check whether a value is set without unmarshaling it; returns only a boolean.
-
#dec(key, amount = 1) ⇒ Object
Decrement an entry in the cache.
-
#delete(key, expiry = 0) ⇒ Object
Remove an entry from the cache.
-
#flush ⇒ Object
Flush cache.
-
#get(key) ⇒ Object
Get an entry from the cache.
-
#get_server_for_key(key) ⇒ Object
Pick a server to handle the request based on a hash of the key.
-
#inc(key, amount = 1) ⇒ Object
Increment an entry in the cache.
-
#initialize(opts = {}) ⇒ MemCache
Constructor. Valid options are :namespace and :readonly.
-
#inspect ⇒ Object
Return a string representation of the cache object.
-
#invalidate_namespace(ns) ⇒ Object
Invalidate a namespace in the cache.
-
#lock(key, options = {}) ⇒ Object
Will create a reentrant lock on key, waiting no more than timeout for the lock.
-
#make_cache_key(o) ⇒ Object
Create a key for the cache, incorporating the namespace qualifier if requested.
-
#make_namespace_key(ns, appendage = nil) ⇒ Object
Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace.
-
#readonly? ⇒ Boolean
Returns whether the cache was created read only.
-
#reset ⇒ Object
Reset the connection to all memcache servers.
-
#send_command(key, command) ⇒ Object
Will send a command to the proper server for this key and return the response.
-
#servers=(servers) ⇒ Object
Set the servers that the requests will be distributed between.
-
#set(key, value, expiry = 0) ⇒ Object
Add an entry to the cache.
-
#sock_send_command(key, command, squash_errors = false) ⇒ Object
Will send a command to the proper server for this key and return the response, socket and server.
-
#synchronize(key, &block) ⇒ Object
Memcache-driven locking mechanism.
-
#unlock(key) ⇒ Object
Will release any lock on key.
Constructor Details
#initialize(opts = {}) ⇒ MemCache
Valid options are:
:namespace
If specified, all keys will have the given value prepended
before accessing the cache. Defaults to nil.
:readonly
If this is set, any attempt to write to the cache will generate
an exception. Defaults to false.
    # File 'lib/am_memcache.rb', line 69
    def initialize(opts = {})
      opts = DEFAULT_OPTIONS.merge(opts)
      @namespace = opts[:namespace]
      @readonly = opts[:readonly]
      if ActionController::Base.allow_concurrency
        @mutex = Monitor.new
      else
        @mutex = NullMutex.new
      end
      @servers = []
      @buckets = []
    end
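As a minimal sketch of how the option defaults are resolved, the snippet below repeats the merge step on its own, outside the class. The helper name `resolve_options` is an assumption made for illustration and is not part of the library.

```ruby
# Same defaults as the DEFAULT_OPTIONS constant documented above.
DEFAULT_OPTIONS = { :namespace => nil, :readonly => false }

# Hypothetical helper: caller-supplied options win, and anything
# omitted falls back to its default.
def resolve_options(opts = {})
  DEFAULT_OPTIONS.merge(opts)
end

resolved = resolve_options(:namespace => "myapp")
```

Here `resolved` keeps `:readonly => false` from the defaults while taking the caller's namespace.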
Instance Attribute Details
#request_timeout ⇒ Object
The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.
    # File 'lib/am_memcache.rb', line 57
    def request_timeout
      @request_timeout
    end
Class Method Details
.log_error(e) ⇒ Object
    # File 'lib/am_memcache.rb', line 82
    def self.log_error(e)
      if defined?(RAILS_DEFAULT_LOGGER)
        RAILS_DEFAULT_LOGGER.error(e)
        RAILS_DEFAULT_LOGGER.error(PP.pp(e.backtrace, ""))
      else
        puts e
        pp e.backtrace
      end
    end
Instance Method Details
#[](key) ⇒ Object
Shortcut to get a value from the cache.
    # File 'lib/am_memcache.rb', line 338
    def [](key)
      self.get(key)
    end
#[]=(key, value) ⇒ Object
Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.
    # File 'lib/am_memcache.rb', line 344
    def []=(key, value)
      self.set(key, value)
    end
#active? ⇒ Boolean
Returns whether there is at least one active server for the object.
    # File 'lib/am_memcache.rb', line 99
    def active?
      not @servers.empty?
    end
#add(key, value, expiry = 0) ⇒ Object
Add an entry to the cache. Returns true if the entry didn't already exist.
    # File 'lib/am_memcache.rb', line 204
    def add(key, value, expiry = 0)
      cache_key = make_cache_key(key)
      marshaled_value = nil
      if Integer === value
        marshaled_value = value.to_s
      else
        marshaled_value = Marshal.dump(value)
      end
      send_command(cache_key, "add #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value) == ADD_STORED
    end
#check(key) ⇒ Object
Check whether a value is set without unmarshaling it; returns only a boolean.
    # File 'lib/am_memcache.rb', line 244
    def check(key)
      cache_key = make_cache_key(key)
      @mutex.synchronize do
        begin
          response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)
          return false if response =~ /^END/

          v, key, flags, bytes = response.split(/ /)
          value = sock.read(bytes.to_i)

          sock.timeout_gets(LIVELINESS_TIMEOUT)
          sock.timeout_gets(LIVELINESS_TIMEOUT)
        rescue Exception => err
          server.mark_dead(err.message) if server
          self.class.log_error(err)

          return false
        end

        true
      end
    end
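The first line of a successful get reply has the form `VALUE <key> <flags> <bytes>`, while a miss is just `END`. The following is a small sketch of that header parsing, assuming the standard memcached text protocol; `parse_get_header` is a hypothetical helper, not a method of this class.

```ruby
# Hypothetical helper: parse the first line of a "get" reply.
# Returns nil for a miss ("END") and a Hash for a hit.
def parse_get_header(line)
  return nil if line =~ /^END/
  # String#split(" ") splits on any whitespace and drops the trailing "\r\n".
  _verb, key, flags, bytes = line.split(" ")
  { :key => key, :flags => flags.to_i, :bytes => bytes.to_i }
end

hit  = parse_get_header("VALUE abc 0 5\r\n")
miss = parse_get_header("END\r\n")
```

The `:bytes` field is what tells the client how many bytes of payload to read from the socket next.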
#dec(key, amount = 1) ⇒ Object
Decrement an entry in the cache.
    # File 'lib/am_memcache.rb', line 190
    def dec(key, amount = 1)
      cache_key = make_cache_key(key)
      rval = send_command(cache_key, "decr #{cache_key} #{amount}")
      rval = rval.strip.to_i if rval =~ /^\d+/
      rval
    end
#delete(key, expiry = 0) ⇒ Object
Remove an entry from the cache. Returns the server's raw response line rather than a strict boolean.
    # File 'lib/am_memcache.rb', line 229
    def delete(key, expiry = 0)
      cache_key = make_cache_key(key)
      send_command(cache_key, "delete #{cache_key} #{expiry}")
    end
#flush ⇒ Object
Flush cache
    # File 'lib/am_memcache.rb', line 268
    def flush
      @mutex.synchronize do
        raise MemCacheError, "No active servers" unless self.active?

        @servers.each do |server|
          sock = server.socket
          unless sock.nil?
            begin
              sock.timeout_write("flush_all\r\n", LIVELINESS_TIMEOUT)
              sock.timeout_gets(LIVELINESS_TIMEOUT)
            rescue Exception => err
              server.mark_dead(err.message) if server
              self.class.log_error(err)
            end
          end
        end
      end
    end
#get(key) ⇒ Object
Get an entry from the cache. Returns :MemCache_no_such_entry when the key is missing, the lookup fails, or the stored value cannot be unmarshaled.
    # File 'lib/am_memcache.rb', line 136
    def get(key)
      cache_key = make_cache_key(key)
      rval = nil
      @mutex.synchronize do
        begin
          response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)
          return :MemCache_no_such_entry if response =~ /^END/

          v, cache_key, flags, bytes = response.split(/ /)
          rval = sock.read(bytes.to_i)

          sock.timeout_gets(LIVELINESS_TIMEOUT)
          sock.timeout_gets(LIVELINESS_TIMEOUT)
        rescue Exception => err
          server.mark_dead(err.message) if server
          self.class.log_error(err)

          return :MemCache_no_such_entry
        end
      end

      if rval.strip.match(/\A\d+\z/)
        rval.strip.to_i
      else
        # Return the unmarshaled value.
        begin
          Marshal.load(rval)
        rescue ArgumentError, TypeError => err
          self.class.log_error(err)
          return :MemCache_no_such_entry
        end
      end
    end
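The decoding step at the end of the listing can be sketched in isolation: payloads made only of digits come back as Integers, anything else goes through `Marshal.load`, and undecodable data yields the sentinel symbol. The helper name `decode_value` and the constant `MISSING` are assumptions for illustration only.

```ruby
# Sentinel returned by #get when nothing usable was found.
MISSING = :MemCache_no_such_entry

# Hypothetical helper mirroring the decoding branch of #get.
def decode_value(raw)
  stripped = raw.strip
  # Counters are stored as plain decimal strings.
  return stripped.to_i if stripped.match(/\A\d+\z/)
  # Everything else was written with Marshal.dump.
  Marshal.load(raw)
rescue ArgumentError, TypeError
  MISSING
end

counter = decode_value("42")
object  = decode_value(Marshal.dump([1, 2, 3]))
broken  = decode_value("not a marshal stream")
```

Because a cached `nil` or `false` is a legitimate value, callers would compare against the sentinel rather than test for truthiness, for example `value = cache.get(key); recompute if value == :MemCache_no_such_entry`.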
#get_server_for_key(key) ⇒ Object
Pick a server to handle the request based on a hash of the key.
    # File 'lib/am_memcache.rb', line 592
    def get_server_for_key(key)
      # Hash the value of the key to select the bucket.
      hkey = Integer("0x#{key}")

      # Fetch a server for the given key, retrying if that server is
      # offline.
      server = nil
      20.times do |tries|
        server = @buckets[(hkey + tries) % @buckets.nitems]
        break if server.alive?
      end

      raise MemCacheError, "No servers available" unless server
      server
    end
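The probing strategy above can be sketched as a standalone function. This is an illustration under assumptions, not the library's code: `FakeServer` and `pick_server` are hypothetical names, `Array#size` replaces `Array#nitems` (which was removed in Ruby 1.9), and the sketch raises when every probe lands on a dead server, whereas the listing above would hand back the last server it probed.

```ruby
require "digest/sha1"

# Hypothetical stand-in for MemCache::Server with only a liveness flag.
FakeServer = Struct.new(:name, :alive) do
  def alive?
    alive
  end
end

# Start at the bucket selected by the hashed key and walk forward,
# skipping dead servers, for at most max_tries probes.
def pick_server(buckets, hex_key, max_tries = 20)
  hkey = Integer("0x#{hex_key}") # a 40-digit SHA1 hex string becomes a big Integer
  max_tries.times do |tries|
    server = buckets[(hkey + tries) % buckets.size]
    return server if server.alive?
  end
  raise "No servers available"
end

dead   = FakeServer.new("a", false)
live   = FakeServer.new("b", true)
picked = pick_server([dead, live], Digest::SHA1.hexdigest("some key"))
```

With one dead and one live bucket, the second probe at the latest lands on the live server, so `picked` is always server "b" here.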
#inc(key, amount = 1) ⇒ Object
Increment an entry in the cache
    # File 'lib/am_memcache.rb', line 180
    def inc(key, amount = 1)
      cache_key = make_cache_key(key)
      rval = send_command(cache_key, "incr #{cache_key} #{amount}")
      rval = rval.strip.to_i if rval =~ /^\d+/
      rval
    end
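Both inc and dec convert the reply to an Integer only when it starts with digits. A sketch of that rule, assuming the standard memcached replies for incr and decr; `parse_counter_reply` is a hypothetical helper name.

```ruby
# Hypothetical helper: numeric replies become Integers, any other
# reply (for example "NOT_FOUND\r\n") is returned untouched.
def parse_counter_reply(reply)
  reply =~ /^\d+/ ? reply.strip.to_i : reply
end

new_value = parse_counter_reply("6\r\n")
missing   = parse_counter_reply("NOT_FOUND\r\n")
```

A caller therefore has to check the type of the result, since a missing key comes back as a String rather than raising.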
#inspect ⇒ Object
Return a string representation of the cache object.
    # File 'lib/am_memcache.rb', line 93
    def inspect
      sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>",
              @servers.nitems, @buckets.nitems, @namespace, @readonly)
    end
#invalidate_namespace(ns) ⇒ Object
Invalidate a namespace in the cache.
    # File 'lib/am_memcache.rb', line 173
    def invalidate_namespace(ns)
      inc(make_namespace_key(ns).last)
    end
#lock(key, options = {}) ⇒ Object
Will create a reentrant lock on key, waiting no more than timeout for the lock. Will sleep 1 second between each try.
Will return whether the locking was successful.
    # File 'lib/am_memcache.rb', line 297
    def lock(key, options = {})
      timeout = options[:timeout] || 1 << 32
      expire = options[:expire] || 0
      cache_key = make_cache_key(key)
      start_at = Time.now
      we_locked_now = false
      we_locked_earlier = false
      ok_value = "#{HOST_HASH}:#{Thread.current.object_id}"
      while (!(we_locked_now = add(cache_key, ok_value, expire)) &&
             !(we_locked_earlier = (get(cache_key) == ok_value)) &&
             Time.now < start_at + timeout)
        sleep 1
      end
      return we_locked_now || we_locked_earlier
    end
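The locking scheme relies on the fact that add only succeeds when the key is absent, and on storing an owner token so the same owner can re-enter. Below is a minimal sketch of that idea against an in-memory store. `FakeStore`, `try_lock`, and the `poll` parameter are hypothetical and exist only for illustration; the real method talks to memcached and sleeps one second between tries.

```ruby
# Hypothetical in-memory stand-in for a memcached server.
class FakeStore
  def initialize
    @data = {}
  end

  # Like memcached "add": store only when the key is absent.
  def add(key, value)
    return false if @data.key?(key)
    @data[key] = value
    true
  end

  def get(key)
    @data[key]
  end

  def delete(key)
    @data.delete(key)
  end
end

# Try to take the lock for owner, polling until timeout seconds have passed.
def try_lock(store, key, owner, timeout: 0, poll: 0.01)
  deadline = Time.now + timeout
  loop do
    return true if store.add(key, owner)    # lock was free, we hold it now
    return true if store.get(key) == owner  # we already held it (reentrant)
    return false if Time.now >= deadline
    sleep poll
  end
end

store        = FakeStore.new
first        = try_lock(store, "job", "host-a:1")
again        = try_lock(store, "job", "host-a:1")
other        = try_lock(store, "job", "host-b:2")
store.delete("job")
after_unlock = try_lock(store, "job", "host-b:2")
```

The owner token plays the role of `"#{HOST_HASH}:#{Thread.current.object_id}"` in the listing: the same owner gets the lock twice, a different owner is refused until the key is deleted.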
#make_cache_key(o) ⇒ Object
Create a key for the cache, incorporating the namespace qualifier if requested.
    # File 'lib/am_memcache.rb', line 575
    def make_cache_key(o)
      if Array === o
        o = o.clone
        key = o.pop
        namespace_key = ""
        namespace_key << ":#{make_namespace_key(o).first}" unless o.empty?

        key = "#{@namespace}#{namespace_key}:#{key}"
        Digest::SHA1.hexdigest(key)
      else
        key = "#{@namespace}:#{o}"
        Digest::SHA1.hexdigest(key)
      end
    end
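For the non-Array branch, the key sent to memcached is simply the SHA1 hex digest of the namespace and the caller's key joined by a colon. A sketch of that branch on its own, with `simple_cache_key` as a hypothetical helper name:

```ruby
require "digest/sha1"

# Hypothetical helper mirroring the non-Array branch of make_cache_key.
def simple_cache_key(namespace, key)
  Digest::SHA1.hexdigest("#{namespace}:#{key}")
end

with_ns    = simple_cache_key("app", "user:1")
without_ns = simple_cache_key(nil, "user:1")
```

Hashing yields a fixed-length, 40-character hex key regardless of what the caller passes in, which is also what lets get_server_for_key turn the key into an Integer.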
#make_namespace_key(ns, appendage = nil) ⇒ Object
Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace
    # File 'lib/am_memcache.rb', line 550
    def make_namespace_key(ns, appendage = nil)
      # ensure that this is an Array
      ns = ns.to_a
      # get the first part of the Array
      next_part = ns.shift
      # create a unique namespace name of it
      next_part = [appendage, next_part].join(":") if appendage
      # create a unique memcache key of the namespace name
      next_key = "adocca:memcache:namespace:#{next_part}"
      # make sure it exists in memcache
      add(next_key, rand(1 << 31))
      # get its value in memcache
      salt = get(next_key)
      if ns.empty?
        # if this is the last part of the key return it + its secret salt along with itself (for invalidation purposes)
        ["#{next_part}:#{salt}", next_key]
      else
        # otherwise, fetch the rest along with the
        last_part = make_namespace_key(ns, next_part)
        ["#{next_part}:#{salt}:#{last_part.first}", last_part.last]
      end
    end
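The idea behind namespace invalidation is that every key embeds a per-namespace salt, so bumping the salt makes all earlier keys unreachable without deleting them. The following is a simplified, single-level sketch of that idea using an in-memory Hash; `SaltedCache` and its methods are hypothetical and do not reproduce the nested namespaces handled by the listing above.

```ruby
require "digest/sha1"

# Hypothetical in-memory cache illustrating salt-based invalidation.
class SaltedCache
  def initialize
    @store = {}
  end

  def set(namespace, key, value)
    @store[full_key(namespace, key)] = value
  end

  def get(namespace, key)
    @store[full_key(namespace, key)]
  end

  # Bumping the salt changes every key derived from this namespace.
  def invalidate_namespace(namespace)
    @store[salt_key(namespace)] = salt_for(namespace) + 1
  end

  private

  def salt_key(namespace)
    "namespace:#{namespace}"
  end

  # Create the salt on first use, like add followed by get.
  def salt_for(namespace)
    @store[salt_key(namespace)] ||= rand(1 << 31)
  end

  def full_key(namespace, key)
    Digest::SHA1.hexdigest("#{namespace}:#{salt_for(namespace)}:#{key}")
  end
end

cache = SaltedCache.new
cache.set("users", 1, "alice")
before = cache.get("users", 1)
cache.invalidate_namespace("users")
after = cache.get("users", 1)
```

The stale entry is never removed explicitly; in a real memcached deployment it would simply age out or be evicted.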
#readonly? ⇒ Boolean
Returns whether the cache was created read only.
    # File 'lib/am_memcache.rb', line 104
    def readonly?
      @readonly
    end
#reset ⇒ Object
Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.
    # File 'lib/am_memcache.rb', line 237
    def reset
      @mutex.synchronize do
        @servers.each { |server| server.close }
      end
    end
#send_command(key, command) ⇒ Object
Will send a command to the proper server for this key and return the response
    # File 'lib/am_memcache.rb', line 511
    def send_command(key, command)
      sock_send_command(key, command)[0]
    end
#servers=(servers) ⇒ Object
Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.
    # File 'lib/am_memcache.rb', line 111
    def servers=(servers)
      # Create the server objects.
      @servers = servers.collect do |server|
        case server
        when String
          host, port, weight = server.split(/:/, 3)
          port ||= DEFAULT_PORT
          weight ||= DEFAULT_WEIGHT
          Server::new(host, port, weight)
        when Server
          server
        else
          raise TypeError, "Cannot convert %s to MemCache::Server" %
            server.class.name
        end
      end

      # Create an array of server buckets for weight selection of servers.
      @buckets = []
      @servers.each do |server|
        server.weight.times { @buckets.push(server) }
      end
    end
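Weighting works by repeating each server in the bucket list once per unit of weight, so a server with weight 3 receives roughly three times as many keys as a server with weight 1. A standalone sketch of the parsing and bucket building, where `SketchServer`, `parse_server`, and `build_buckets` are hypothetical names and the numeric conversion is done explicitly:

```ruby
# Hypothetical stand-in for MemCache::Server; only the fields used here.
SketchServer = Struct.new(:host, :port, :weight)

DEFAULT_PORT   = 11211
DEFAULT_WEIGHT = 1

# Parse "host", "host:port" or "host:port:weight" into a SketchServer.
def parse_server(spec)
  host, port, weight = spec.split(":", 3)
  SketchServer.new(host, (port || DEFAULT_PORT).to_i, (weight || DEFAULT_WEIGHT).to_i)
end

# Each server appears in the bucket list once per unit of weight.
def build_buckets(servers)
  servers.flat_map { |server| [server] * server.weight }
end

servers = ["cache1:11211:3", "cache2"].map { |spec| parse_server(spec) }
buckets = build_buckets(servers)
```

Here `buckets` holds four entries: three for cache1 and one for cache2, which falls back to the default port and weight.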
#set(key, value, expiry = 0) ⇒ Object
Store an entry in the cache, replacing any existing value for the key.
    # File 'lib/am_memcache.rb', line 216
    def set(key, value, expiry = 0)
      cache_key = make_cache_key(key)
      marshaled_value = nil
      if Integer === value
        marshaled_value = value.to_s
      else
        marshaled_value = Marshal.dump(value)
      end
      send_command(cache_key, "set #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value)
    end
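Both add and set build the same kind of storage command: a header line with the key, flags, expiry, and payload length, followed by the payload. A sketch of that construction, with `storage_command` as a hypothetical helper; it uses `bytesize` where the listing uses `size`, since on current Ruby versions `size` counts characters rather than bytes.

```ruby
# Hypothetical helper: build a storage command the way add and set do.
# Integers are sent as decimal strings so that incr and decr can operate
# on them; everything else is serialized with Marshal.
def storage_command(verb, cache_key, value, expiry = 0)
  payload = value.is_a?(Integer) ? value.to_s : Marshal.dump(value)
  "#{verb} #{cache_key} 0 #{expiry} #{payload.bytesize}\r\n" + payload
end

int_cmd = storage_command("set", "abc", 42)
obj_cmd = storage_command("add", "abc", { "a" => 1 }, 60)
```

The trailing `\r\n` that terminates the payload on the wire is appended later, by sock_send_command.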
#sock_send_command(key, command, squash_errors = false) ⇒ Object
Will send a command to the proper server for this key and return the response, socket and server
    # File 'lib/am_memcache.rb', line 519
    def sock_send_command(key, command, squash_errors = false)
      @mutex.synchronize do
        server = nil
        begin
          raise MemCacheError, "No active servers" unless self.active?
          raise MemCacheError, "Update of readonly cache" if @readonly
          server = get_server_for_key(key)

          sock = server.socket
          raise MemCacheError, "No connection to server" if sock.nil?

          response = nil

          sock.timeout_write("#{command}\r\n", LIVELINESS_TIMEOUT)
          [sock.timeout_gets(LIVELINESS_TIMEOUT), sock, server]
        rescue Exception => err
          server.mark_dead(err.message) if server
          self.class.log_error(err)
          raise err unless squash_errors
          return err.message
        end
      end
    end
#synchronize(key, &block) ⇒ Object
Memcache-driven locking mechanism. Uses lock and unlock.
    # File 'lib/am_memcache.rb', line 325
    def synchronize(key, &block)
      if lock_success = lock(key, :expire => TRANS_EXPTIME, :timeout => TRANS_MAX_TRIES)
        begin
          yield
        ensure
          unlock(key)
        end
      else
        raise MemCacheError, "Couldn't obtain lock on #{key}"
      end
    end
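The important property of this method is that the lock is released even when the block raises. A sketch of the same lock, yield, ensure-unlock pattern against a hypothetical in-process lock table; `TinyLocker` is not part of the library and does no waiting or expiry.

```ruby
# Hypothetical in-process lock table standing in for memcached.
class TinyLocker
  def initialize
    @held = {}
  end

  def lock(key)
    return false if @held[key]
    @held[key] = true
  end

  def unlock(key)
    @held.delete(key)
  end

  def locked?(key)
    @held.key?(key)
  end

  # Run the block while holding the lock; release it even if the block raises.
  def synchronize(key)
    raise "Couldn't obtain lock on #{key}" unless lock(key)
    begin
      yield
    ensure
      unlock(key)
    end
  end
end

locker = TinyLocker.new
result = locker.synchronize("report") { 1 + 1 }
failed = begin
  locker.synchronize("report") { raise ArgumentError, "boom" }
rescue ArgumentError
  :raised
end
```

The block's return value is passed through, and the exception from the second call propagates to the caller only after the lock has been released.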
#unlock(key) ⇒ Object
Will release any lock on key.
    # File 'lib/am_memcache.rb', line 316
    def unlock(key)
      self.delete(make_cache_key(key))
    end