Module: EventMachine::Protocols::Redis
- Includes:
- EM::Deferrable
- Defined in:
- lib/em-redis/redis_protocol.rb
Defined Under Namespace
Classes: ParserError, ProtocolError, RedisError
Constant Summary collapse
- OK =
constants
"OK".freeze
- MINUS =
"-".freeze
- PLUS =
"+".freeze
- COLON =
":".freeze
- DOLLAR =
"$".freeze
- ASTERISK =
"*".freeze
- DELIM =
"\r\n".freeze
- BULK_COMMANDS =
{ "set" => true, "setnx" => true, "rpush" => true, "lpush" => true, "lset" => true, "lrem" => true, "sadd" => true, "srem" => true, "sismember" => true, "rpoplpush" => true, "echo" => true, "getset" => true, "smove" => true, "zadd" => true, "zrem" => true, "zscore" => true }
- MULTI_BULK_COMMANDS =
{ "mset" => true, "msetnx" => true, # these aliases aren't in redis gem "multi_get" => true }
- BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
- REPLY_PROCESSOR =
{ "exists" => BOOLEAN_PROCESSOR, "sismember" => BOOLEAN_PROCESSOR, "sadd" => BOOLEAN_PROCESSOR, "srem" => BOOLEAN_PROCESSOR, "smove" => BOOLEAN_PROCESSOR, "zadd" => BOOLEAN_PROCESSOR, "zrem" => BOOLEAN_PROCESSOR, "move" => BOOLEAN_PROCESSOR, "setnx" => BOOLEAN_PROCESSOR, "del" => BOOLEAN_PROCESSOR, "renamenx" => BOOLEAN_PROCESSOR, "expire" => BOOLEAN_PROCESSOR, "select" => BOOLEAN_PROCESSOR, # not in redis gem "keys" => lambda{|r| r.split(" ")}, "info" => lambda{|r| info = {} r.each_line {|kv| k,v = kv.split(":",2).map{|x| x.chomp} info[k.to_sym] = v } info } }
- ALIASES =
{ "flush_db" => "flushdb", "flush_all" => "flushall", "last_save" => "lastsave", "key?" => "exists", "delete" => "del", "randkey" => "randomkey", "list_length" => "llen", "push_tail" => "rpush", "push_head" => "lpush", "pop_tail" => "rpop", "pop_head" => "lpop", "list_set" => "lset", "list_range" => "lrange", "list_trim" => "ltrim", "list_index" => "lindex", "list_rm" => "lrem", "set_add" => "sadd", "set_delete" => "srem", "set_count" => "scard", "set_member?" => "sismember", "set_members" => "smembers", "set_intersect" => "sinter", "set_intersect_store" => "sinterstore", "set_inter_store" => "sinterstore", "set_union" => "sunion", "set_union_store" => "sunionstore", "set_diff" => "sdiff", "set_diff_store" => "sdiffstore", "set_move" => "smove", "set_unless_exists" => "setnx", "rename_unless_exists" => "renamenx", "type?" => "type", "zset_add" => "zadd", "zset_count" => 'zcard', "zset_range_by_score" => 'zrangebyscore', "zset_reverse_range" => 'zrevrange', "zset_range" => 'zrange', "zset_delete" => 'zrem', "zset_score" => 'zscore', # these aliases aren't in redis gem "background_save" => 'bgsave', "async_save" => 'bgsave', "members" => 'smembers', "decrement_by" => "decrby", "decrement" => "decr", "increment_by" => "incrby", "increment" => "incr", "set_if_nil" => "setnx", "multi_get" => "mget", "random_key" => "randomkey", "random" => "randomkey", "rename_if_nil" => "renamenx", "tail_pop" => "rpop", "pop" => "rpop", "head_pop" => "lpop", "shift" => "lpop", "list_remove" => "lrem", "index" => "lindex", "trim" => "ltrim", "list_range" => "lrange", "range" => "lrange", "list_len" => "llen", "len" => "llen", "head_push" => "lpush", "unshift" => "lpush", "tail_push" => "rpush", "push" => "rpush", "add" => "sadd", "set_remove" => "srem", "set_size" => "scard", "member?" => "sismember", "intersect" => "sinter", "intersect_and_store" => "sinterstore", "members" => "smembers", "exists?" => "exists" }
- DISABLED_COMMANDS =
{ "monitor" => true, "sync" => true }
Class Method Summary collapse
Instance Method Summary collapse
- #[](key) ⇒ Object
- #[]=(key, value) ⇒ Object
- #call_command(argv, &blk) ⇒ Object
- #connection_completed ⇒ Object
- #decr(key, decrement = nil, &blk) ⇒ Object
- #dispatch_response(value) ⇒ Object
- #incr(key, increment = nil, &blk) ⇒ Object
- #initialize(host, port = 6379) ⇒ Object
-
#mapped_mget(*keys) ⇒ Object
Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.
- #method_missing(*argv, &blk) ⇒ Object
- #on_error(&blk) ⇒ Object
- #process_cmd(line) ⇒ Object
- #quit(&blk) ⇒ Object
- #raw_call_command(argv, &blk) ⇒ Object
-
#receive_data(data) ⇒ Object
19Feb09 Switched to a custom parser, LineText2 is recursive and can cause stack overflows when there is too much data.
- #set(key, value, expiry = nil) ⇒ Object
- #sort(key, options = {}, &blk) ⇒ Object
-
#type(key, &blk) ⇒ Object
Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing.
- #unbind ⇒ Object
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*argv, &blk) ⇒ Object
217 218 219 |
# File 'lib/em-redis/redis_protocol.rb', line 217 def method_missing(*argv, &blk) call_command(argv, &blk) end |
Class Method Details
.connect(host = 'localhost', port = 6379) ⇒ Object
em hooks
275 276 277 278 |
# File 'lib/em-redis/redis_protocol.rb', line 275 def self.connect(host = 'localhost', port = 6379 ) puts "*** connecting" if $debug EM.connect host, port, self, host, port end |
Instance Method Details
#[](key) ⇒ Object
157 158 159 |
# File 'lib/em-redis/redis_protocol.rb', line 157 def [](key) self.get(key) end |
#[]=(key, value) ⇒ Object
161 162 163 |
# File 'lib/em-redis/redis_protocol.rb', line 161 def []=(key,value) set(key,value) end |
#call_command(argv, &blk) ⇒ Object
221 222 223 |
# File 'lib/em-redis/redis_protocol.rb', line 221 def call_command(argv, &blk) callback { raw_call_command(argv, &blk) } end |
#connection_completed ⇒ Object
285 286 287 288 289 290 291 292 293 294 |
# File 'lib/em-redis/redis_protocol.rb', line 285 def connection_completed puts "*** connection_complete!" if $debug @redis_callbacks = [] @values = [] @multibulk_n = 0 @reconnecting = false @connected = true succeed end |
#decr(key, decrement = nil, &blk) ⇒ Object
186 187 188 |
# File 'lib/em-redis/redis_protocol.rb', line 186 def decr(key, decrement = nil, &blk) call_command(decrement ? ["decrby",key,decrement] : ["decr",key], &blk) end |
#dispatch_response(value) ⇒ Object
383 384 385 386 387 |
# File 'lib/em-redis/redis_protocol.rb', line 383 def dispatch_response(value) processor, blk = @redis_callbacks.shift value = processor.call(value) if processor blk.call(value) if blk end |
#incr(key, increment = nil, &blk) ⇒ Object
182 183 184 |
# File 'lib/em-redis/redis_protocol.rb', line 182 def incr(key, increment = nil, &blk) call_command(increment ? ["incrby",key,increment] : ["incr",key], &blk) end |
#initialize(host, port = 6379) ⇒ Object
280 281 282 283 |
# File 'lib/em-redis/redis_protocol.rb', line 280 def initialize(host, port = 6379 ) puts "*** initializing" if $debug @host, @port = host, port end |
#mapped_mget(*keys) ⇒ Object
Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.
192 193 194 195 196 197 198 199 200 201 |
# File 'lib/em-redis/redis_protocol.rb', line 192 def mapped_mget(*keys) mget(*keys) do |response| result = {} response.each do |value| key = keys.shift result.merge!(key => value) unless value.nil? end yield result if block_given? end end |
#on_error(&blk) ⇒ Object
213 214 215 |
# File 'lib/em-redis/redis_protocol.rb', line 213 def on_error(&blk) @err_cb = blk end |
#process_cmd(line) ⇒ Object
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/em-redis/redis_protocol.rb', line 312 def process_cmd(line) puts "*** processing #{line}" if $debug # first character of buffer will always be the response type reply_type = line[0, 1] reply_args = line.slice(1..-3) # remove type character and \r\n case reply_type #e.g. -MISSING when MINUS @redis_callbacks.shift # throw away the cb? if @err_cb @err_cb.call(reply_args) else err = RedisError.new err.code = reply_args raise err, "Redis server returned error code: #{err.code}" end # e.g. +OK when PLUS dispatch_response(reply_args) # e.g. $3\r\nabc\r\n # 'bulk' is more complex because it could be part of multi-bulk when DOLLAR data_len = Integer(reply_args) if data_len == -1 # expect no data; return nil if @multibulk_n > 0 # we're in the middle of a multibulk reply @values << nil if @values.size == @multibulk_n # DING, we're done dispatch_response(@values) @values = [] @multibulk_n = 0 end else dispatch_response(nil) end elsif @buffer.size >= data_len + 2 # buffer is full of expected data if @multibulk_n > 0 # we're in the middle of a multibulk reply @values << @buffer.slice!(0, data_len) if @values.size == @multibulk_n # DING, we're done dispatch_response(@values) @values = [] @multibulk_n = 0 end else # not multibulk value = @buffer.slice!(0, data_len) dispatch_response(value) end @buffer.slice!(0,2) # tossing \r\n else # buffer isn't full or nil # FYI, ParseError puts command back on head of buffer, waits for # more data complete buffer raise ParserError end #e.g. :8 when COLON dispatch_response(Integer(reply_args)) #e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n when ASTERISK @multibulk_n = Integer(reply_args) dispatch_response(nil) if @multibulk_n == -1 # Whu? else raise ProtocolError, "reply type not recognized: #{line.strip}" end end |
#quit(&blk) ⇒ Object
209 210 211 |
# File 'lib/em-redis/redis_protocol.rb', line 209 def quit(&blk) call_command(['quit'], &blk) end |
#raw_call_command(argv, &blk) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/em-redis/redis_protocol.rb', line 225 def raw_call_command(argv, &blk) argv = argv.dup if MULTI_BULK_COMMANDS[argv.flatten[0].to_s] # TODO improve this code argvp = argv.flatten values = argvp.pop.to_a.flatten argvp = values.unshift(argvp[0]) command = ["*#{argvp.size}"] argvp.each do |v| v = v.to_s command << "$#{get_size(v)}" command << v end command = command.map {|cmd| "#{cmd}\r\n"}.join else command = "" bulk = nil argv[0] = argv[0].to_s.downcase argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] if BULK_COMMANDS[argv[0]] and argv.length > 1 bulk = argv[-1].to_s argv[-1] = get_size(bulk) end command << "#{argv.join(' ')}\r\n" command << "#{bulk}\r\n" if bulk end puts "*** sending: #{command}" if $debug @redis_callbacks << [REPLY_PROCESSOR[argv[0]], blk] send_data command end |
#receive_data(data) ⇒ Object
19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/em-redis/redis_protocol.rb', line 299 def receive_data(data) (@buffer||='') << data while index = @buffer.index(DELIM) begin line = @buffer.slice!(0, index+2) process_cmd line rescue ParserError @buffer[0...0] = line break end end end |
#set(key, value, expiry = nil) ⇒ Object
165 166 167 168 169 170 |
# File 'lib/em-redis/redis_protocol.rb', line 165 def set(key, value, expiry=nil) call_command([:set, key, value]) do |s| expire(key, expiry) if s == OK && expiry yield s if block_given? end end |
#sort(key, options = {}, &blk) ⇒ Object
172 173 174 175 176 177 178 179 180 |
# File 'lib/em-redis/redis_protocol.rb', line 172 def sort(key, ={}, &blk) cmd = ["SORT"] cmd << key cmd << "BY #{[:by]}" if [:by] cmd << "GET #{[[:get]].flatten * ' GET '}" if [:get] cmd << "#{[:order]}" if [:order] cmd << "LIMIT #{[:limit].join(' ')}" if [:limit] call_command(cmd, &blk) end |
#type(key, &blk) ⇒ Object
Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing
205 206 207 |
# File 'lib/em-redis/redis_protocol.rb', line 205 def type(key, &blk) call_command(['type', key], &blk) end |
#unbind ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/em-redis/redis_protocol.rb', line 389 def unbind puts "*** unbinding" if $debug if @connected or @reconnecting EM.add_timer(1){ reconnect @host, @port } @connected = false @reconnecting = true @deferred_status = nil else raise 'Unable to connect to redis server' end end |