Class: Redis::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/client.rb

Direct Known Subclasses

Pipeline

Defined Under Namespace

Classes: ProtocolError

Constant Summary collapse

# Frozen "OK" status string and single-character markers.
# NOTE(review): none of these constants is referenced by the methods shown on
# this page; presumably they are the reply-type prefixes matched while parsing
# server replies (format_reply) — confirm in lib/redis/client.rb.
OK =
"OK".freeze
MINUS =
"-".freeze
PLUS =
"+".freeze
COLON =
":".freeze
DOLLAR =
"$".freeze
ASTERISK =
"*".freeze
# Commands whose last argument is sent as a bulk payload on the inline
# encoding path of raw_call_command: the argument is replaced on the command
# line by its byte size and the value itself follows on its own line.
BULK_COMMANDS =
{
  "set"       => true,
  "setnx"     => true,
  "rpush"     => true,
  "lpush"     => true,
  "lset"      => true,
  "lrem"      => true,
  "sadd"      => true,
  "srem"      => true,
  "sismember" => true,
  "echo"      => true,
  "getset"    => true,
  "smove"     => true,
  "zadd"      => true,
  "zincrby"   => true,
  "zrem"      => true,
  "zscore"    => true,
  "zrank"     => true,
  "zrevrank"  => true,
  "hget"      => true,
  "hdel"      => true,
  "hexists"   => true,
  "publish"   => true
}
# Commands that raw_call_command always encodes in the multi-bulk format
# ("*<argc>" followed by "$<size>" / value for every argument), even when
# neither a pipeline nor the :binary_keys option is in effect.
MULTI_BULK_COMMANDS =
{
  "mset"      => true,
  "msetnx"    => true,
  "hset"      => true,
  "hmset"     => true
}
# Converts a reply to a boolean: true only when the reply equals 1.
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
# Maps a command name to the lambda that process_command applies to the raw
# reply of that command. Commands without an entry return the reply untouched.
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "hset"      => BOOLEAN_PROCESSOR,
  "hexists"   => BOOLEAN_PROCESSOR,
  # Splits every line of the reply at the first ":" and collects the pairs
  # into a Hash keyed by symbol.
  "info"      => lambda{|r|
    info = {}
    r.each_line {|kv|
      k,v = kv.split(":",2).map{|x| x.chomp}
      info[k.to_sym] = v
    }
    info
  },
  # Returns an Array reply as is; any other reply is split on spaces.
  "keys"      => lambda{|r|
    if r.is_a?(Array)
        r
    else
        r.split(" ")
    end
  },
  # Builds a Hash from the reply's alternating key/value elements.
  "hgetall"   => lambda{|r|
    Hash[*r]
  }
}
# Deprecated method names mapped to the Redis command they stand for.
# On the inline encoding path raw_call_command reports the alias through
# deprecated(...) and substitutes the real command name before sending.
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby"
}
# Commands the client refuses to send: on the inline encoding path
# raw_call_command raises "<command> command is disabled" for them.
DISABLED_COMMANDS =
{
  "monitor" => true,
  "sync"    => true
}
# Blocking list-pop commands.
# NOTE(review): not referenced by the methods shown on this page; presumably
# consulted by requires_timeout_reset? so the socket read timeout is lifted
# while these commands wait — confirm in lib/redis/client.rb.
BLOCKING_COMMANDS =
{
  "blpop" => true,
  "brpop" => true
}

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/redis/client.rb', line 139

# Stores the connection settings; the socket starts out as nil.
#
# options - Hash of settings, all optional:
#           :host        - server host (default '127.0.0.1')
#           :port        - server port, coerced with to_i (default 6379)
#           :db          - database index, coerced with to_i (default 0)
#           :timeout     - timeout value, coerced with to_i (default 5)
#           :password    - when set, connect_to_server sends AUTH with it
#           :logger      - stored in @logger
#           :thread_safe - when truthy (and no EventMachine reactor is
#                          running) commands are serialized through a Mutex
#           :binary_keys - when truthy every command uses multi-bulk encoding
def initialize(options = {})
  @host        =  options[:host]    || '127.0.0.1'
  @port        = (options[:port]    || 6379).to_i
  @db          = (options[:db]      || 0).to_i
  @timeout     = (options[:timeout] || 5).to_i
  @password    = options[:password]
  @logger      = options[:logger]
  @thread_safe = options[:thread_safe]
  @binary_keys = options[:binary_keys]
  # Evented mode is detected once; the mutex and the EventedClient extension
  # below both depend on this single answer.
  @evented     = defined?(EM) && EM.reactor_running?
  @mutex       = ::Mutex.new if @thread_safe && !@evented
  @sock        = nil
  @pubsub      = false
  extend(Redis::EventedClient) if @evented
  log(self)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



387
388
389
# File 'lib/redis/client.rb', line 387

# Catch-all for commands without an explicit wrapper method: argv holds the
# called method name followed by its arguments and is handed to call_command
# unchanged.
def method_missing(*argv)
  call_command(argv)
end

Instance Method Details

#[](key) ⇒ Object



165
166
167
# File 'lib/redis/client.rb', line 165

# Hash-style read access; delegates to get(key).
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



169
170
171
# File 'lib/redis/client.rb', line 169

# Hash-style write access; delegates to set(key, value).
def []=(key,value)
  set(key, value)
end

#call_command(argv) ⇒ Object

Wraps raw_call_command to handle reconnection on socket errors. We try to reconnect only once; if that fails, the error is raised to the caller.



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/redis/client.rb', line 333

# Logs the command, connects first when there is no live connection, and runs
# the command through raw_call_command.
#
# A connection-level failure (ECONNRESET, EPIPE, ECONNABORTED) triggers exactly
# one reconnect attempt followed by one retry; when the reconnect fails,
# Errno::ECONNRESET is raised.
def call_command(argv)
  log(argv.inspect, :debug)
  connect_to_server unless connected?

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    raise Errno::ECONNRESET unless reconnect

    # Second and last attempt; an error here propagates to the caller.
    raw_call_command(argv.dup)
  end
end

#connect_to(host, port) ⇒ Object



353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/redis/client.rb', line 353

# Opens the TCP connection to host:port and stores it in @sock.
#
# Raises Timeout::Error when the connect attempt exceeds @timeout, and
# Errno::ECONNREFUSED (with host and port in the message) when the server
# refuses the connection.
def connect_to(host, port)
  # We support connect_to() timeout only if system_timer is available
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  if @timeout != 0 and Timer
    begin
      # The connect has to run inside the timer; without it the rescue below
      # can never trigger and a connect to an unresponsive host blocks.
      # NOTE(review): assumes Timer responds to timeout(seconds) { ... } like
      # ::Timeout / SystemTimer — Timer is defined outside this excerpt.
      Timer.timeout(@timeout) { @sock = TCPSocket.new(host, port) }
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    @sock = TCPSocket.new(host, port)
  end

  @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  set_socket_timeout!(@timeout) if @timeout

rescue Errno::ECONNREFUSED
  raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
end

#connect_to_server ⇒ Object



380
381
382
383
384
385
# File 'lib/redis/client.rb', line 380

# Opens the socket to @host:@port, then sends AUTH when a password was
# configured and SELECT when the configured database is not 0.
# Returns the socket (@sock).
def connect_to_server
  connect_to(@host, @port)
  call_command([:auth, @password]) if @password
  call_command([:select, @db]) if @db != 0
  @sock
end

#decr(key, decrement = nil) ⇒ Object



250
251
252
253
254
255
256
257
# File 'lib/redis/client.rb', line 250

# Sends DECR for +key+.
#
# Passing +decrement+ is deprecated: a deprecation notice naming :decrby is
# issued and the call is forwarded to decrby(key, decrement).
def decr(key, decrement = nil)
  return call_command([:decr, key]) unless decrement

  deprecated("decr with a decrement", :decrby, caller[0])
  decrby(key, decrement)
end

#exec ⇒ Object



276
277
278
279
# File 'lib/redis/client.rb', line 276

# Sends the EXEC command.
def exec
  # Defined explicitly to override Kernel#exec, which would otherwise be
  # invoked instead of the Redis command.
  call_command([:exec])
end

#get(key) ⇒ Object



173
174
175
# File 'lib/redis/client.rb', line 173

# Sends GET for +key+ and returns the result of call_command.
def get(key)
  call_command([:get, key])
end

#get_size(string) ⇒ Object



486
487
488
# File 'lib/redis/client.rb', line 486

# Size of +string+ in bytes (String#bytesize); raw_call_command uses it for
# the length fields of bulk and multi-bulk arguments.
def get_size(string)
  string.bytesize
end

#incr(key, increment = nil) ⇒ Object



241
242
243
244
245
246
247
248
# File 'lib/redis/client.rb', line 241

# Sends INCR for +key+.
#
# Passing +increment+ is deprecated: a deprecation notice naming :incrby is
# issued and the call is forwarded to incrby(key, increment).
def incr(key, increment = nil)
  return call_command([:incr, key]) unless increment

  deprecated("incr with an increment", :incrby, caller[0])
  incrby(key, increment)
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



221
222
223
224
225
226
227
228
# File 'lib/redis/client.rb', line 221

# Fetches +keys+ with mget and returns a Hash of key => value.
# Values are paired with the keys in order; keys whose value is nil are left
# out of the result.
def mapped_mget(*keys)
  found = {}
  mget(*keys).each do |value|
    # Consume the key belonging to this position even when the value is nil,
    # so later values stay aligned with their keys.
    name = keys.shift
    next if value.nil?
    found[name] = value
  end
  found
end

#mapped_mset(hash) ⇒ Object



202
203
204
# File 'lib/redis/client.rb', line 202

# Sends MSET for every key/value pair of +hash+ by calling
# mset(key1, value1, key2, value2, ...).
def mapped_mset(hash)
  # Flatten one level only: a recursive flatten would splice the elements of
  # an Array value (or key) into the argument list and break the key/value
  # pairing.
  mset(*hash.to_a.flatten(1))
end

#mapped_msetnx(hash) ⇒ Object



215
216
217
# File 'lib/redis/client.rb', line 215

# Sends MSETNX for every key/value pair of +hash+ by calling
# msetnx(key1, value1, key2, value2, ...).
def mapped_msetnx(hash)
  # Flatten one level only: a recursive flatten would splice the elements of
  # an Array value (or key) into the argument list and break the key/value
  # pairing.
  msetnx(*hash.to_a.flatten(1))
end

#maybe_lock(&block) ⇒ Object



455
456
457
458
459
460
461
# File 'lib/redis/client.rb', line 455

# Runs +block+ and returns its value. The block is executed while holding
# @mutex only when the client is thread safe and not evented; in every other
# case it is called directly.
def maybe_lock(&block)
  return block.call if @evented || !@thread_safe

  @mutex.synchronize(&block)
end

#mset(*args) ⇒ Object



193
194
195
196
197
198
199
200
# File 'lib/redis/client.rb', line 193

# Sends MSET with the given flat key/value arguments.
#
# Calling it with a single argument (a hash) is deprecated: a deprecation
# notice naming :mapped_mset is issued and the call is forwarded to
# mapped_mset.
def mset(*args)
  return call_command([:mset] + args) unless args.size == 1

  deprecated("mset with a hash", :mapped_mset, caller[0])
  mapped_mset(args[0])
end

#msetnx(*args) ⇒ Object



206
207
208
209
210
211
212
213
# File 'lib/redis/client.rb', line 206

# Sends MSETNX with the given flat key/value arguments.
#
# Calling it with a single argument (a hash) is deprecated: a deprecation
# notice naming :mapped_msetnx is issued and the call is forwarded to
# mapped_msetnx.
def msetnx(*args)
  return call_command([:msetnx] + args) unless args.size == 1

  deprecated("msetnx with a hash", :mapped_msetnx, caller[0])
  mapped_msetnx(args[0])
end

#multi(&block) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/redis/client.rb', line 281

# Sends MULTI.
#
# Without a block the reply to MULTI is returned. With a block the client is
# yielded, then exec is called and its result returned; if anything raised in
# between, discard is called and the exception is re-raised.
def multi(&block)
  started = call_command [:multi]

  if block_given?
    begin
      yield(self)
      exec
    rescue Exception => error
      # Cleanup only: the exception is always re-raised after the discard.
      discard
      raise error
    end
  else
    started
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


270
271
272
273
274
# File 'lib/redis/client.rb', line 270

# Creates a Pipeline bound to this client, yields it to the caller's block and
# returns the result of pipeline.execute.
# NOTE(review): Pipeline is defined elsewhere; presumably it queues the
# commands issued on it and sends them in one batch on execute — confirm.
def pipelined(&block)
  pipeline = Pipeline.new self
  yield pipeline
  pipeline.execute
end

#process_command(command, argvv) ⇒ Object



447
448
449
450
451
452
453
# File 'lib/redis/client.rb', line 447

# Writes the encoded +command+ string to the socket, then reads one reply per
# entry of +argvv+ (an Array of argv arrays).
#
# Returns the replies in order; a reply whose command name has an entry in
# REPLY_PROCESSOR is passed through that lambda first.
def process_command(command, argvv)
  @sock.write(command)
  argvv.map do |argv|
    handler = REPLY_PROCESSOR[argv[0].to_s]
    reply = read_reply
    if handler
      handler.call(reply)
    else
      reply
    end
  end
end

#quit ⇒ Object



265
266
267
268
# File 'lib/redis/client.rb', line 265

# Sends QUIT. An Errno::ECONNRESET raised while doing so is swallowed, in
# which case the method returns nil.
# NOTE(review): presumably the reset is expected because the server closes
# the connection on QUIT — confirm.
def quit
  call_command(['quit'])
rescue Errno::ECONNRESET
end

#raw_call_command(argvp) ⇒ Object



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
# File 'lib/redis/client.rb', line 391

# Encodes one command, or a pipeline of commands, writes it to the socket and
# returns the processed reply or replies.
#
# argvp - either a single command as [name, arg, ...] or, for a pipeline, an
#         Array of such arrays (detected by argvp[0] being an Array).
#
# Returns true without reading a reply while in Pub/Sub mode; otherwise the
# processed reply of the command, or an Array of replies for a pipeline.
# Raises RuntimeError for a command listed in DISABLED_COMMANDS (inline
# encoding path only).
def raw_call_command(argvp)
  # Normalize the input to a list of argv arrays.
  if argvp[0].is_a?(Array)
    argvv = argvp
    pipeline = true
  else
    argvv = [argvp]
    pipeline = false
  end

  if @binary_keys or pipeline or MULTI_BULK_COMMANDS[argvv[0][0].to_s]
    # Multi-bulk encoding: "*<argc>", then "$<byte size>" and the value for
    # every argument. ALIASES and DISABLED_COMMANDS are not consulted here.
    command = ""
    argvv.each do |argv|
      command << "*#{argv.size}\r\n"
      argv.each{|a|
        a = a.to_s
        command << "$#{get_size(a)}\r\n"
        command << a
        command << "\r\n"
      }
    end
  else
    # Inline encoding: arguments joined by spaces on one line.
    command = ""
    argvv.each do |argv|
      bulk = nil
      argv[0] = argv[0].to_s
      # Deprecated aliases are reported and replaced by the real command.
      if ALIASES[argv[0]]
        deprecated(argv[0], ALIASES[argv[0]], caller[4])
        argv[0] = ALIASES[argv[0]]
      end
      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
      # For bulk commands the last argument is replaced by its byte size on
      # the command line and the value itself is appended on its own line.
      if BULK_COMMANDS[argv[0]] and argv.length > 1
        bulk = argv[-1].to_s
        argv[-1] = get_size(bulk)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{bulk}\r\n" if bulk
    end
  end
  # When in Pub/Sub mode we don't read replies synchronously.
  if @pubsub
    @sock.write(command)
    return true
  end
  # The normal command execution is reading and processing the reply.
  results = maybe_lock do
    begin
      # Only the first command of the batch decides whether the socket
      # timeout is lifted (set to 0) for the duration of the call.
      set_socket_timeout!(0) if requires_timeout_reset?(argvv[0][0].to_s)
      process_command(command, argvv)
    ensure
      # Restore the configured timeout even when the command raised.
      set_socket_timeout!(@timeout) if requires_timeout_reset?(argvv[0][0].to_s)
    end
  end

  return pipeline ? results : results[0]
end

#read_reply ⇒ Object

Raises:

  • (Errno::ECONNRESET)


463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
# File 'lib/redis/client.rb', line 463

# Reads one reply from the socket: the first byte is the reply type, the rest
# of the line is read with gets, and both are handed to format_reply, whose
# result is returned.
#
# Raises Errno::EAGAIN (after disconnecting) when the read times out, and
# Errno::ECONNRESET when the socket returns no data.
def read_reply

  # We read the first byte using read() mainly because gets() is
  # immune to raw socket timeouts.
  begin
    reply_type = @sock.read(1)
  rescue Errno::EAGAIN

    # We want to make sure it reconnects on the next command after the
    # timeout. Otherwise the server may reply in the meantime leaving
    # the protocol in a desync status.
    disconnect

    raise Errno::EAGAIN, "Timeout reading from the socket"
  end

  # read(1) returned nil, i.e. there was nothing left to read.
  raise Errno::ECONNRESET, "Connection lost" unless reply_type

  format_reply(reply_type, @sock.gets)
end

#select(*args) ⇒ Object



161
162
163
# File 'lib/redis/client.rb', line 161

# Always raises a RuntimeError: the database is chosen with the :db option
# when the client is created. connect_to_server issues SELECT itself through
# call_command and therefore does not go through this method.
def select(*args)
  raise "SELECT not allowed, use the :db option when creating the object"
end

#server ⇒ Object



349
350
351
# File 'lib/redis/client.rb', line 349

# Returns the configured server address as a "host:port" String.
def server
  "#{@host}:#{@port}"
end

#set(key, value, ttl = nil) ⇒ Object



177
178
179
180
181
182
183
184
# File 'lib/redis/client.rb', line 177

# Sends SET for +key+ with +value+.
#
# Passing +ttl+ is deprecated: a deprecation notice naming :set_with_expire is
# issued and the call is forwarded to set_with_expire(key, value, ttl).
def set(key, value, ttl = nil)
  return call_command([:set, key, value]) unless ttl

  deprecated("set with an expire", :set_with_expire, caller[0])
  set_with_expire(key, value, ttl)
end

#set_with_expire(key, value, ttl) ⇒ Object



186
187
188
189
190
191
# File 'lib/redis/client.rb', line 186

# Sets +key+ to +value+ and then calls expire(key, ttl), issuing both commands
# inside a multi block. Returns the result of that multi call.
def set_with_expire(key, value, ttl)
  multi do
    set(key, value)
    expire(key, ttl)
  end
end

#sort(key, options = {}) ⇒ Object



230
231
232
233
234
235
236
237
238
239
# File 'lib/redis/client.rb', line 230

# Sends SORT for +key+.
#
# options - Hash of optional clauses:
#           :by    - pattern for the BY clause
#           :get   - one pattern or an Array of patterns; each becomes a GET
#                    clause
#           :order - ordering modifiers, e.g. "DESC" or "DESC ALPHA"
#           :limit - Array whose elements follow LIMIT
#           :store - destination key for the STORE clause
#
# Every word of the command is a separate argv element. Joined with spaces
# (inline encoding) this yields the same command line as before, and it also
# stays valid when the command is sent with multi-bulk encoding, where each
# argv element is transmitted as one argument.
def sort(key, options = {})
  cmd = ["SORT", key]
  cmd.push("BY", options[:by]) if options[:by]
  [options[:get]].flatten.each { |pattern| cmd.push("GET", pattern) } if options[:get]
  # The order may carry several modifiers in one string ("DESC ALPHA").
  cmd.concat(options[:order].to_s.split) if options[:order]
  cmd.push("LIMIT", *options[:limit]) if options[:limit]
  cmd.push("STORE", options[:store]) if options[:store]
  call_command(cmd)
end

#subscribe(*classes) {|sub| ... } ⇒ Object

Yields:

  • (sub)


295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/redis/client.rb', line 295

# Subscribes to +classes+ and, for a top-level call, dispatches incoming
# replies to the handlers registered on the yielded Subscription until an
# 'unsubscribe' reply with a count of 0 arrives.
#
# A nested call (made while already in Pub/Sub mode) only sends SUBSCRIBE for
# the additional classes and returns true.
#
# Raises RuntimeError when a top-level call has no block or a nested call is
# given one.
def subscribe(*classes)
  # Top-level `subscribe` MUST be called with a block,
  # nested `subscribe` MUST NOT be called with a block
  if !@pubsub && !block_given?
    raise "Top-level subscribe requires a block"
  elsif @pubsub == true && block_given?
    raise "Nested subscribe does not take a block"
  elsif @pubsub
    # If we're already pubsub'ing, just subscribe us to some more classes
    call_command [:subscribe,*classes]
    return true
  end

  # NOTE(review): @pubsub is switched on before the begin/ensure below, so an
  # exception raised by the SUBSCRIBE call or by the caller's block leaves it
  # set to true.
  @pubsub = true
  call_command [:subscribe,*classes]
  sub = Subscription.new
  # The block registers its handlers on the Subscription object.
  yield(sub)
  begin
    while true
      type, *reply = read_reply # type, [class,data]
      # sub.send(type) looks up the handler stored for this reply type and,
      # when one is present, it is called with the first two reply elements.
      case type
      when 'subscribe','unsubscribe'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      when 'message'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      end
      # Stop once an unsubscribe reply carries 0 as its second element.
      break if type == 'unsubscribe' && reply[1] == 0
    end
  rescue RuntimeError
    call_command [:unsubscribe]
    raise
  ensure
    @pubsub = false
  end
end

#to_s ⇒ Object



157
158
159
# File 'lib/redis/client.rb', line 157

# Human-readable description containing the server address and the database
# index. The text is built from the configured settings only.
def to_s
  "Redis Client connected to #{server} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



261
262
263
# File 'lib/redis/client.rb', line 261

# Sends TYPE for +key+. Defined explicitly because Ruby already provides a
# (deprecated) type method, so the call would never reach method_missing.
def type(key)
  call_command(['type', key])
end