Module: EventMachine::Protocols::Memcache

Includes:
Deferrable
Included in:
TestConnection
Defined in:
lib/em/protocols/memcache.rb

Overview

Implements the memcached text protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.

Usage example

EM.run{
  # Open a connection to the memcached server on localhost:11211.
  cache = EM::P::Memcache.connect 'localhost', 11211

  # Store four values. No block is given, so nothing is called back
  # when the server has stored them.
  cache.set :a, 'hello'
  cache.set :b, 'hi'
  cache.set :c, 'how are you?'
  cache.set :d, ''

  # Single key: the block receives the value.
  cache.get(:a){ |v| p v }
  # get_hash: the block receives one hash keyed by the requested keys.
  cache.get_hash(:a, :b, :c, :d){ |v| p v }
  # Several keys: the block receives one value per key, in request order.
  cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

  # Keys the server returns no value for (presumably :z and :y here)
  # are yielded as nil, in their requested position.
  cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }

  # Round trip on a single key: read, store, read again, delete, read again.
  # With a block, set and del call it once the server has replied.
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.set(:missing, 'abc'){ p :stored }
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.del(:missing){ p :deleted }
  cache.get(:missing){ |m| p [:missing=, m] }
}

Defined Under Namespace

Classes: ParserError

Constant Summary collapse

# Reply lines recognised by process_cmd.
Cstored    = 'STORED'.freeze    # answers a set; the oldest set callback gets true
Cend       = 'END'.freeze       # closes a get reply; the oldest get callback fires
Cdeleted   = 'DELETED'.freeze   # answers a delete; the oldest delete callback gets true
Cunknown   = 'NOT_FOUND'.freeze # answers a delete; the oldest delete callback gets false

# Not referenced by the code visible in this file section.
Cerror     = 'ERROR'.freeze
Cempty     = ''.freeze

# Terminator of every protocol line and of every value payload.
Cdelimiter = "\r\n".freeze

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Deferrable

#callback, #cancel_timeout, #errback, #fail, future, #set_deferred_status, #succeed, #timeout

Class Method Details

.connect(host = 'localhost', port = 11211) ⇒ Object

Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)



109
110
111
# File 'lib/em/protocols/memcache.rb', line 109

# Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4).
#
# host and port are used as the connection target and are passed again
# as trailing arguments (presumably forwarded to #initialize, which takes
# the same pair).
#
# Returns whatever EM.connect returns.
def self.connect host = 'localhost', port = 11211
  handler_args = [host, port]
  EM.connect(host, port, self, *handler_args)
end

Instance Method Details

#connection_completed ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/em/protocols/memcache.rb', line 133

# Connection hook: puts the client into a clean "connected" state and then
# marks the deferrable as succeeded.
#
# All per-connection state is reset here: the pending callback queues, the
# values collected for the get reply in progress, and the receive buffer.
# The state must be in place before +succeed+ is called, because commands
# wrapped in +callback{}+ use the queues.
def connection_completed
  @get_cbs = []
  @set_cbs = []
  @del_cbs = []

  @values = {}
  # Throw away any half-received reply left over from a previous connection
  # (unbind schedules a reconnect); otherwise those stale bytes would be
  # parsed together with the first data of the new connection.
  # ''.dup gives a mutable string even when string literals are frozen.
  @buffer = ''.dup

  @reconnecting = false
  @connected = true
  succeed
  # set_delimiter "\r\n"
  # set_line_mode
end

#delete(key, expires = 0, &cb) ⇒ Object Also known as: del

Delete the value associated with a key

cache.del :a
cache.del(:b){ puts "deleted the value!" }


100
101
102
103
104
105
# File 'lib/em/protocols/memcache.rb', line 100

# Delete the value associated with a key.
#
#  cache.del :a
#  cache.del(:b){ puts "deleted the value!" }
#
# key::     converted with to_s; whitespace is replaced by "_", exactly as
#           #get does, so the same key names the same entry in both.
# expires:: sent verbatim as the second argument of the delete command.
# cb::      optional block. When given, it is queued and later called with
#           true (DELETED) or false (NOT_FOUND) by process_cmd. When omitted
#           the command is sent with "noreply".
#
# The command is wrapped in +callback{}+, so it is sent once the
# deferrable has succeeded.
def delete key, expires = 0, &cb
  callback{
    # A raw key containing whitespace would split into extra command
    # arguments on the wire, so normalise it before interpolating.
    wire_key = key.to_s.gsub(/\s/, '_')
    send_data "delete #{wire_key} #{expires}#{cb ? '' : ' noreply'}\r\n"
    @del_cbs << cb if cb
  }
end

#get(*keys) ⇒ Object

Get the value associated with one or multiple keys

cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
63
64
65
66
# File 'lib/em/protocols/memcache.rb', line 56

# Get the value associated with one or multiple keys.
#
#  cache.get(:a){ |v| p v }
#  cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
#
# Each key is converted with to_s and whitespace is replaced by "_".
# The block is required (ArgumentError otherwise). It is called with one
# value per requested key, in request order; a key with no entry in the
# reply hash yields nil.
def get *keys
  raise ArgumentError unless block_given?

  callback do
    wire_keys = keys.map { |key| key.to_s.gsub(/\s/, '_') }
    send_data "get #{wire_keys.join(' ')}\r\n"

    # Called by process_cmd with the hash of values collected for this reply.
    deliver = proc { |values|
      yield(*wire_keys.map { |key| values[key] })
    }
    @get_cbs << [wire_keys, deliver]
  end
end

#get_hash(*keys) ⇒ Object

Gets multiple values as a hash

cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }

Raises:

  • (ArgumentError)


87
88
89
90
91
92
93
# File 'lib/em/protocols/memcache.rb', line 87

# Gets multiple values as a hash.
#
#  cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
#
# The block is required (ArgumentError otherwise). It receives a single
# hash whose keys are the keys exactly as passed to this method, and whose
# values are what #get yielded for them (nil where #get yielded nil).
def get_hash *keys
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    # get yields one value per key in request order, so pair them by
    # position instead of looking up every key's index again.
    yield Hash[keys.zip(values)]
  end
end

#initialize(host, port = 11211) ⇒ Object

em hooks



129
130
131
# File 'lib/em/protocols/memcache.rb', line 129

# Remembers the server address; unbind reads @host and @port when it
# schedules a reconnect.
def initialize host, port = 11211
  @host = host
  @port = port
end

#process_cmd(line) ⇒ Object

– def receive_line line



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/em/protocols/memcache.rb', line 167

# Handles one reply line (already cut out of @buffer by receive_data).
#
# VALUE <key> <flags> <bytes>:: takes <bytes> characters plus the trailing
#                               "\r\n" out of @buffer and stores the payload
#                               in @values. Raises ParserError when the
#                               buffer does not hold the whole payload yet.
# END::       calls the oldest get callback with @values, then resets @values.
# STORED::    calls the oldest set callback with true.
# DELETED::   calls the oldest delete callback with true.
# NOT_FOUND:: calls the oldest delete callback with false.
# Anything else is printed with p.
def process_cmd line
  reply = line.strip

  if reply =~ /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
    key = $1
    bytes = Integer($3)
    # set_binary_mode bytes+2
    # @cur_key = $1
    # Payload (plus its \r\n) not fully received yet: let the caller retry.
    raise ParserError if @buffer.size < bytes + 2

    @values[key] = @buffer.slice!(0, bytes)
    @buffer.slice!(0, 2) # \r\n

  elsif reply == Cend # END
    if (entry = @get_cbs.shift)
      # entry is [keys, callback]; only the callback is needed here.
      entry[1].call(@values)
    end
    @values = {}

  elsif reply == Cstored # STORED
    handler = @set_cbs.shift
    handler.call(true) if handler

  elsif reply == Cdeleted # DELETED
    handler = @del_cbs.shift
    handler.call(true) if handler

  elsif reply == Cunknown # NOT_FOUND
    handler = @del_cbs.shift
    handler.call(false) if handler

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end

#receive_data(data) ⇒ Object

Note (19 Feb 2009): switched to a custom parser because LineText2 is recursive and can cause stack overflows when there is too much data. The module previously used `include EM::P::LineText2`.



151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/em/protocols/memcache.rb', line 151

# Appends incoming data to @buffer and hands every complete line
# (terminated by Cdelimiter) to process_cmd.
#
# When process_cmd raises ParserError (a VALUE line whose payload has not
# fully arrived), the line is put back at the front of the buffer and
# parsing stops until more data comes in.
def receive_data data
  # ''.dup instead of a bare '' literal: the buffer is mutated in place,
  # which a frozen string literal would not allow.
  (@buffer ||= ''.dup) << data

  while (index = @buffer.index(Cdelimiter))
    # Cut the line out including its delimiter.
    line = @buffer.slice!(0, index + Cdelimiter.size)
    begin
      process_cmd line
    rescue ParserError
      @buffer[0...0] = line
      break
    end
  end
end

#set(key, val, exptime = 0, &cb) ⇒ Object

Set the value for a given key

cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }


73
74
75
76
77
78
79
80
81
# File 'lib/em/protocols/memcache.rb', line 73

# Set the value for a given key.
#
#  cache.set :a, 'hello'
#  cache.set(:missing, 'abc'){ puts "stored the value!" }
#
# val is converted with to_s. Its length is announced in bytes when the
# string responds to bytesize, otherwise with size. The last argument given
# to send_cmd is true when no block was passed (presumably the noreply
# flag; send_cmd is defined outside this section).
# When a block is given it is queued and called by process_cmd once the
# server answers STORED.
def set key, val, exptime = 0, &cb
  callback do
    payload = val.to_s
    length = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size

    send_cmd :set, key, 0, exptime, length, cb.nil?
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end

#unbind ⇒ Object

– def receive_binary_data data

@values[@cur_key] = data[0..-3]

end



212
213
214
215
216
217
218
219
220
221
# File 'lib/em/protocols/memcache.rb', line 212

# Connection hook for a closed connection.
#
# If the client was connected, or was already trying to reconnect, a timer
# is set to call reconnect with @host and @port after 1 second, the
# connection flags are switched to "reconnecting", and @deferred_status is
# cleared. Otherwise the connection was never established and a
# RuntimeError is raised.
def unbind
  raise 'Unable to connect to memcached server' unless @connected || @reconnecting

  EM.add_timer(1) { reconnect @host, @port }
  @connected = false
  @reconnecting = true
  @deferred_status = nil
end