Module: EventMachine::Protocols::Redis

Includes:
EM::Deferrable
Defined in:
lib/em-redis/redis_protocol.rb

Defined Under Namespace

Classes: ParserError, ProtocolError, RedisError

Constant Summary collapse

OK =

constants

"OK".freeze
MINUS =
"-".freeze
PLUS =
"+".freeze
COLON =
":".freeze
DOLLAR =
"$".freeze
ASTERISK =
"*".freeze
DELIM =
"\r\n".freeze
BULK_COMMANDS =
{
  "set"       => true,
  "setnx"     => true,
  "rpush"     => true,
  "lpush"     => true,
  "lset"      => true,
  "lrem"      => true,
  "sadd"      => true,
  "srem"      => true,
  "sismember" => true,
  "rpoplpush" => true,
  "echo"      => true,
  "getset"    => true,
  "smove"     => true,
  "zadd"      => true,
  "zrem"      => true,
  "zscore"    => true
}
MULTI_BULK_COMMANDS =
{
  "mset"      => true,
  "msetnx"    => true,
  # these aliases aren't in redis gem
  "multi_get" => true
}
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "select"    => BOOLEAN_PROCESSOR, # not in redis gem
  "keys"      => lambda{|r| r.split(" ")},
  "info"      => lambda{|r|
    info = {}
    r.each_line {|kv|
      k,v = kv.split(":",2).map{|x| x.chomp}
      info[k.to_sym] = v
    }
    info
  }
}
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => 'zcard',
  "zset_range_by_score"  => 'zrangebyscore',
  "zset_reverse_range"   => 'zrevrange',
  "zset_range"           => 'zrange',
  "zset_delete"          => 'zrem',
  "zset_score"           => 'zscore',
  # these aliases aren't in redis gem
  "background_save"      => 'bgsave',
  "async_save"           => 'bgsave',
  "members"              => 'smembers',
  "decrement_by"         => "decrby",
  "decrement"            => "decr",
  "increment_by"         => "incrby",
  "increment"            => "incr",
  "set_if_nil"           => "setnx",
  "multi_get"            => "mget",
  "random_key"           => "randomkey",
  "random"               => "randomkey",
  "rename_if_nil"        => "renamenx",
  "tail_pop"             => "rpop",
  "pop"                  => "rpop",
  "head_pop"             => "lpop",
  "shift"                => "lpop",
  "list_remove"          => "lrem",
  "index"                => "lindex",
  "trim"                 => "ltrim",
  "list_range"           => "lrange",
  "range"                => "lrange",
  "list_len"             => "llen",
  "len"                  => "llen",
  "head_push"            => "lpush",
  "unshift"              => "lpush",
  "tail_push"            => "rpush",
  "push"                 => "rpush",
  "add"                  => "sadd",
  "set_remove"           => "srem",
  "set_size"             => "scard",
  "member?"              => "sismember",
  "intersect"            => "sinter",
  "intersect_and_store"  => "sinterstore",
  "members"              => "smembers",
  "exists?"              => "exists"
}
DISABLED_COMMANDS =
{
  "monitor" => true,
  "sync"    => true
}

Class Method Summary collapse

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv, &blk) ⇒ Object



217
218
219
# File 'lib/em-redis/redis_protocol.rb', line 217

def method_missing(*argv, &blk)
  call_command(argv, &blk)
end

Class Method Details

.connect(host = 'localhost', port = 6379) ⇒ Object

em hooks



275
276
277
278
# File 'lib/em-redis/redis_protocol.rb', line 275

def self.connect(host = 'localhost', port = 6379 )
  puts "*** connecting" if $debug
  EM.connect host, port, self, host, port
end

Instance Method Details

#[](key) ⇒ Object



157
158
159
# File 'lib/em-redis/redis_protocol.rb', line 157

def [](key)
  self.get(key)
end

#[]=(key, value) ⇒ Object



161
162
163
# File 'lib/em-redis/redis_protocol.rb', line 161

def []=(key,value)
  set(key,value)
end

#call_command(argv, &blk) ⇒ Object



221
222
223
# File 'lib/em-redis/redis_protocol.rb', line 221

def call_command(argv, &blk)
  callback { raw_call_command(argv, &blk) }
end

#connection_completedObject



285
286
287
288
289
290
291
292
293
294
# File 'lib/em-redis/redis_protocol.rb', line 285

def connection_completed
  puts "*** connection_complete!" if $debug
  @redis_callbacks = []
  @values = []
  @multibulk_n = 0

  @reconnecting = false
  @connected = true
  succeed
end

#decr(key, decrement = nil, &blk) ⇒ Object



186
187
188
# File 'lib/em-redis/redis_protocol.rb', line 186

def decr(key, decrement = nil, &blk)
  call_command(decrement ? ["decrby",key,decrement] : ["decr",key], &blk)
end

#dispatch_response(value) ⇒ Object



383
384
385
386
387
# File 'lib/em-redis/redis_protocol.rb', line 383

def dispatch_response(value)
  processor, blk = @redis_callbacks.shift
  value = processor.call(value) if processor
  blk.call(value) if blk
end

#incr(key, increment = nil, &blk) ⇒ Object



182
183
184
# File 'lib/em-redis/redis_protocol.rb', line 182

def incr(key, increment = nil, &blk)
  call_command(increment ? ["incrby",key,increment] : ["incr",key], &blk)
end

#initialize(host, port = 6379) ⇒ Object



280
281
282
283
# File 'lib/em-redis/redis_protocol.rb', line 280

def initialize(host, port = 6379 )
  puts "*** initializing" if $debug
  @host, @port = host, port
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



192
193
194
195
196
197
198
199
200
201
# File 'lib/em-redis/redis_protocol.rb', line 192

def mapped_mget(*keys)
  mget(*keys) do |response|
    result = {}
    response.each do |value|
      key = keys.shift
      result.merge!(key => value) unless value.nil?
    end
    yield result if block_given?
  end
end

#on_error(&blk) ⇒ Object



213
214
215
# File 'lib/em-redis/redis_protocol.rb', line 213

def on_error(&blk)
  @err_cb = blk
end

#process_cmd(line) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/em-redis/redis_protocol.rb', line 312

def process_cmd(line)
  puts "*** processing #{line}" if $debug
  # first character of buffer will always be the response type
  reply_type = line[0, 1]
  reply_args = line.slice(1..-3) # remove type character and \r\n
  case reply_type

  #e.g. -MISSING
  when MINUS
    @redis_callbacks.shift # throw away the cb?
    if @err_cb
      @err_cb.call(reply_args)
    else
      err = RedisError.new
      err.code = reply_args
      raise err, "Redis server returned error code: #{err.code}"
    end

  # e.g. +OK
  when PLUS
    dispatch_response(reply_args)

  # e.g. $3\r\nabc\r\n
  # 'bulk' is more complex because it could be part of multi-bulk
  when DOLLAR
    data_len = Integer(reply_args)
    if data_len == -1 # expect no data; return nil
      if @multibulk_n > 0 # we're in the middle of a multibulk reply
        @values << nil
        if @values.size == @multibulk_n # DING, we're done
          dispatch_response(@values)
          @values = []
          @multibulk_n = 0
        end
      else
        dispatch_response(nil)
      end
    elsif @buffer.size >= data_len + 2 # buffer is full of expected data
      if @multibulk_n > 0 # we're in the middle of a multibulk reply
        @values << @buffer.slice!(0, data_len)
        if @values.size == @multibulk_n # DING, we're done
          dispatch_response(@values)
          @values = []
          @multibulk_n = 0
        end
      else # not multibulk
        value = @buffer.slice!(0, data_len)
        dispatch_response(value)
      end
      @buffer.slice!(0,2) # tossing \r\n
    else # buffer isn't full or nil
      # FYI, ParseError puts command back on head of buffer, waits for
      # more data complete buffer
      raise ParserError 
    end

  #e.g. :8
  when COLON
    dispatch_response(Integer(reply_args))

  #e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n 
  when ASTERISK
    @multibulk_n = Integer(reply_args)
    dispatch_response(nil) if @multibulk_n == -1

  # Whu?
  else
    raise ProtocolError, "reply type not recognized: #{line.strip}"
  end
end

#quit(&blk) ⇒ Object



209
210
211
# File 'lib/em-redis/redis_protocol.rb', line 209

def quit(&blk)
  call_command(['quit'], &blk)
end

#raw_call_command(argv, &blk) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/em-redis/redis_protocol.rb', line 225

def raw_call_command(argv, &blk)
  argv = argv.dup

  if MULTI_BULK_COMMANDS[argv.flatten[0].to_s]
    # TODO improve this code
    argvp   = argv.flatten
    values  = argvp.pop.to_a.flatten
    argvp   = values.unshift(argvp[0])
    command = ["*#{argvp.size}"]
    argvp.each do |v|
      v = v.to_s
      command << "$#{get_size(v)}"
      command << v
    end
    command = command.map {|cmd| "#{cmd}\r\n"}.join
  else
    command = ""
    bulk = nil
    argv[0] = argv[0].to_s.downcase
    argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]]
    raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
    if BULK_COMMANDS[argv[0]] and argv.length > 1
      bulk = argv[-1].to_s
      argv[-1] = get_size(bulk)
    end
    command << "#{argv.join(' ')}\r\n"
    command << "#{bulk}\r\n" if bulk
  end

  puts "*** sending: #{command}" if $debug
  @redis_callbacks << [REPLY_PROCESSOR[argv[0]], blk]
  send_data command
end

#receive_data(data) ⇒ Object

19Feb09 Switched to a custom parser, LineText2 is recursive and can cause

stack overflows when there is too much data.

include EM::P::LineText2



299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/em-redis/redis_protocol.rb', line 299

def receive_data(data)
  (@buffer||='') << data
  while index = @buffer.index(DELIM)
    begin
      line = @buffer.slice!(0, index+2)
      process_cmd line
    rescue ParserError
      @buffer[0...0] = line
      break
    end
  end
end

#set(key, value, expiry = nil) ⇒ Object



165
166
167
168
169
170
# File 'lib/em-redis/redis_protocol.rb', line 165

def set(key, value, expiry=nil)
  call_command([:set, key, value]) do |s|
    expire(key, expiry) if s == OK && expiry
    yield s if block_given?
  end
end

#sort(key, options = {}, &blk) ⇒ Object



172
173
174
175
176
177
178
179
180
# File 'lib/em-redis/redis_protocol.rb', line 172

def sort(key, options={}, &blk)
  cmd = ["SORT"]
  cmd << key
  cmd << "BY #{options[:by]}" if options[:by]
  cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get]
  cmd << "#{options[:order]}" if options[:order]
  cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit]
  call_command(cmd, &blk)
end

#type(key, &blk) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



205
206
207
# File 'lib/em-redis/redis_protocol.rb', line 205

def type(key, &blk)
  call_command(['type', key], &blk)
end

#unbindObject



389
390
391
392
393
394
395
396
397
398
399
# File 'lib/em-redis/redis_protocol.rb', line 389

def unbind
  puts "*** unbinding" if $debug
  if @connected or @reconnecting
    EM.add_timer(1){ reconnect @host, @port }
    @connected = false
    @reconnecting = true
    @deferred_status = nil
  else
    raise 'Unable to connect to redis server'
  end
end