Class: Redis::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/client.rb

Direct Known Subclasses

Pipeline

Defined Under Namespace

Classes: ProtocolError

Constant Summary collapse

OK =
"OK".freeze
MINUS =
"-".freeze
PLUS =
"+".freeze
COLON =
":".freeze
DOLLAR =
"$".freeze
ASTERISK =
"*".freeze
BULK_COMMANDS =
{
  "set"       => true,
  "setnx"     => true,
  "rpush"     => true,
  "lpush"     => true,
  "lset"      => true,
  "lrem"      => true,
  "sadd"      => true,
  "srem"      => true,
  "sismember" => true,
  "echo"      => true,
  "getset"    => true,
  "smove"     => true,
  "zadd"      => true,
  "zincrby"   => true,
  "zrem"      => true,
  "zscore"    => true,
  "zrank"     => true,
  "zrevrank"  => true,
  "hget"      => true,
  "hdel"      => true,
  "hexists"   => true,
  "publish"   => true
}
MULTI_BULK_COMMANDS =
{
  "mset"      => true,
  "msetnx"    => true,
  "hset"      => true,
  "hmset"     => true
}
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "hset"      => BOOLEAN_PROCESSOR,
  "hexists"   => BOOLEAN_PROCESSOR,
  "info"      => lambda{|r|
    info = Hash.new do |hash, key|
      if hash.include?(key.to_s)
        Redis.deprecate "Redis#info will return a hash of string keys, not symbols", caller[2]
        hash[key.to_s]
      end
    end

    r.each_line {|kv|
      k,v = kv.split(":",2).map{|x| x.chomp}
      info[k] = v
    }
    info
  },
  "keys"      => lambda{|r|
    if r.is_a?(Array)
        r
    else
        r.split(" ")
    end
  },
  "hgetall"   => lambda{|r|
    Hash[*r]
  }
}
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby"
}
DISABLED_COMMANDS =
{
  "monitor" => true,
  "sync"    => true
}
BLOCKING_COMMANDS =
{
  "blpop" => true,
  "brpop" => true
}

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/redis/client.rb', line 145

def initialize(options = {})
  @host    =  options[:host]    || '127.0.0.1'
  @port    = (options[:port]    || 6379).to_i
  @db      = (options[:db]      || 0).to_i
  @timeout = (options[:timeout] || 5).to_i
  @password = options[:password]
  @logger  =  options[:logger]
  @thread_safe = options[:thread_safe]
  @binary_keys = options[:binary_keys]
  @mutex = Mutex.new if @thread_safe
  @sock = nil
  @pubsub = false

  log(self)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



392
393
394
# File 'lib/redis/client.rb', line 392

def method_missing(*argv)
  call_command(argv)
end

Instance Method Details

#[](key) ⇒ Object



169
170
171
# File 'lib/redis/client.rb', line 169

def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



173
174
175
# File 'lib/redis/client.rb', line 173

def []=(key,value)
  set(key, value)
end

#call_command(argv) ⇒ Object

Wrap raw_call_command to handle reconnection on socket error. We try to reconnect just one time, otherwise let the error araise.



338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/redis/client.rb', line 338

def call_command(argv)
  log(argv.inspect, :debug)

  connect_to_server unless connected?

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    if reconnect
      raw_call_command(argv.dup)
    else
      raise Errno::ECONNRESET
    end
  end
end

#connect_to(host, port) ⇒ Object



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/redis/client.rb', line 358

def connect_to(host, port)

  # We support connect_to() timeout only if system_timer is availabe
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  if @timeout != 0 and Timer
    begin
      @sock = TCPSocket.new(host, port)
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    @sock = TCPSocket.new(host, port)
  end

  @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  set_socket_timeout!(@timeout) if @timeout

rescue Errno::ECONNREFUSED
  raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
end

#connect_to_serverObject



385
386
387
388
389
390
# File 'lib/redis/client.rb', line 385

def connect_to_server
  connect_to(@host, @port)
  call_command([:auth, @password]) if @password
  call_command([:select, @db]) if @db != 0
  @sock
end

#decr(key, decrement = nil) ⇒ Object



254
255
256
257
258
259
260
261
# File 'lib/redis/client.rb', line 254

def decr(key, decrement = nil)
  if decrement
    deprecated("decr with a decrement", :decrby, caller[0])
    decrby(key, decrement)
  else
    call_command([:decr, key])
  end
end

#execObject



281
282
283
284
# File 'lib/redis/client.rb', line 281

def exec
  # Need to override Kernel#exec.
  call_command([:exec])
end

#get(key) ⇒ Object



177
178
179
# File 'lib/redis/client.rb', line 177

def get(key)
  call_command([:get, key])
end

#get_size(string) ⇒ Object



491
492
493
# File 'lib/redis/client.rb', line 491

def get_size(string)
  string.bytesize
end

#incr(key, increment = nil) ⇒ Object



245
246
247
248
249
250
251
252
# File 'lib/redis/client.rb', line 245

def incr(key, increment = nil)
  if increment
    deprecated("incr with an increment", :incrby, caller[0])
    incrby(key, increment)
  else
    call_command([:incr, key])
  end
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



225
226
227
228
229
230
231
232
# File 'lib/redis/client.rb', line 225

def mapped_mget(*keys)
  result = {}
  mget(*keys).each do |value|
    key = keys.shift
    result.merge!(key => value) unless value.nil?
  end
  result
end

#mapped_mset(hash) ⇒ Object



206
207
208
# File 'lib/redis/client.rb', line 206

def mapped_mset(hash)
  mset(*hash.to_a.flatten)
end

#mapped_msetnx(hash) ⇒ Object



219
220
221
# File 'lib/redis/client.rb', line 219

def mapped_msetnx(hash)
  msetnx(*hash.to_a.flatten)
end

#maybe_lock(&block) ⇒ Object



460
461
462
463
464
465
466
# File 'lib/redis/client.rb', line 460

def maybe_lock(&block)
  if @thread_safe
    @mutex.synchronize(&block)
  else
    block.call
  end
end

#mset(*args) ⇒ Object



197
198
199
200
201
202
203
204
# File 'lib/redis/client.rb', line 197

def mset(*args)
  if args.size == 1
    deprecated("mset with a hash", :mapped_mset, caller[0])
    mapped_mset(args[0])
  else
    call_command(args.unshift(:mset))
  end
end

#msetnx(*args) ⇒ Object



210
211
212
213
214
215
216
217
# File 'lib/redis/client.rb', line 210

def msetnx(*args)
  if args.size == 1
    deprecated("msetnx with a hash", :mapped_msetnx, caller[0])
    mapped_msetnx(args[0])
  else
    call_command(args.unshift(:msetnx))
  end
end

#multi(&block) ⇒ Object



286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/redis/client.rb', line 286

def multi(&block)
  result = call_command [:multi]

  return result unless block_given?

  begin
    yield(self)
    exec
  rescue Exception => e
    discard
    raise e
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


274
275
276
277
278
279
# File 'lib/redis/client.rb', line 274

def pipelined(&block)
  Redis.deprecate("Calling pipelined commands on the yielded object will be deprecated in 2.0", caller[0])
  pipeline = Pipeline.new self
  yield pipeline
  pipeline.execute
end

#process_command(command, argvv) ⇒ Object



452
453
454
455
456
457
458
# File 'lib/redis/client.rb', line 452

def process_command(command, argvv)
  @sock.write(command)
  argvv.map do |argv|
    processor = REPLY_PROCESSOR[argv[0].to_s]
    processor ? processor.call(read_reply) : read_reply
  end
end

#quitObject



269
270
271
272
# File 'lib/redis/client.rb', line 269

def quit
  call_command(['quit'])
rescue Errno::ECONNRESET
end

#raw_call_command(argvp) ⇒ Object



396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/redis/client.rb', line 396

def raw_call_command(argvp)
  if argvp[0].is_a?(Array)
    argvv = argvp
    pipeline = true
  else
    argvv = [argvp]
    pipeline = false
  end

  if @binary_keys or pipeline or MULTI_BULK_COMMANDS[argvv[0][0].to_s]
    command = ""
    argvv.each do |argv|
      command << "*#{argv.size}\r\n"
      argv.each{|a|
        a = a.to_s
        command << "$#{get_size(a)}\r\n"
        command << a
        command << "\r\n"
      }
    end
  else
    command = ""
    argvv.each do |argv|
      bulk = nil
      argv[0] = argv[0].to_s
      if ALIASES[argv[0]]
        deprecated(argv[0], ALIASES[argv[0]], caller[4])
        argv[0] = ALIASES[argv[0]]
      end
      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
      if BULK_COMMANDS[argv[0]] and argv.length > 1
        bulk = argv[-1].to_s
        argv[-1] = get_size(bulk)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{bulk}\r\n" if bulk
    end
  end
  # When in Pub/Sub mode we don't read replies synchronously.
  if @pubsub
    @sock.write(command)
    return true
  end
  # The normal command execution is reading and processing the reply.
  results = maybe_lock do
    begin
      set_socket_timeout!(0) if requires_timeout_reset?(argvv[0][0].to_s)
      process_command(command, argvv)
    ensure
      set_socket_timeout!(@timeout) if requires_timeout_reset?(argvv[0][0].to_s)
    end
  end

  return pipeline ? results : results[0]
end

#read_replyObject

Raises:

  • (Errno::ECONNRESET)


468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/redis/client.rb', line 468

def read_reply

  # We read the first byte using read() mainly because gets() is
  # immune to raw socket timeouts.
  begin
    reply_type = @sock.read(1)
  rescue Errno::EAGAIN

    # We want to make sure it reconnects on the next command after the
    # timeout. Otherwise the server may reply in the meantime leaving
    # the protocol in a desync status.
    disconnect

    raise Errno::EAGAIN, "Timeout reading from the socket"
  end

  raise Errno::ECONNRESET, "Connection lost" unless reply_type

  format_reply(reply_type, @sock.gets)
end

#select(*args) ⇒ Object



165
166
167
# File 'lib/redis/client.rb', line 165

def select(*args)
  raise "SELECT not allowed, use the :db option when creating the object"
end

#serverObject



354
355
356
# File 'lib/redis/client.rb', line 354

def server
  "#{@host}:#{@port}"
end

#set(key, value, ttl = nil) ⇒ Object



181
182
183
184
185
186
187
188
# File 'lib/redis/client.rb', line 181

def set(key, value, ttl = nil)
  if ttl
    deprecated("set with an expire", :set_with_expire, caller[0])
    set_with_expire(key, value, ttl)
  else
    call_command([:set, key, value])
  end
end

#set_with_expire(key, value, ttl) ⇒ Object



190
191
192
193
194
195
# File 'lib/redis/client.rb', line 190

def set_with_expire(key, value, ttl)
  multi do
    set(key, value)
    expire(key, ttl)
  end
end

#sort(key, options = {}) ⇒ Object



234
235
236
237
238
239
240
241
242
243
# File 'lib/redis/client.rb', line 234

def sort(key, options = {})
  cmd = []
  cmd << "SORT #{key}"
  cmd << "BY #{options[:by]}" if options[:by]
  cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get]
  cmd << "#{options[:order]}" if options[:order]
  cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit]
  cmd << "STORE #{options[:store]}" if options[:store]
  call_command(cmd)
end

#subscribe(*classes) {|sub| ... } ⇒ Object

Yields:

  • (sub)


300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/redis/client.rb', line 300

def subscribe(*classes)
  # Top-level `subscribe` MUST be called with a block,
  # nested `subscribe` MUST NOT be called with a block
  if !@pubsub && !block_given?
    raise "Top-level subscribe requires a block"
  elsif @pubsub == true && block_given?
    raise "Nested subscribe does not take a block"
  elsif @pubsub
    # If we're already pubsub'ing, just subscribe us to some more classes
    call_command [:subscribe,*classes]
    return true
  end

  @pubsub = true
  call_command [:subscribe,*classes]
  sub = Subscription.new
  yield(sub)
  begin
    while true
      type, *reply = read_reply # type, [class,data]
      case type
      when 'subscribe','unsubscribe'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      when 'message'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      end
      break if type == 'unsubscribe' && reply[1] == 0
    end
  rescue RuntimeError
    call_command [:unsubscribe]
    raise
  ensure
    @pubsub = false
  end
end

#to_sObject



161
162
163
# File 'lib/redis/client.rb', line 161

def to_s
  "Redis Client connected to #{server} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



265
266
267
# File 'lib/redis/client.rb', line 265

def type(key)
  call_command(['type', key])
end