Class: Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/redis.rb,
lib/pipeline.rb

Direct Known Subclasses

Pipeline

Defined Under Namespace

Classes: Pipeline

Constant Summary collapse

# Reply value that #set compares against to decide whether a write succeeded.
OK =
"OK".freeze
# The remaining constants are the reply-type bytes #read_reply switches on.
# "-": error reply; #read_reply raises with the error text.
MINUS =
"-".freeze
# "+": single-line reply; #read_reply returns the line stripped.
PLUS =
"+".freeze
# ":": integer reply; #read_reply returns the line converted with to_i.
COLON =
":".freeze
# "$": bulk reply; the line carries the payload length (-1 yields nil).
DOLLAR =
"$".freeze
# "*": multi-bulk reply; the line carries the number of nested replies.
ASTERISK =
"*".freeze
# Commands whose last argument is sent as a bulk payload. For these,
# #raw_call_command replaces the last argument on the command line with its
# size and sends the value itself on the following line.
BULK_COMMANDS =
{
  "set"       => true,
  "setnx"     => true,
  "rpush"     => true,
  "lpush"     => true,
  "lset"      => true,
  "lrem"      => true,
  "sadd"      => true,
  "srem"      => true,
  "sismember" => true,
  "rpoplpush" => true,
  "echo"      => true,
  "getset"    => true,
  "smove"     => true,
  "zadd"      => true,
  "zincrby"   => true,
  "zrem"      => true,
  "zscore"    => true
}
# Commands sent in multi-bulk form. When the first command name is listed
# here, #raw_call_command emits "*<argument count>" followed by a
# "$<size>" line and the value for every argument.
MULTI_BULK_COMMANDS =
{
  "mset"      => true,
  "msetnx"    => true,
  "zhadd"     => true,
}
# Reply processor that turns a reply into a boolean: true only when the
# reply equals 1.
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
# Per-command post-processing of the raw reply. #process_command looks up
# the command name (argv[0]) here and, when an entry exists, passes the
# reply through it before returning.
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  # Splits the reply string on whitespace into an array.
  "keys"      => lambda{|r| r.split(" ")},
  # Parses "key:value" lines into a hash with symbol keys; each line is
  # split on the first ':' only, so values may themselves contain ':'.
  "info"      => lambda{|r|
    info = {}
    r.each_line {|kv|
      k,v = kv.split(":",2).map{|x| x.chomp}
      info[k.to_sym] = v
    }
    info
  }
}
# Alternative method names mapped to the command name that is actually sent.
# #raw_call_command applies this mapping after downcasing the command name.
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby"
}
# Commands this client refuses to send: #raw_call_command raises
# "<name> command is disabled" for any command listed here.
DISABLED_COMMANDS =
{
  "monitor" => true,
  "sync"    => true
}

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Redis

Returns a new instance of Redis.



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/redis.rb', line 125

# Builds a client from an options hash. No connection is opened here:
# @sock starts out nil.
#
# Recognised keys (all optional):
#   :host        - server address, defaults to '127.0.0.1'
#   :port        - defaults to 6379, coerced with to_i
#   :db          - defaults to 0, coerced with to_i
#   :timeout     - defaults to 5, coerced with to_i
#   :password    - stored as given
#   :logger      - stored as given; receives an info message at construction
#   :thread_safe - when truthy a Mutex is created and stored in @mutex
def initialize(options = {})
  @sock        = nil
  @password    = options[:password]
  @logger      = options[:logger]
  @thread_safe = options[:thread_safe]
  @mutex       = Mutex.new if @thread_safe

  @host    = options[:host] || '127.0.0.1'
  @port    = (options[:port] || 6379).to_i
  @db      = (options[:db] || 0).to_i
  @timeout = (options[:timeout] || 5).to_i

  # Announce the configured endpoint once every setting is in place.
  return unless @logger

  @logger.info { to_s }
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



187
188
189
# File 'lib/redis.rb', line 187

# Catch-all dispatcher: a call to any method not defined on the object is
# forwarded to call_command as [method_name, *arguments], so the method
# name ends up being used as the command name.
def method_missing(*argv)
  call_command(argv)
end

Instance Method Details

#[](key) ⇒ Object



268
269
270
# File 'lib/redis.rb', line 268

# Hash-style read: redis[key] is shorthand for get(key).
def [](key)
  self.get(key)
end

#[]=(key, value) ⇒ Object



272
273
274
# File 'lib/redis.rb', line 272

# Hash-style write: redis[key] = value is shorthand for set(key, value);
# no expiry is passed.
def []=(key,value)
  set(key,value)
end

#call_command(argv) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/redis.rb', line 191

# Sends one command (or a pipeline of commands) and returns the reply.
#
# argv - the command as an array, e.g. ["get", key]. It is handed to
#        raw_call_command as a (shallow) copy.
#
# Connects lazily when there is no socket yet. If the socket fails with
# ECONNRESET, EPIPE or ECONNABORTED the connection is re-established and
# the command is sent once more; a failure of that second attempt
# propagates to the caller.
def call_command(argv)
  @logger.debug { argv.inspect } if @logger

  connect_to_server unless @sock

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    # Best-effort close of the broken socket; a failure here is irrelevant
    # because the socket is discarded right after.
    begin
      @sock.close
    rescue StandardError
      nil
    end
    @sock = nil

    # Retry exactly once on a fresh connection.
    connect_to_server
    raw_call_command(argv.dup)
  end
end

#connect_to(host, port, timeout = nil) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/redis.rb', line 153

# Opens a TCP connection to host:port, enables TCP_NODELAY and returns the
# socket. The socket is only returned, not stored in @sock.
#
# host    - address passed to TCPSocket.new
# port    - port passed to TCPSocket.new
# timeout - optional number of seconds (fractions allowed). When given it
#           is installed as the socket's receive and send timeout.
#
# Raises Timeout::Error (after clearing @sock) if one is raised while
# connecting; any other error from TCPSocket.new propagates unchanged.
def connect_to(host, port, timeout=nil)
  # We support connect() timeout only if system_timer is available
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  #
  # NOTE(review): nothing in this branch arms a timer around TCPSocket.new,
  # so the rescue below can only fire when a Timeout::Error comes from an
  # enclosing timeout. Presumably the connect was meant to run under
  # RedisTimer -- confirm against the place where RedisTimer is defined.
  if @timeout != 0 && RedisTimer
    begin
      sock = TCPSocket.new(host, port)
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    sock = TCPSocket.new(host, port)
  end
  sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  if timeout
    secs   = Integer(timeout)
    usecs  = Integer((timeout - secs) * 1_000_000)
    optval = [secs, usecs].pack("l_2")
    begin
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
    rescue StandardError, NotImplementedError => ex
      # Solaris, for one, does not like/support socket timeouts.
      # Deliberately narrower than Exception so that interrupts and exit
      # requests are not swallowed by this best-effort step.
      @logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
    end
  end
  sock
end

#connect_to_server ⇒ Object



147
148
149
150
151
# File 'lib/redis.rb', line 147

# Opens the socket to the configured server and prepares the session:
# sends "auth" when a password is configured and "select" when the
# configured database is not 0. A @timeout of 0 means "no socket timeout".
def connect_to_server
  socket_timeout = @timeout == 0 ? nil : @timeout
  @sock = connect_to(@host, @port, socket_timeout)

  call_command(["auth", @password]) if @password
  call_command(["select", @db]) if @db != 0
end

#decr(key, decrement = nil) ⇒ Object



296
297
298
# File 'lib/redis.rb', line 296

# Decrements the value stored at key.
#
# key       - the key to decrement
# decrement - optional amount; when given (any truthy value, including 0)
#             "decrby" is sent with that amount, otherwise plain "decr"
#
# Returns whatever call_command returns for the command.
def decr(key, decrement = nil)
  argv =
    if decrement
      ["decrby", key, decrement]
    else
      ["decr", key]
    end
  call_command(argv)
end

#incr(key, increment = nil) ⇒ Object



292
293
294
# File 'lib/redis.rb', line 292

# Increments the value stored at key.
#
# key       - the key to increment
# increment - optional amount; when given (any truthy value, including 0)
#             "incrby" is sent with that amount, otherwise plain "incr"
#
# Returns whatever call_command returns for the command.
def incr(key, increment = nil)
  argv =
    if increment
      ["incrby", key, increment]
    else
      ["incr", key]
    end
  call_command(argv)
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



302
303
304
305
306
307
308
309
# File 'lib/redis.rb', line 302

# Fetches several keys with mget and returns the hits as a hash.
#
# keys - the keys to look up
#
# Returns a Hash of key => value. Keys whose value came back nil are left
# out. Values are paired with keys by position in the mget reply.
def mapped_mget(*keys)
  found = {}
  mget(*keys).each_with_index do |value, position|
    found[keys[position]] = value unless value.nil?
  end
  found
end

#maybe_lock(&block) ⇒ Object



256
257
258
259
260
261
262
# File 'lib/redis.rb', line 256

# Runs the block and returns its value. When the client was created with
# :thread_safe the block runs while holding @mutex; otherwise it is called
# directly.
def maybe_lock(&block)
  return block.call unless @thread_safe

  @mutex.synchronize(&block)
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


322
323
324
325
326
# File 'lib/redis.rb', line 322

# Creates a Pipeline bound to this client, yields it to the caller's block
# and then executes it.
#
# Returns the result of Pipeline#execute.
def pipelined(&block)
  Pipeline.new(self).tap { |batch| yield batch }.execute
end

#process_command(command, argvv) ⇒ Object



248
249
250
251
252
253
254
# File 'lib/redis.rb', line 248

# Writes the already-encoded command string to the socket and reads one
# reply per command.
#
# command - the full wire text to send
# argvv   - array of commands (each an array whose first element is the
#           command name); it determines how many replies are read and
#           which REPLY_PROCESSOR entry, if any, is applied to each
#
# Returns an Array with one (possibly post-processed) reply per command.
def process_command(command, argvv)
  @sock.write(command)

  argvv.map do |argv|
    handler = REPLY_PROCESSOR[argv[0]]
    reply   = read_reply
    handler ? handler.call(reply) : reply
  end
end

#quit ⇒ Object



317
318
319
320
# File 'lib/redis.rb', line 317

# Sends the "quit" command through call_command and returns its reply.
# Errno::ECONNRESET is swallowed, in which case nil is returned --
# presumably because the server may drop the connection in response to
# the command (NOTE(review): confirm).
def quit
  call_command(['quit'])
rescue Errno::ECONNRESET
end

#raw_call_command(argvp) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/redis.rb', line 209

# Encodes one command, or a pipeline of commands, into wire text, sends it
# through process_command (under maybe_lock) and returns the reply.
#
# argvp - either a single command array (["get", key]) or, for a pipeline,
#         an array of such arrays. The arrays passed in are not modified.
#
# Returns the single reply for a single command, or an Array of replies
# for a pipeline.
# Raises RuntimeError when a command is listed in DISABLED_COMMANDS.
def raw_call_command(argvp)
  pipeline = argvp[0].is_a?(Array)

  # Encode per-command copies. The loop below rewrites argv[0] (normalized
  # name) and, for bulk commands, argv[-1] (value replaced by its size).
  # Doing that on the caller's arrays would corrupt a later re-send of the
  # same pipeline, e.g. the retry in call_command after a reconnect.
  argvv = (pipeline ? argvp : [argvp]).map { |argv| argv.dup }

  # NOTE(review): the decision is taken on the very first command name only
  # and, when it matches, every argument of every command is flattened into
  # one multi-bulk request -- looks unintended for pipelines; confirm.
  if MULTI_BULK_COMMANDS[argvv.flatten[0].to_s]
    # TODO improve this code
    argvp   = argvv.flatten
    command = ["*#{argvp.size}"]
    argvp.each do |v|
      v = v.to_s
      command << "$#{get_size(v)}"
      command << v
    end
    command = command.map {|cmd| "#{cmd}\r\n"}.join
  else
    command = ""
    argvv.each do |argv|
      bulk = nil
      argv[0] = argv[0].to_s.downcase
      argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]]
      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
      if BULK_COMMANDS[argv[0]] && argv.length > 1
        # The value travels on its own line; the command line carries its size.
        bulk = argv[-1].to_s
        argv[-1] = get_size(bulk)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{bulk}\r\n" if bulk
    end
  end
  # process_command receives the normalized copies so that reply processors
  # are looked up by the canonical command name.
  results = maybe_lock { process_command(command, argvv) }

  pipeline ? results : results[0]
end

#read_reply ⇒ Object

Raises:

  • (Errno::ECONNRESET)


328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/redis.rb', line 328

# Reads and decodes one reply from the socket.
#
# Returns, depending on the reply type byte:
#   "+"  the status line, stripped
#   ":"  the line as an Integer
#   "$"  the bulk payload, or nil when the announced length is -1
#   "*"  an Array of nested replies, or nil when the announced count is -1
#
# Raises RuntimeError ("-" followed by the error text) for an error reply
# and for an unknown reply type byte, Errno::ECONNRESET when nothing can be
# read, and Errno::EAGAIN when the read times out (the socket is dropped so
# that the next command reconnects).
def read_reply
  # We read the first byte using read() mainly because gets() is
  # immune to raw socket timeouts.
  begin
    rtype = @sock.read(1)
  rescue Errno::EAGAIN
    # We want to make sure it reconnects on the next command after the
    # timeout. Otherwise the server may reply in the meantime leaving
    # the protocol in a desync status.
    @sock = nil
    raise Errno::EAGAIN, "Timeout reading from the socket"
  end

  raise Errno::ECONNRESET,"Connection lost" if !rtype
  line = @sock.gets
  case rtype
  when MINUS
    raise MINUS + line.strip
  when PLUS
    line.strip
  when COLON
    line.to_i
  when DOLLAR
    bulklen = line.to_i
    return nil if bulklen == -1
    data = @sock.read(bulklen)
    @sock.read(2) # CRLF
    data
  when ASTERISK
    objects = line.to_i
    # A count of -1 is the nil multi-bulk reply. This has to test the count
    # read in this branch (not the bulk length of the "$" branch, which is
    # unset here).
    return nil if objects == -1
    res = []
    objects.times {
      res << read_reply
    }
    res
  else
    raise "Protocol error, got '#{rtype}' as initial reply byte"
  end
end

#select(*args) ⇒ Object



264
265
266
# File 'lib/redis.rb', line 264

# Always raises a RuntimeError; the arguments are ignored. The database is
# chosen with the :db option given to the constructor instead.
def select(*args)
  raise "SELECT not allowed, use the :db option when creating the object"
end

#server ⇒ Object



143
144
145
# File 'lib/redis.rb', line 143

# Returns the configured endpoint as "host:port".
def server
  "#{@host}:#{@port}"
end

#set(key, value, expiry = nil) ⇒ Object



276
277
278
279
280
# File 'lib/redis.rb', line 276

# Stores value under key and optionally sets an expiry on it.
#
# key    - the key to write
# value  - the value to store
# expiry - optional; when given and the write succeeded, expire(key, expiry)
#          is called afterwards as a separate command
#
# Returns true when the reply to "set" equals OK, false otherwise.
def set(key, value, expiry=nil)
  reply = call_command([:set, key, value])
  return false unless reply == OK

  expire(key, expiry) if expiry
  true
end

#sort(key, options = {}) ⇒ Object



282
283
284
285
286
287
288
289
290
# File 'lib/redis.rb', line 282

# Builds and sends a SORT command for key.
#
# key     - the key to sort
# options - optional hash:
#           :by    - pattern appended as "BY <pattern>"
#           :get   - one pattern or an array of patterns, each appended as
#                    "GET <pattern>"
#           :order - appended verbatim (converted to a string)
#           :limit - array joined with spaces and appended as "LIMIT ..."
#
# Returns whatever call_command returns for the command.
def sort(key, options = {})
  by, get, order, limit = options.values_at(:by, :get, :order, :limit)

  argv = ["SORT", key]
  argv << "BY #{by}" if by
  argv << "GET #{[get].flatten.join(' GET ')}" if get
  argv << "#{order}" if order
  argv << "LIMIT #{limit.join(' ')}" if limit

  call_command(argv)
end

#to_s ⇒ Object



139
140
141
# File 'lib/redis.rb', line 139

# Human-readable description of the client: the configured server address
# and database index. The text is built from the settings only; the state
# of the socket is not checked.
def to_s
  "Redis Client connected to #{server} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now-deprecated type method, so we need to override it here; otherwise a call to type would never reach method_missing.



313
314
315
# File 'lib/redis.rb', line 313

# Sends the "type" command for key and returns the reply. Defined
# explicitly instead of being left to method_missing.
def type(key)
  call_command(['type', key])
end