Module: EventMachine::Protocols::Redis

Includes:
EM::Deferrable
Defined in:
lib/em-redis/redis_protocol.rb

Defined Under Namespace

Classes: ConnectionError, DisabledCommand, ParserError, ProtocolError, RedisError

Constant Summary collapse

OK =

constants

"OK".freeze
# Type prefix of an error reply, e.g. "-ERR".
MINUS =
"-".freeze
# Type prefix of a status reply, e.g. "+OK".
PLUS =
"+".freeze
# Type prefix of an integer reply, e.g. ":8".
COLON =
":".freeze
# Type prefix of a bulk reply, e.g. "$3\r\nabc\r\n".
DOLLAR =
"$".freeze
# Type prefix of a multi-bulk reply, e.g. "*2\r\n$1\r\na\r\n$1\r\nb\r\n".
ASTERISK =
"*".freeze
# Line terminator appended to every command part and used to split replies.
DELIM =
"\r\n".freeze
# Maps a reply to true when its string form is "1" or "OK", false otherwise.
BOOLEAN_PROCESSOR =
lambda { |reply| ["1", "OK"].include?(reply.to_s) }
# Reply post-processors keyed by command name. Commands without an entry
# have their reply passed through unchanged.
REPLY_PROCESSOR =
Hash[
  %w(
    exists sismember sadd srem smove zadd zrem move setnx del
    renamenx expire select hexists hset hdel hsetnx
  ).map { |command| [command, BOOLEAN_PROCESSOR] }
].merge(
  # Folds the flat [field, value, field, value, ...] reply into a Hash.
  "hgetall" => lambda { |reply| Hash[*reply] },
  # Splits each non-empty line at the first ":" into a Symbol key and a
  # String value (nil when the line contains no ":").
  "info" => lambda { |reply|
    reply.each_line.inject({}) do |stats, raw|
      entry = raw.chomp
      next stats if entry.empty?
      field, setting = entry.split(":", 2)
      stats[field.to_sym] = setting
      stats
    end
  }
)
# Friendly method names mapped to the Redis command actually sent.
# Every key appears exactly once: a repeated key in a Hash literal is
# silently overwritten and makes Ruby emit a "key is duplicated" warning.
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby",
  "background_save"      => "bgsave",
  "async_save"           => "bgsave",
  "members"              => "smembers",
  "decrement_by"         => "decrby",
  "decrement"            => "decr",
  "increment_by"         => "incrby",
  "increment"            => "incr",
  "set_if_nil"           => "setnx",
  "multi_get"            => "mget",
  "random_key"           => "randomkey",
  "random"               => "randomkey",
  "rename_if_nil"        => "renamenx",
  "tail_pop"             => "rpop",
  "pop"                  => "rpop",
  "head_pop"             => "lpop",
  "shift"                => "lpop",
  "list_remove"          => "lrem",
  "index"                => "lindex",
  "trim"                 => "ltrim",
  "range"                => "lrange",
  "list_len"             => "llen",
  "len"                  => "llen",
  "head_push"            => "lpush",
  "unshift"              => "lpush",
  "tail_push"            => "rpush",
  "push"                 => "rpush",
  "add"                  => "sadd",
  "set_remove"           => "srem",
  "set_size"             => "scard",
  "member?"              => "sismember",
  "intersect"            => "sinter",
  "intersect_and_store"  => "sinterstore",
  "exists?"              => "exists"
}
# Commands that send_command reports as a DisabledCommand error.
DISABLED_COMMANDS =
{ "monitor" => true, "sync" => true }

Class Method Summary collapse

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv, &blk) ⇒ Object



223
224
225
# File 'lib/em-redis/redis_protocol.rb', line 223

# Routes any method not defined on this object to Redis: the method name and
# its arguments arrive together in argv and are handed to call_command.
def method_missing(*argv, &blk)
  call_command(argv, &blk)
end

Class Method Details

.connect(*args) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/em-redis/redis_protocol.rb', line 317

# Opens an EventMachine connection to a Redis server, passing the receiver
# as the connection handler and the resolved options to it.
#
# Accepted forms:
#   connect                    - 127.0.0.1:6379
#   connect(:host => h, ...)   - options Hash (copied, never mutated)
#   connect("redis://...")     - URL String, decoded by parse_url
#   connect(host, port)        - positional host and port
#
# A missing :host defaults to '127.0.0.1'; a missing :port defaults to 6379
# and the port is always coerced with to_i.
#
# @raise [ArgumentError] for an unsupported argument type or argument count
def connect(*args)
  # Argument problems are raised directly: the instance-level #error helper
  # dispatches through @error_callback, which only exists on a connection.
  options =
    case args.length
    when 0
      {}
    when 1
      arg = args.first
      case arg
      when Hash then arg.dup # keep defaults out of the caller's Hash
      when String then parse_url(arg)
      else raise ArgumentError, 'first argument must be Hash or String'
      end
    when 2
      # args is zero-based: host is args[0], port is args[1]
      {:host => args[0], :port => args[1]}
    else
      raise ArgumentError, "wrong number of arguments (#{args.length} for 0..2)"
    end
  options[:host] ||= '127.0.0.1'
  options[:port]   = (options[:port] || 6379).to_i
  EM.connect options[:host], options[:port], self, options
end

.parse_url(url) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/em-redis/redis_protocol.rb', line 304

# Extracts connection options from a Redis URL.
#
# @param url [String] e.g. "redis://:secret@localhost:6380"
# @return [Hash] :host, :port and :password as reported by URI; any of them
#   may be nil when the URL does not carry that component
# @raise [ArgumentError] when the string cannot be parsed as a URI
def parse_url(url)
  uri = URI.parse(url)
  {
    :host => uri.host,
    :port => uri.port,
    :password => uri.password
  }
rescue URI::InvalidURIError
  # Raised directly: the instance-level #error helper dispatches through
  # @error_callback, which does not exist at class level.
  raise ArgumentError, 'invalid redis url'
end

Instance Method Details

#[]=(key, value) ⇒ Object



140
141
142
# File 'lib/em-redis/redis_protocol.rb', line 140

# Hash-style assignment; delegates to set(key, value) without an expiry.
def []=(key,value)
  set(key,value)
end

#after_reconnect(&blk) ⇒ Object



219
220
221
# File 'lib/em-redis/redis_protocol.rb', line 219

# Registers the block that connection_completed calls when a reconnect
# has succeeded. Replaces any previously registered block.
def after_reconnect(&blk)
  @reconnect_callbacks.store(:after, blk)
end

#auth(password, &blk) ⇒ Object



176
177
178
179
# File 'lib/em-redis/redis_protocol.rb', line 176

# Remembers the password on the connection and sends AUTH with it.
def auth(password, &blk)
  @password = password
  argv = ['auth', password]
  call_command(argv, &blk)
end

#before_reconnect(&blk) ⇒ Object



215
216
217
# File 'lib/em-redis/redis_protocol.rb', line 215

# Registers the block that unbind calls once, before the first reconnect
# attempt is scheduled. Replaces any previously registered block.
def before_reconnect(&blk)
  @reconnect_callbacks.store(:before, blk)
end

#call_command(argv, &blk) ⇒ Object



227
228
229
# File 'lib/em-redis/redis_protocol.rb', line 227

# Registers a deferrable callback that sends a single command (argv is the
# command name followed by its arguments) via raw_call_command.
def call_command(argv, &blk)
  callback do
    raw_call_command(argv, &blk)
  end
end

#call_commands(argvs, &blk) ⇒ Object



238
239
240
# File 'lib/em-redis/redis_protocol.rb', line 238

# Registers a deferrable callback that sends several commands as one
# pipeline via raw_call_commands.
def call_commands(argvs, &blk)
  callback do
    raw_call_commands(argvs, &blk)
  end
end

#close ⇒ Object



481
482
483
484
# File 'lib/em-redis/redis_protocol.rb', line 481

# Flags the connection as closing on purpose (unbind then neither reconnects
# nor reports an error) and closes it with close_connection_after_writing.
def close
  @closing = true
  close_connection_after_writing
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


477
478
479
# File 'lib/em-redis/redis_protocol.rb', line 477

# Returns the current value of @connected, or false when it is nil or unset.
def connected?
  @connected ? @connected : false
end

#connection_completed ⇒ Object



367
368
369
370
371
372
373
374
375
376
# File 'lib/em-redis/redis_protocol.rb', line 367

# Prepares the object for use after a connection (or reconnection) has been
# established: runs the :after reconnect callback when reconnecting, resets
# all per-connection protocol state, calls auth_and_select_db and finally
# marks the deferrable as succeeded.
def connection_completed
  @logger.debug { "Connected to #{@host}:#{@port}" } if @logger
  @reconnect_callbacks[:after].call if @reconnecting
  @redis_callbacks = []
  @multibulk_n     = false
  # Bytes and pipelined values left over from a previous connection belong
  # to replies that will never be completed; drop them so they cannot be
  # mixed into replies arriving on the new connection.
  @buffer          = ''
  @values          = []
  @reconnecting    = false
  @connected       = true
  auth_and_select_db
  succeed
end

#decr(key, decrement = nil, &blk) ⇒ Object



167
168
169
# File 'lib/em-redis/redis_protocol.rb', line 167

# Decrements key: sends DECRBY with the given amount when decrement is
# truthy, plain DECR otherwise.
def decr(key, decrement = nil, &blk)
  argv =
    if decrement
      ["decrby", key, decrement]
    else
      ["decr", key]
    end
  call_command(argv, &blk)
end

#dispatch_error(code) ⇒ Object



436
437
438
439
# File 'lib/em-redis/redis_protocol.rb', line 436

# Handles an error reply: discards the callback queued for the failed
# command and reports the reply text as a RedisError through #error.
def dispatch_error(code)
  @redis_callbacks.shift
  error RedisError, code
end

#dispatch_response(value) ⇒ Object



441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/em-redis/redis_protocol.rb', line 441

# Delivers one parsed reply value to the next queued callback.
#
# While a multi-bulk reply is being collected the value is appended to
# @multibulk_values and nothing is delivered until the last element has
# arrived; the collected Array is then delivered as the value.
#
# Queue entries have two shapes:
#   [processor, blk]         - single command: blk receives the
#                              (optionally processed) value
#   [processor, count, blk]  - pipeline: values accumulate in @values and
#                              blk receives them all after count replies
def dispatch_response(value)
  if @multibulk_n
    @multibulk_values << value
    @multibulk_n -= 1
    return unless @multibulk_n == 0

    value = @multibulk_values
    @multibulk_n = false
  end

  entry = @redis_callbacks.shift
  return unless entry.kind_of?(Array)

  case entry.length
  when 2
    processor, blk = entry
    value = processor.call(value) if processor
    blk.call(value) if blk
  when 3
    processor, remaining, blk = entry
    value = processor.call(value) if processor
    @values << value
    if remaining > 1
      # More replies outstanding: put the entry back with a lower count.
      @redis_callbacks.unshift [processor, remaining - 1, blk]
    else
      blk.call(@values) if blk
      @values = []
    end
  end
end

#errback(&blk) ⇒ Object Also known as: on_error



204
205
206
# File 'lib/em-redis/redis_protocol.rb', line 204

# Registers blk as the handler that #error invokes with the error object,
# replacing the previously installed handler.
def errback(&blk)
  @error_callback = blk
end

#error(klass, msg) ⇒ Object



209
210
211
212
213
# File 'lib/em-redis/redis_protocol.rb', line 209

# Builds an error of the given class and hands it to the registered error
# callback.
#
# @param klass [Class] exception class to instantiate with msg
# @param msg [String] error message; also stored as the error's code when
#   the error object has a code= writer
# @return whatever the error callback returns
def error(klass, msg)
  err = klass.new(msg)
  # Check for the writer that is actually called; an error class exposing
  # only a reader named code must not trigger a NoMethodError here.
  err.code = msg if err.respond_to?(:code=)
  @error_callback.call(err)
end

#incr(key, increment = nil, &blk) ⇒ Object



163
164
165
# File 'lib/em-redis/redis_protocol.rb', line 163

# Increments key: sends INCRBY with the given amount when increment is
# truthy, plain INCR otherwise.
def incr(key, increment = nil, &blk)
  argv =
    if increment
      ["incrby", key, increment]
    else
      ["incr", key]
    end
  call_command(argv, &blk)
end

#initialize(options = {}) ⇒ Object



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/em-redis/redis_protocol.rb', line 339

# Stores connection settings and installs default callbacks.
#
# Recognised options: :host, :port, :db (default 0, coerced with to_i),
# :password, :auto_reconnect (default true), :reconnect_on_error
# (default false) and :logger.
#
# The default error callback re-raises the error it is given; the default
# reconnect callbacks do nothing.
def initialize(options = {})
  @host, @port, @password, @logger =
    options.values_at(:host, :port, :password, :logger)
  @db                 = (options[:db] || 0).to_i
  @auto_reconnect     = options.fetch(:auto_reconnect, true)
  @reconnect_on_error = options.fetch(:reconnect_on_error, false)
  @error_callback     = lambda { |err| raise err }
  @reconnect_callbacks = {:before => lambda {}, :after => lambda {}}
  @values = []
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



183
184
185
186
187
188
189
190
191
192
# File 'lib/em-redis/redis_protocol.rb', line 183

# Issues MGET for the given keys and yields a Hash that pairs each key with
# its value by position. Entries whose value is nil are left out.
def mapped_mget(*keys)
  mget(*keys) do |response|
    result = {}
    response.zip(keys) do |value, key|
      result[key] = value unless value.nil?
    end
    yield result if block_given?
  end
end

#process_cmd(line) ⇒ Object



394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
# File 'lib/em-redis/redis_protocol.rb', line 394

# Interprets one reply line (including its trailing \r\n) and dispatches it
# according to its first character.
#
# Raises ParserError when a bulk reply announces more data than @buffer
# currently holds.
def process_cmd(line)
  @logger.debug { "*** processing #{line}" } if @logger
  marker  = line[0, 1]        # reply type is always the first character
  payload = line.slice(1..-3) # everything between the marker and \r\n

  if marker == MINUS
    # e.g. -ERR : server-side error
    dispatch_error(payload)
  elsif marker == PLUS
    # e.g. +OK
    dispatch_response(payload)
  elsif marker == DOLLAR
    # e.g. $3\r\nabc\r\n : the data follows in @buffer; may be one element
    # of a multi-bulk reply
    size = Integer(payload)
    if size == -1
      dispatch_response(nil) # no data follows; the value is nil
    else
      # data plus its trailing \r\n must be fully buffered
      raise ParserError if @buffer.size < size + 2
      dispatch_response(@buffer.slice!(0, size))
      @buffer.slice!(0, 2) # discard the \r\n after the data
    end
  elsif marker == COLON
    # e.g. :8
    dispatch_response(Integer(payload))
  elsif marker == ASTERISK
    # e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n
    count = Integer(payload)
    if [-1, 0].include?(count)
      dispatch_response([])
    else
      start_multibulk(count)
    end
  else
    error ProtocolError, "reply type not recognized: #{line.strip}"
  end
end

#quit(&blk) ⇒ Object



200
201
202
# File 'lib/em-redis/redis_protocol.rb', line 200

# Sends the QUIT command; blk, if given, is passed on to call_command.
def quit(&blk)
  call_command(['quit'], &blk)
end

#raw_call_command(argv, &blk) ⇒ Object



231
232
233
234
235
236
# File 'lib/em-redis/redis_protocol.rb', line 231

# Sends one command immediately and queues its reply callback.
#
# The command name (argv[0]) is normalised in place to a lower-case String.
# The reply processor is looked up under the canonical command name, so an
# aliased call (e.g. "key?") is post-processed exactly like the command it
# maps to ("exists").
def raw_call_command(argv, &blk)
  argv[0] = argv[0].to_s unless argv[0].kind_of? String
  argv[0] = argv[0].downcase
  send_command(argv)
  # send_command resolves aliases on its own copy of argv, so argv[0] is
  # still the name the caller used; resolve it here for the lookup.
  canonical = ALIASES[argv[0]] || argv[0]
  @redis_callbacks << [REPLY_PROCESSOR[canonical], blk]
end

#raw_call_commands(argvs, &blk) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/em-redis/redis_protocol.rb', line 242

# Sends several commands back to back and queues one callback that receives
# all of their replies as an Array.
#
# An empty argvs sends nothing; the block, when given, is called right away
# with an empty Array. Command names are normalised in place to lower-case
# Strings, as raw_call_command does, so alias and disabled-command lookups
# in send_command see the same spelling on both paths.
def raw_call_commands(argvs, &blk)
  if argvs.empty?  # Shortcut
    blk.call([]) if blk
    return
  end

  argvs.each do |argv|
    argv[0] = argv[0].to_s.downcase
    send_command argv
  end
  # FIXME: argvs may contain heterogenous commands, storing all
  # REPLY_PROCESSORs may turn out expensive and has been omitted
  # for now.
  @redis_callbacks << [nil, argvs.length, blk]
end

#receive_data(data) ⇒ Object

19Feb09: Switched to a custom parser because LineText2 is recursive and can cause stack overflows when there is too much data. The former `include EM::P::LineText2` is therefore no longer used.



381
382
383
384
385
386
387
388
389
390
391
392
# File 'lib/em-redis/redis_protocol.rb', line 381

# Appends incoming bytes to @buffer and processes every complete
# \r\n-terminated line found in it.
#
# When process_cmd raises ParserError, the line is put back at the front of
# the buffer and processing stops until more data arrives.
def receive_data(data)
  @buffer ||= ''
  @buffer << data
  until (boundary = @buffer.index(DELIM)).nil?
    line = @buffer.slice!(0, boundary + 2) # the line plus its \r\n
    begin
      process_cmd line
    rescue ParserError
      @buffer.insert(0, line)
      break
    end
  end
end

#select(db, &blk) ⇒ Object



171
172
173
174
# File 'lib/em-redis/redis_protocol.rb', line 171

# Remembers the database index (coerced with to_i) on the connection and
# sends SELECT for it.
def select(db, &blk)
  index = db.to_i
  @db = index
  call_command(['select', index], &blk)
end

#send_command(argv) ⇒ Object



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/em-redis/redis_protocol.rb', line 258

# Encodes a command as a multi-bulk request and writes it with send_data.
#
# Works on a copy of argv. Reports a DisabledCommand error for names listed
# in DISABLED_COMMANDS, replaces aliased names with the real command, and
# expands a trailing Hash argument into alternating keys and values.
def send_command(argv)
  parts = argv.dup
  name  = parts[0]

  error DisabledCommand, "#{name} command is disabled" if DISABLED_COMMANDS[name]
  canonical = ALIASES[name]
  parts[0] = canonical if canonical

  if parts[-1].is_a?(Hash)
    parts[-1] = parts[-1].to_a
    parts.flatten!
  end

  # "*<count>" header, then "$<size>" and the value for each part.
  lines = ["*#{parts.size}"]
  parts.each do |part|
    text = part.to_s
    lines.push("$#{get_size(text)}", text)
  end
  command = lines.map { |piece| piece + DELIM }.join

  @logger.debug { "*** sending: #{command}" } if @logger
  send_data command
end

#set(key, value, expiry = nil) ⇒ Object



144
145
146
147
148
149
# File 'lib/em-redis/redis_protocol.rb', line 144

# Sends SET for key/value and yields the reply when a block is given.
# When expiry is truthy, an expire(key, expiry) call is issued afterwards.
def set(key, value, expiry = nil)
  call_command(["set", key, value]) { |status| yield status if block_given? }
  return unless expiry

  expire(key, expiry)
end

#sort(key, options = {}, &blk) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/em-redis/redis_protocol.rb', line 151

# Builds and sends a SORT command.
#
# Options read: :by, :get (one pattern or an Array of patterns), :order
# (a space-separated String), :limit and :store. The assembled argument
# list is flattened before it is sent.
def sort(key, options={}, &blk)
  cmd = ["sort", key]
  cmd.push("by", options[:by]) if options[:by]
  Array(options[:get]).each { |pattern| cmd.push("get", pattern) }
  cmd.concat(options[:order].split(" ")) if options[:order]
  cmd.push("limit", options[:limit]) if options[:limit]
  cmd.push("store", options[:store]) if options[:store]
  call_command(cmd.flatten, &blk)
end

#start_multibulk(multibulk_count) ⇒ Object



472
473
474
475
# File 'lib/em-redis/redis_protocol.rb', line 472

# Begins collecting a multi-bulk reply: dispatch_response gathers the next
# multibulk_count values in @multibulk_values before delivering them.
def start_multibulk(multibulk_count)
  @multibulk_n = multibulk_count
  @multibulk_values = []
end

#type(key, &blk) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



196
197
198
# File 'lib/em-redis/redis_protocol.rb', line 196

# Sends the TYPE command for key. Defined explicitly because a call to
# type would otherwise never reach method_missing.
def type(key, &blk)
  call_command(['type', key], &blk)
end

#unbind ⇒ Object



486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
# File 'lib/em-redis/redis_protocol.rb', line 486

# Handles loss of the connection.
#
# * After #close: only clears the reconnecting flag.
# * When reconnection applies (the connection was up or already
#   reconnecting and :auto_reconnect is set, or :reconnect_on_error is set):
#   runs the :before callback once and schedules a reconnect in one second.
# * Otherwise reports a ConnectionError through #error.
#
# The connection is always marked as disconnected and the deferred status
# cleared, even when the error callback raises (the default one does).
def unbind
  @logger.debug { "Disconnected" } if @logger
  if @closing
    @reconnecting = false
  elsif ((@connected || @reconnecting) && @auto_reconnect) || @reconnect_on_error
    @reconnect_callbacks[:before].call unless @reconnecting
    @reconnecting = true
    EM.add_timer(1) do
      @logger.debug { "Reconnecting to #{@host}:#{@port}" } if @logger
      reconnect @host, @port
    end
  elsif @connected
    error ConnectionError, 'connection closed'
  else
    error ConnectionError, 'unable to connect to redis server'
  end
  nil
ensure
  # Runs on every path, including when #error raised above, so the object
  # never keeps reporting connected? == true for a dead connection.
  @connected = false
  # NOTE(review): presumably returns the deferrable to its pending state so
  # commands queue until the next connection_completed - confirm against
  # EM::Deferrable.
  @deferred_status = nil
end