Class: Adocca::MemCache

Inherits:
Object
  • Object
show all
Defined in:
lib/am_memcache.rb

Overview

A Ruby client library for memcached. Ripped from the gem.

Defined Under Namespace

Classes: ClientError, InternalError, MemCacheError, Server, ServerError

Constant Summary collapse

HOST_HASH =

a semi-unique id for this ruby instance

Array.new(32).collect do |e| (65 + rand(25)).chr end.join
DEFAULT_OPTIONS =

Default options for the cache object.

{
  :namespace => nil,
  :readonly  => false
}
DEFAULT_PORT =

Default memcached port.

11211
DEFAULT_WEIGHT =

Default memcached server weight.

1
LIVELINESS_TIMEOUT =

The maximum amount of time we are willing to spend talking to servers before accepting that something horrible has happened.

5
ADD_NOTSTORED =

the response when the key is present

"NOT_STORED\r\n"
ADD_STORED =

the response when the key is NOT present

"STORED\r\n"
TRANS_EXPTIME =

lock expiry time

10
TRANS_MAX_TRIES =
10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ MemCache

Valid options are:

:namespace
    If specified, all keys will have the given value prepended
    before accessing the cache.  Defaults to nil.

:readonly
    If this is set, any attempt to write to the cache will generate
    an exception.  Defaults to false.


69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/am_memcache.rb', line 69

def initialize(opts = {})
  opts = DEFAULT_OPTIONS.merge(opts)   
  @namespace = opts[:namespace]
  @readonly  = opts[:readonly]
  if ActionController::Base.allow_concurrency
    @mutex = Monitor.new
  else
    @mutex = NullMutex.new
  end
  @servers   = []
  @buckets   = []
end

Instance Attribute Details

#request_timeoutObject

The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.



57
58
59
# File 'lib/am_memcache.rb', line 57

def request_timeout
  @request_timeout
end

Class Method Details

.log_error(e) ⇒ Object



82
83
84
85
86
87
88
89
90
# File 'lib/am_memcache.rb', line 82

def self.log_error(e)
  if defined?(RAILS_DEFAULT_LOGGER)
    RAILS_DEFAULT_LOGGER.error(e)
    RAILS_DEFAULT_LOGGER.error(PP.pp(e.backtrace, ""))
  else
    puts e
    pp e.backtrace
  end
end

Instance Method Details

#[](key) ⇒ Object

Shortcut to get a value from the cache.



338
339
340
# File 'lib/am_memcache.rb', line 338

def [](key)
  self.get(key)
end

#[]=(key, value) ⇒ Object

Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.



344
345
346
# File 'lib/am_memcache.rb', line 344

def []=(key, value)
  self.set(key, value)
end

#active?Boolean

Returns whether there is at least one active server for the object.

Returns:

  • (Boolean)


99
100
101
# File 'lib/am_memcache.rb', line 99

def active?
  not @servers.empty?
end

#add(key, value, expiry = 0) ⇒ Object

Add an entry to the cache will return true if the entry didnt already exist



204
205
206
207
208
209
210
211
212
213
# File 'lib/am_memcache.rb', line 204

def add(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  marshaled_value = nil
  if Integer === value
    marshaled_value = value.to_s
  else
    marshaled_value = Marshal.dump(value)
  end
  send_command(cache_key, "add #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value) == ADD_STORED
end

#check(key) ⇒ Object

Method to check if a value is set, and NOT unmarshal, only return bool



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/am_memcache.rb', line 244

def check(key)
  cache_key = make_cache_key(key)
  @mutex.synchronize do

    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)
      
      return false if response =~ /^END/
      
      v, key, flags, bytes = response.split(/ /)
      value = sock.read(bytes.to_i)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
    rescue Exception => err
      server.mark_dead(err.message) if server
      self.class.log_error(err)
      return false
    end
    
    true
  end
end

#dec(key, amount = 1) ⇒ Object

Increment an entry in the cache



190
191
192
193
194
195
# File 'lib/am_memcache.rb', line 190

def dec(key, amount = 1)
  cache_key = make_cache_key(key)
  rval = send_command(cache_key, "decr #{cache_key} #{amount}")
  rval = rval.strip.to_i if rval =~ /^\d+/
  rval
end

#delete(key, expiry = 0) ⇒ Object

Remove an entry from the cache. return true if it is deleted



229
230
231
232
# File 'lib/am_memcache.rb', line 229

def delete(key, expiry = 0)
  cache_key = make_cache_key(key)
  send_command(cache_key, "delete #{cache_key} #{expiry}")
end

#flushObject

Flush cache



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/am_memcache.rb', line 268

def flush
  @mutex.synchronize do
    raise MemCacheError, "No active servers" unless self.active?
    @servers.each do |server|

      sock = server.socket
      unless sock.nil?
        begin
          sock.timeout_write("flush_all\r\n", LIVELINESS_TIMEOUT)
          sock.timeout_gets(LIVELINESS_TIMEOUT)
        rescue Exception => err
          server.mark_dead(err.message) if server
          self.class.log_error(err)
        end
      end
    end
  end
end

#get(key) ⇒ Object

get an entry from the cache



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/am_memcache.rb', line 136

def get(key)
  cache_key = make_cache_key(key)
  rval = nil
  @mutex.synchronize do
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)
      return :MemCache_no_such_entry if response =~ /^END/

      v, cache_key, flags, bytes = response.split(/ /)
      rval = sock.read(bytes.to_i)

      sock.timeout_gets(LIVELINESS_TIMEOUT)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
    rescue Exception => err
      server.mark_dead(err.message) if server
      
      self.class.log_error(err)
      return :MemCache_no_such_entry
    end
  end

  if rval.strip.match(/\A\d+\z/)
    rval.strip.to_i
  else
    # Return the unmarshaled value.
    begin
      Marshal.load(rval)
    rescue ArgumentError, TypeError => err
      self.class.log_error(err)
      return :MemCache_no_such_entry
    end
  end
end

#get_server_for_key(key) ⇒ Object

Pick a server to handle the request based on a hash of the key.

Raises:



592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
# File 'lib/am_memcache.rb', line 592

def get_server_for_key(key)
  # Hash the value of the key to select the bucket.
  hkey = Integer("0x#{key}")
  
  # Fetch a server for the given key, retrying if that server is
  # offline.
  server = nil
  20.times do |tries|
    server = @buckets[(hkey + tries) % @buckets.nitems]
    break if server.alive?
  end

  raise MemCacheError, "No servers available" unless server
  server
end

#inc(key, amount = 1) ⇒ Object

Increment an entry in the cache



180
181
182
183
184
185
# File 'lib/am_memcache.rb', line 180

def inc(key, amount = 1)
  cache_key = make_cache_key(key)
  rval = send_command(cache_key, "incr #{cache_key} #{amount}")
  rval = rval.strip.to_i if rval =~ /^\d+/
  rval
end

#inspectObject

Return a string representation of the cache object.



93
94
95
96
# File 'lib/am_memcache.rb', line 93

def inspect
  sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>",
          @servers.nitems, @buckets.nitems, @namespace, @readonly)
end

#invalidate_namespace(ns) ⇒ Object

Invalidate a namespace in the cache.



173
174
175
# File 'lib/am_memcache.rb', line 173

def invalidate_namespace(ns)
  inc(make_namespace_key(ns).last)
end

#lock(key, options = {}) ⇒ Object

Will create a reentrant lock on key, but waiting no more than timeout for the lock. Will sleep 1 second between each try.

Will return whether the locking was successful.



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/am_memcache.rb', line 297

def lock(key, options = {})
  timeout = options[:timeout] || 1 << 32
  expire = options[:expire] || 0
  cache_key = make_cache_key(key)
  start_at = Time.now
  we_locked_now = false
  we_locked_earlier = false
  ok_value = "#{HOST_HASH}:#{Thread.current.object_id}"
  while (!(we_locked_now = add(cache_key, ok_value, expire)) && 
         !(we_locked_earlier = (get(cache_key) == ok_value)) &&
         Time.now < start_at + timeout)
    sleep 1
  end
  return we_locked_now || we_locked_earlier
end

#make_cache_key(o) ⇒ Object

Create a key for the cache, incorporating the namespace qualifier if requested.



575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
# File 'lib/am_memcache.rb', line 575

def make_cache_key(o)
  if Array === o
    o = o.clone
    key = o.pop

    namespace_key = ""
    namespace_key << ":#{make_namespace_key(o).first}" unless o.empty?

    key = "#{@namespace}#{namespace_key}:#{key}"
    Digest::SHA1.hexdigest(key)
  else
    key = "#{@namespace}:#{o}"
    Digest::SHA1.hexdigest(key)
  end
end

#make_namespace_key(ns, appendage = nil) ⇒ Object

Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace



550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
# File 'lib/am_memcache.rb', line 550

def make_namespace_key(ns, appendage = nil)
  # ensure that this is an Array
  ns = ns.to_a
  # get the first part of the Array
  next_part = ns.shift
  # create a unique namespace name of it
  next_part = [appendage, next_part].join(":") if appendage
  # create a unique memcache key of the namespace name
  next_key = "adocca:memcache:namespace:#{next_part}"
  # make sure it exists in memcache
  add(next_key, rand(1 << 31))
  # get its value in memcache
  salt = get(next_key)
  if ns.empty?
    # if this is the last part of the key return it + its secret salt along with itself (for invalidation purposes)
    ["#{next_part}:#{salt}", next_key]
  else
    # otherwise, fetch the rest along with the 
    last_part = make_namespace_key(ns, next_part)
    ["#{next_part}:#{salt}:#{last_part.first}", last_part.last]
  end
end

#readonly?Boolean

Returns whether the cache was created read only.

Returns:

  • (Boolean)


104
105
106
# File 'lib/am_memcache.rb', line 104

def readonly?
  @readonly
end

#resetObject

Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.



237
238
239
240
241
# File 'lib/am_memcache.rb', line 237

def reset
  @mutex.synchronize do
    @servers.each { |server| server.close }
  end
end

#send_command(key, command) ⇒ Object

Will send a command to the proper server for this key and return the response



511
512
513
# File 'lib/am_memcache.rb', line 511

def send_command(key, command)
  sock_send_command(key, command)[0]
end

#servers=(servers) ⇒ Object

Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/am_memcache.rb', line 111

def servers=(servers)
  # Create the server objects.
  @servers = servers.collect do |server|
    case server
    when String
      host, port, weight = server.split(/:/, 3)
      port ||= DEFAULT_PORT
      weight ||= DEFAULT_WEIGHT
      Server::new(host, port, weight)
    when Server
      server
    else
      raise TypeError, "Cannot convert %s to MemCache::Server" %
        svr.class.name
    end
  end

  # Create an array of server buckets for weight selection of servers.
  @buckets = []
  @servers.each do |server|
    server.weight.times { @buckets.push(server) }
  end
end

#set(key, value, expiry = 0) ⇒ Object

Add an entry to the cache.



216
217
218
219
220
221
222
223
224
225
# File 'lib/am_memcache.rb', line 216

def set(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  marshaled_value = nil
  if Integer === value
    marshaled_value = value.to_s
  else
    marshaled_value = Marshal.dump(value)
  end
  send_command(cache_key, "set #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value)
end

#sock_send_command(key, command, squash_errors = false) ⇒ Object

Will send a command to the proper server for this key and return the response, socket and server



519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# File 'lib/am_memcache.rb', line 519

def sock_send_command(key, command, squash_errors = false)
  @mutex.synchronize do
    server = nil
    begin
      raise MemCacheError, "No active servers" unless self.active?
      raise MemCacheError, "Update of readonly cache" if @readonly
      server = get_server_for_key(key)
      
      sock = server.socket
      raise MemCacheError, "No connection to server" if sock.nil?
      
      response = nil
      
      sock.timeout_write("#{command}\r\n", LIVELINESS_TIMEOUT)
      [sock.timeout_gets(LIVELINESS_TIMEOUT), sock, server]
                
    rescue Exception => err
      server.mark_dead(err.message) if server
      self.class.log_error(err)
      raise err unless squash_errors
      return err.message
    end
  end
end

#synchronize(key, &block) ⇒ Object

memcache-driven locking mechanism

Uses lock and unlock



325
326
327
328
329
330
331
332
333
334
335
# File 'lib/am_memcache.rb', line 325

def synchronize(key, &block)
  if lock_success = lock(key, :expire => TRANS_EXPTIME, :timeout => TRANS_MAX_TRIES)
    begin
      yield
    ensure
      unlock(key)
    end
  else
    raise MemCacheError, "Couldn't obtain lock on #{key}"
  end
end

#unlock(key) ⇒ Object

Will release any lock on key.



316
317
318
# File 'lib/am_memcache.rb', line 316

def unlock(key)
  self.delete(make_cache_key(key))
end