Class: Redis::Distributed

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/distributed.rb

Defined Under Namespace

Classes: CannotDistribute

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_configs, options = {}) ⇒ Distributed

Returns a new instance of Distributed.



20
21
22
23
24
25
26
27
28
# File 'lib/redis/distributed.rb', line 20

def initialize(node_configs, options = {})
  @tag = options[:tag] || /^\{(.+?)\}/
  @ring = options[:ring] || HashRing.new
  @node_configs = node_configs.dup
  @default_options = options.dup
  node_configs.each { |node_config| add_node(node_config) }
  @subscribed_node = nil
  @watch_key = nil
end

Instance Attribute Details

#ringObject (readonly)

Returns the value of attribute ring.



18
19
20
# File 'lib/redis/distributed.rb', line 18

def ring
  @ring
end

Instance Method Details

#[](key) ⇒ Object



383
384
385
# File 'lib/redis/distributed.rb', line 383

def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



387
388
389
# File 'lib/redis/distributed.rb', line 387

def []=(key, value)
  set(key, value)
end

#_bpop(cmd, args) ⇒ Object



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/redis/distributed.rb', line 434

def _bpop(cmd, args)
  timeout = if args.last.is_a?(Hash)
    options = args.pop
    options[:timeout]
  elsif args.last.respond_to?(:to_int)
    # Issue deprecation notice in obnoxious mode...
    args.pop.to_int
  end

  if args.size > 1
    # Issue deprecation notice in obnoxious mode...
  end

  keys = args.flatten

  ensure_same_node(cmd, keys) do |node|
    if timeout
      node.__send__(cmd, keys, timeout: timeout)
    else
      node.__send__(cmd, keys)
    end
  end
end

#_eval(cmd, args) ⇒ Object



884
885
886
887
888
889
890
891
892
893
894
895
# File 'lib/redis/distributed.rb', line 884

def _eval(cmd, args)
  script = args.shift
  options = args.pop if args.last.is_a?(Hash)
  options ||= {}

  keys = args.shift || options[:keys] || []
  argv = args.shift || options[:argv] || []

  ensure_same_node(cmd, keys) do |node|
    node.send(cmd, script, keys, argv)
  end
end

#add_node(options) ⇒ Object



41
42
43
44
45
# File 'lib/redis/distributed.rb', line 41

def add_node(options)
  options = { url: options } if options.is_a?(String)
  options = @default_options.merge(options)
  @ring.add_node Redis.new(options)
end

#append(key, value) ⇒ Object

Append a value to a key.



352
353
354
# File 'lib/redis/distributed.rb', line 352

def append(key, value)
  node_for(key).append(key, value)
end

#bgsaveObject

Asynchronously save the dataset to disk.



68
69
70
# File 'lib/redis/distributed.rb', line 68

def bgsave
  on_each_node :bgsave
end

#bitcount(key, start = 0, stop = -1)) ⇒ Object

Count the number of set bits in a range of the string value stored at key.



357
358
359
# File 'lib/redis/distributed.rb', line 357

def bitcount(key, start = 0, stop = -1)
  node_for(key).bitcount(key, start, stop)
end

#bitop(operation, destkey, *keys) ⇒ Object

Perform a bitwise operation between strings and store the resulting string in a key.



362
363
364
365
366
# File 'lib/redis/distributed.rb', line 362

def bitop(operation, destkey, *keys)
  ensure_same_node(:bitop, [destkey] + keys) do |node|
    node.bitop(operation, destkey, *keys)
  end
end

#bitpos(key, bit, start = nil, stop = nil) ⇒ Object

Return the position of the first bit set to 1 or 0 in a string.



369
370
371
# File 'lib/redis/distributed.rb', line 369

def bitpos(key, bit, start = nil, stop = nil)
  node_for(key).bitpos(key, bit, start, stop)
end

#blpop(*args) ⇒ Object

Remove and get the first element in a list, or block until one is available.



460
461
462
# File 'lib/redis/distributed.rb', line 460

def blpop(*args)
  _bpop(:blpop, args)
end

#brpop(*args) ⇒ Object

Remove and get the last element in a list, or block until one is available.



466
467
468
# File 'lib/redis/distributed.rb', line 466

def brpop(*args)
  _bpop(:brpop, args)
end

#brpoplpush(source, destination, deprecated_timeout = 0, **options) ⇒ Object

Pop a value from a list, push it to another list and return it; or block until one is available.



472
473
474
475
476
# File 'lib/redis/distributed.rb', line 472

def brpoplpush(source, destination, deprecated_timeout = 0, **options)
  ensure_same_node(:brpoplpush, [source, destination]) do |node|
    node.brpoplpush(source, destination, deprecated_timeout, **options)
  end
end

#dbsizeObject

Return the number of keys in the selected database.



73
74
75
# File 'lib/redis/distributed.rb', line 73

def dbsize
  on_each_node :dbsize
end

#decr(key) ⇒ Object

Decrement the integer value of a key by one.



252
253
254
# File 'lib/redis/distributed.rb', line 252

def decr(key)
  node_for(key).decr(key)
end

#decrby(key, decrement) ⇒ Object

Decrement the integer value of a key by the given number.



257
258
259
# File 'lib/redis/distributed.rb', line 257

def decrby(key, decrement)
  node_for(key).decrby(key, decrement)
end

#del(*args) ⇒ Object

Delete a key.



163
164
165
166
167
168
# File 'lib/redis/distributed.rb', line 163

def del(*args)
  keys_per_node = args.group_by { |key| node_for(key) }
  keys_per_node.inject(0) do |sum, (node, keys)|
    sum + node.del(*keys)
  end
end

#discardObject

Discard all commands issued after MULTI.

Raises:



851
852
853
854
855
856
857
# File 'lib/redis/distributed.rb', line 851

def discard
  raise CannotDistribute, :discard unless @watch_key

  result = node_for(@watch_key).discard
  @watch_key = nil
  result
end

#dump(key) ⇒ Object

Return a serialized version of the value stored at a key.



148
149
150
# File 'lib/redis/distributed.rb', line 148

def dump(key)
  node_for(key).dump(key)
end

#dupObject



911
912
913
# File 'lib/redis/distributed.rb', line 911

def dup
  self.class.new(@node_configs, @default_options)
end

#echo(value) ⇒ Object

Echo the given string.



58
59
60
# File 'lib/redis/distributed.rb', line 58

def echo(value)
  on_each_node :echo, value
end

#eval(*args) ⇒ Object

Evaluate Lua script.



898
899
900
# File 'lib/redis/distributed.rb', line 898

def eval(*args)
  _eval(:eval, args)
end

#evalsha(*args) ⇒ Object

Evaluate Lua script by its SHA.



903
904
905
# File 'lib/redis/distributed.rb', line 903

def evalsha(*args)
  _eval(:evalsha, args)
end

#execObject

Execute all commands issued after MULTI.

Raises:



842
843
844
845
846
847
848
# File 'lib/redis/distributed.rb', line 842

def exec
  raise CannotDistribute, :exec unless @watch_key

  result = node_for(@watch_key).exec
  @watch_key = nil
  result
end

#exists(*args) ⇒ Object

Determine if a key exists.



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/redis/distributed.rb', line 179

def exists(*args)
  if !Redis.exists_returns_integer && args.size == 1
    message = "`Redis#exists(key)` will return an Integer in redis-rb 4.3, if you want to keep the old behavior, " \
      "use `exists?` instead. To opt-in to the new behavior now you can set Redis.exists_returns_integer = true. " \
      "(#{::Kernel.caller(1, 1).first})\n"

    if defined?(::Warning)
      ::Warning.warn(message)
    else
      warn(message)
    end
    exists?(*args)
  else
    keys_per_node = args.group_by { |key| node_for(key) }
    keys_per_node.inject(0) do |sum, (node, keys)|
      sum + node._exists(*keys)
    end
  end
end

#exists?(*args) ⇒ Boolean

Determine if any of the keys exists.

Returns:

  • (Boolean)


200
201
202
203
204
205
206
# File 'lib/redis/distributed.rb', line 200

def exists?(*args)
  keys_per_node = args.group_by { |key| node_for(key) }
  keys_per_node.each do |node, keys|
    return true if node.exists?(*keys)
  end
  false
end

#expire(key, seconds) ⇒ Object

Set a key’s time to live in seconds.



118
119
120
# File 'lib/redis/distributed.rb', line 118

def expire(key, seconds)
  node_for(key).expire(key, seconds)
end

#expireat(key, unix_time) ⇒ Object

Set the expiration for a key as a UNIX timestamp.



123
124
125
# File 'lib/redis/distributed.rb', line 123

def expireat(key, unix_time)
  node_for(key).expireat(key, unix_time)
end

#flushallObject

Remove all keys from all databases.



78
79
80
# File 'lib/redis/distributed.rb', line 78

def flushall
  on_each_node :flushall
end

#flushdbObject

Remove all keys from the current database.



83
84
85
# File 'lib/redis/distributed.rb', line 83

def flushdb
  on_each_node :flushdb
end

#get(key) ⇒ Object

Get the value of a key.



315
316
317
# File 'lib/redis/distributed.rb', line 315

def get(key)
  node_for(key).get(key)
end

#getbit(key, offset) ⇒ Object

Returns the bit value at offset in the string value stored at key.



347
348
349
# File 'lib/redis/distributed.rb', line 347

def getbit(key, offset)
  node_for(key).getbit(key, offset)
end

#getrange(key, start, stop) ⇒ Object

Get a substring of the string stored at a key.



337
338
339
# File 'lib/redis/distributed.rb', line 337

def getrange(key, start, stop)
  node_for(key).getrange(key, start, stop)
end

#getset(key, value) ⇒ Object

Set the string value of a key and return its old value.



374
375
376
# File 'lib/redis/distributed.rb', line 374

def getset(key, value)
  node_for(key).getset(key, value)
end

#hdel(key, *fields) ⇒ Object

Delete one or more hash fields.



731
732
733
# File 'lib/redis/distributed.rb', line 731

def hdel(key, *fields)
  node_for(key).hdel(key, *fields)
end

#hexists(key, field) ⇒ Object

Determine if a hash field exists.



736
737
738
# File 'lib/redis/distributed.rb', line 736

def hexists(key, field)
  node_for(key).hexists(key, field)
end

#hget(key, field) ⇒ Object

Get the value of a hash field.



717
718
719
# File 'lib/redis/distributed.rb', line 717

def hget(key, field)
  node_for(key).hget(key, field)
end

#hgetall(key) ⇒ Object

Get all the fields and values in a hash.



761
762
763
# File 'lib/redis/distributed.rb', line 761

def hgetall(key)
  node_for(key).hgetall(key)
end

#hincrby(key, field, increment) ⇒ Object

Increment the integer value of a hash field by the given integer number.



741
742
743
# File 'lib/redis/distributed.rb', line 741

def hincrby(key, field, increment)
  node_for(key).hincrby(key, field, increment)
end

#hincrbyfloat(key, field, increment) ⇒ Object

Increment the numeric value of a hash field by the given float number.



746
747
748
# File 'lib/redis/distributed.rb', line 746

def hincrbyfloat(key, field, increment)
  node_for(key).hincrbyfloat(key, field, increment)
end

#hkeys(key) ⇒ Object

Get all the fields in a hash.



751
752
753
# File 'lib/redis/distributed.rb', line 751

def hkeys(key)
  node_for(key).hkeys(key)
end

#hlen(key) ⇒ Object

Get the number of fields in a hash.



693
694
695
# File 'lib/redis/distributed.rb', line 693

def hlen(key)
  node_for(key).hlen(key)
end

#hmget(key, *fields) ⇒ Object

Get the values of all the given hash fields.



722
723
724
# File 'lib/redis/distributed.rb', line 722

def hmget(key, *fields)
  node_for(key).hmget(key, *fields)
end

#hmset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



708
709
710
# File 'lib/redis/distributed.rb', line 708

def hmset(key, *attrs)
  node_for(key).hmset(key, *attrs)
end

#hset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



698
699
700
# File 'lib/redis/distributed.rb', line 698

def hset(key, *attrs)
  node_for(key).hset(key, *attrs)
end

#hsetnx(key, field, value) ⇒ Object

Set the value of a hash field, only if the field does not exist.



703
704
705
# File 'lib/redis/distributed.rb', line 703

def hsetnx(key, field, value)
  node_for(key).hsetnx(key, field, value)
end

#hvals(key) ⇒ Object

Get all the values in a hash.



756
757
758
# File 'lib/redis/distributed.rb', line 756

def hvals(key)
  node_for(key).hvals(key)
end

#incr(key) ⇒ Object

Increment the integer value of a key by one.



262
263
264
# File 'lib/redis/distributed.rb', line 262

def incr(key)
  node_for(key).incr(key)
end

#incrby(key, increment) ⇒ Object

Increment the integer value of a key by the given integer number.



267
268
269
# File 'lib/redis/distributed.rb', line 267

def incrby(key, increment)
  node_for(key).incrby(key, increment)
end

#incrbyfloat(key, increment) ⇒ Object

Increment the numeric value of a key by the given float number.



272
273
274
# File 'lib/redis/distributed.rb', line 272

def incrbyfloat(key, increment)
  node_for(key).incrbyfloat(key, increment)
end

#info(cmd = nil) ⇒ Object

Get information and statistics about the server.



88
89
90
# File 'lib/redis/distributed.rb', line 88

def info(cmd = nil)
  on_each_node :info, cmd
end

#inspectObject



907
908
909
# File 'lib/redis/distributed.rb', line 907

def inspect
  "#<Redis client v#{Redis::VERSION} for #{nodes.map(&:id).join(', ')}>"
end

#keys(glob = "*") ⇒ Object

Find all keys matching the given pattern.



209
210
211
# File 'lib/redis/distributed.rb', line 209

def keys(glob = "*")
  on_each_node(:keys, glob).flatten
end

#lastsaveObject

Get the UNIX time stamp of the last successful save to disk.



93
94
95
# File 'lib/redis/distributed.rb', line 93

def lastsave
  on_each_node :lastsave
end

#lindex(key, index) ⇒ Object

Get an element from a list by its index.



479
480
481
# File 'lib/redis/distributed.rb', line 479

def lindex(key, index)
  node_for(key).lindex(key, index)
end

#linsert(key, where, pivot, value) ⇒ Object

Insert an element before or after another element in a list.



484
485
486
# File 'lib/redis/distributed.rb', line 484

def linsert(key, where, pivot, value)
  node_for(key).linsert(key, where, pivot, value)
end

#llen(key) ⇒ Object

Get the length of a list.



392
393
394
# File 'lib/redis/distributed.rb', line 392

def llen(key)
  node_for(key).llen(key)
end

#lpop(key) ⇒ Object

Remove and get the first element in a list.



417
418
419
# File 'lib/redis/distributed.rb', line 417

def lpop(key)
  node_for(key).lpop(key)
end

#lpush(key, value) ⇒ Object

Prepend one or more values to a list.



397
398
399
# File 'lib/redis/distributed.rb', line 397

def lpush(key, value)
  node_for(key).lpush(key, value)
end

#lpushx(key, value) ⇒ Object

Prepend a value to a list, only if the list exists.



402
403
404
# File 'lib/redis/distributed.rb', line 402

def lpushx(key, value)
  node_for(key).lpushx(key, value)
end

#lrange(key, start, stop) ⇒ Object

Get a range of elements from a list.



489
490
491
# File 'lib/redis/distributed.rb', line 489

def lrange(key, start, stop)
  node_for(key).lrange(key, start, stop)
end

#lrem(key, count, value) ⇒ Object

Remove elements from a list.



494
495
496
# File 'lib/redis/distributed.rb', line 494

def lrem(key, count, value)
  node_for(key).lrem(key, count, value)
end

#lset(key, index, value) ⇒ Object

Set the value of an element in a list by its index.



499
500
501
# File 'lib/redis/distributed.rb', line 499

def lset(key, index, value)
  node_for(key).lset(key, index, value)
end

#ltrim(key, start, stop) ⇒ Object

Trim a list to the specified range.



504
505
506
# File 'lib/redis/distributed.rb', line 504

def ltrim(key, start, stop)
  node_for(key).ltrim(key, start, stop)
end

#mapped_hmget(key, *fields) ⇒ Object



726
727
728
# File 'lib/redis/distributed.rb', line 726

def mapped_hmget(key, *fields)
  Hash[*fields.zip(hmget(key, *fields)).flatten]
end

#mapped_hmset(key, hash) ⇒ Object



712
713
714
# File 'lib/redis/distributed.rb', line 712

def mapped_hmset(key, hash)
  node_for(key).hmset(key, *hash.to_a.flatten)
end

#mapped_mget(*keys) ⇒ Object

Get the values of all the given keys as a Hash.



325
326
327
328
329
# File 'lib/redis/distributed.rb', line 325

def mapped_mget(*keys)
  keys.group_by { |k| node_for k }.inject({}) do |results, (node, subkeys)|
    results.merge! node.mapped_mget(*subkeys)
  end
end

#mapped_mset(_hash) ⇒ Object

Raises:



301
302
303
# File 'lib/redis/distributed.rb', line 301

def mapped_mset(_hash)
  raise CannotDistribute, :mapped_mset
end

#mapped_msetnx(_hash) ⇒ Object

Raises:



310
311
312
# File 'lib/redis/distributed.rb', line 310

def mapped_msetnx(_hash)
  raise CannotDistribute, :mapped_msetnx
end

#mget(*keys) ⇒ Object

Get the values of all the given keys as an Array.



320
321
322
# File 'lib/redis/distributed.rb', line 320

def mget(*keys)
  mapped_mget(*keys).values_at(*keys)
end

#migrate(_key, _options) ⇒ Object

Transfer a key from the connected instance to another instance.

Raises:



158
159
160
# File 'lib/redis/distributed.rb', line 158

def migrate(_key, _options)
  raise CannotDistribute, :migrate
end

#monitorObject

Listen for all requests received by the server in real time.

Raises:

  • (NotImplementedError)


98
99
100
# File 'lib/redis/distributed.rb', line 98

def monitor
  raise NotImplementedError
end

#move(key, db) ⇒ Object

Move a key to another database.



214
215
216
# File 'lib/redis/distributed.rb', line 214

def move(key, db)
  node_for(key).move(key, db)
end

#mset(*_args) ⇒ Object

Set multiple keys to multiple values.

Raises:



297
298
299
# File 'lib/redis/distributed.rb', line 297

def mset(*_args)
  raise CannotDistribute, :mset
end

#msetnx(*_args) ⇒ Object

Set multiple keys to multiple values, only if none of the keys exist.

Raises:



306
307
308
# File 'lib/redis/distributed.rb', line 306

def msetnx(*_args)
  raise CannotDistribute, :msetnx
end

#multi(&block) ⇒ Object

Mark the start of a transaction block.

Raises:



833
834
835
836
837
838
839
# File 'lib/redis/distributed.rb', line 833

def multi(&block)
  raise CannotDistribute, :multi unless @watch_key

  result = node_for(@watch_key).multi(&block)
  @watch_key = nil if block_given?
  result
end

#node_for(key) ⇒ Object

Raises:



30
31
32
33
34
35
# File 'lib/redis/distributed.rb', line 30

def node_for(key)
  key = key_tag(key.to_s) || key.to_s
  raise CannotDistribute, :watch if @watch_key && @watch_key != key

  @ring.get_node(key)
end

#nodesObject



37
38
39
# File 'lib/redis/distributed.rb', line 37

def nodes
  @ring.nodes
end

#persist(key) ⇒ Object

Remove the expiration from a key.



113
114
115
# File 'lib/redis/distributed.rb', line 113

def persist(key)
  node_for(key).persist(key)
end

#pexpire(key, milliseconds) ⇒ Object

Set a key’s time to live in milliseconds.



133
134
135
# File 'lib/redis/distributed.rb', line 133

def pexpire(key, milliseconds)
  node_for(key).pexpire(key, milliseconds)
end

#pexpireat(key, ms_unix_time) ⇒ Object

Set the expiration for a key as number of milliseconds from UNIX Epoch.



138
139
140
# File 'lib/redis/distributed.rb', line 138

def pexpireat(key, ms_unix_time)
  node_for(key).pexpireat(key, ms_unix_time)
end

#pfadd(key, member) ⇒ Object

Add one or more members to a HyperLogLog structure.



865
866
867
# File 'lib/redis/distributed.rb', line 865

def pfadd(key, member)
  node_for(key).pfadd(key, member)
end

#pfcount(*keys) ⇒ Object

Get the approximate cardinality of members added to HyperLogLog structure.



870
871
872
873
874
# File 'lib/redis/distributed.rb', line 870

def pfcount(*keys)
  ensure_same_node(:pfcount, keys.flatten(1)) do |node|
    node.pfcount(keys)
  end
end

#pfmerge(dest_key, *source_key) ⇒ Object

Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of the observed Sets of the source HyperLogLog structures.



878
879
880
881
882
# File 'lib/redis/distributed.rb', line 878

def pfmerge(dest_key, *source_key)
  ensure_same_node(:pfmerge, [dest_key, *source_key]) do |node|
    node.pfmerge(dest_key, *source_key)
  end
end

#pingObject

Ping the server.



53
54
55
# File 'lib/redis/distributed.rb', line 53

def ping
  on_each_node :ping
end

#pipelinedObject

Raises:



828
829
830
# File 'lib/redis/distributed.rb', line 828

def pipelined
  raise CannotDistribute, :pipelined
end

#psetex(key, ttl, value) ⇒ Object

Set the time to live in milliseconds of a key.



287
288
289
# File 'lib/redis/distributed.rb', line 287

def psetex(key, ttl, value)
  node_for(key).psetex(key, ttl, value)
end

#psubscribe(*channels, &block) ⇒ Object

Listen for messages published to channels matching the given patterns.

Raises:

  • (NotImplementedError)


795
796
797
# File 'lib/redis/distributed.rb', line 795

def psubscribe(*channels, &block)
  raise NotImplementedError
end

#pttl(key) ⇒ Object

Get the time to live (in milliseconds) for a key.



143
144
145
# File 'lib/redis/distributed.rb', line 143

def pttl(key)
  node_for(key).pttl(key)
end

#publish(channel, message) ⇒ Object

Post a message to a channel.



766
767
768
# File 'lib/redis/distributed.rb', line 766

def publish(channel, message)
  node_for(channel).publish(channel, message)
end

#punsubscribe(*channels) ⇒ Object

Stop listening for messages posted to channels matching the given patterns.

Raises:

  • (NotImplementedError)


801
802
803
# File 'lib/redis/distributed.rb', line 801

def punsubscribe(*channels)
  raise NotImplementedError
end

#quitObject

Close the connection.



63
64
65
# File 'lib/redis/distributed.rb', line 63

def quit
  on_each_node :quit
end

#randomkeyObject

Return a random key from the keyspace.

Raises:



219
220
221
# File 'lib/redis/distributed.rb', line 219

def randomkey
  raise CannotDistribute, :randomkey
end

#rename(old_name, new_name) ⇒ Object

Rename a key.



224
225
226
227
228
# File 'lib/redis/distributed.rb', line 224

def rename(old_name, new_name)
  ensure_same_node(:rename, [old_name, new_name]) do |node|
    node.rename(old_name, new_name)
  end
end

#renamenx(old_name, new_name) ⇒ Object

Rename a key, only if the new key does not exist.



231
232
233
234
235
# File 'lib/redis/distributed.rb', line 231

def renamenx(old_name, new_name)
  ensure_same_node(:renamenx, [old_name, new_name]) do |node|
    node.renamenx(old_name, new_name)
  end
end

#restore(key, ttl, serialized_value, **options) ⇒ Object

Create a key using the serialized value, previously obtained using DUMP.



153
154
155
# File 'lib/redis/distributed.rb', line 153

def restore(key, ttl, serialized_value, **options)
  node_for(key).restore(key, ttl, serialized_value, **options)
end

#rpop(key) ⇒ Object

Remove and get the last element in a list.



422
423
424
# File 'lib/redis/distributed.rb', line 422

def rpop(key)
  node_for(key).rpop(key)
end

#rpoplpush(source, destination) ⇒ Object

Remove the last element in a list, append it to another list and return it.



428
429
430
431
432
# File 'lib/redis/distributed.rb', line 428

def rpoplpush(source, destination)
  ensure_same_node(:rpoplpush, [source, destination]) do |node|
    node.rpoplpush(source, destination)
  end
end

#rpush(key, value) ⇒ Object

Append one or more values to a list.



407
408
409
# File 'lib/redis/distributed.rb', line 407

def rpush(key, value)
  node_for(key).rpush(key, value)
end

#rpushx(key, value) ⇒ Object

Append a value to a list, only if the list exists.



412
413
414
# File 'lib/redis/distributed.rb', line 412

def rpushx(key, value)
  node_for(key).rpushx(key, value)
end

#sadd(key, member) ⇒ Object

Add one or more members to a set.



514
515
516
# File 'lib/redis/distributed.rb', line 514

def sadd(key, member)
  node_for(key).sadd(key, member)
end

#saveObject

Synchronously save the dataset to disk.



103
104
105
# File 'lib/redis/distributed.rb', line 103

def save
  on_each_node :save
end

#scard(key) ⇒ Object

Get the number of members in a set.



509
510
511
# File 'lib/redis/distributed.rb', line 509

def scard(key)
  node_for(key).scard(key)
end

#script(subcommand, *args) ⇒ Object

Control remote script registry.



860
861
862
# File 'lib/redis/distributed.rb', line 860

def script(subcommand, *args)
  on_each_node(:script, subcommand, *args)
end

#sdiff(*keys) ⇒ Object

Subtract multiple sets.



561
562
563
564
565
# File 'lib/redis/distributed.rb', line 561

def sdiff(*keys)
  ensure_same_node(:sdiff, keys) do |node|
    node.sdiff(*keys)
  end
end

#sdiffstore(destination, *keys) ⇒ Object

Subtract multiple sets and store the resulting set in a key.



568
569
570
571
572
# File 'lib/redis/distributed.rb', line 568

def sdiffstore(destination, *keys)
  ensure_same_node(:sdiffstore, [destination] + keys) do |node|
    node.sdiffstore(destination, *keys)
  end
end

#select(db) ⇒ Object

Change the selected database for the current connection.



48
49
50
# File 'lib/redis/distributed.rb', line 48

def select(db)
  on_each_node :select, db
end

#set(key, value, **options) ⇒ Object

Set the string value of a key.



277
278
279
# File 'lib/redis/distributed.rb', line 277

def set(key, value, **options)
  node_for(key).set(key, value, **options)
end

#setbit(key, offset, value) ⇒ Object

Sets or clears the bit at offset in the string value stored at key.



342
343
344
# File 'lib/redis/distributed.rb', line 342

def setbit(key, offset, value)
  node_for(key).setbit(key, offset, value)
end

#setex(key, ttl, value) ⇒ Object

Set the time to live in seconds of a key.



282
283
284
# File 'lib/redis/distributed.rb', line 282

def setex(key, ttl, value)
  node_for(key).setex(key, ttl, value)
end

#setnx(key, value) ⇒ Object

Set the value of a key, only if the key does not exist.



292
293
294
# File 'lib/redis/distributed.rb', line 292

def setnx(key, value)
  node_for(key).setnx(key, value)
end

#setrange(key, offset, value) ⇒ Object

Overwrite part of a string at key starting at the specified offset.



332
333
334
# File 'lib/redis/distributed.rb', line 332

def setrange(key, offset, value)
  node_for(key).setrange(key, offset, value)
end

#sinter(*keys) ⇒ Object

Intersect multiple sets.



575
576
577
578
579
# File 'lib/redis/distributed.rb', line 575

def sinter(*keys)
  ensure_same_node(:sinter, keys) do |node|
    node.sinter(*keys)
  end
end

#sinterstore(destination, *keys) ⇒ Object

Intersect multiple sets and store the resulting set in a key.



582
583
584
585
586
# File 'lib/redis/distributed.rb', line 582

def sinterstore(destination, *keys)
  ensure_same_node(:sinterstore, [destination] + keys) do |node|
    node.sinterstore(destination, *keys)
  end
end

#sismember(key, member) ⇒ Object

Determine if a given value is a member of a set.



541
542
543
# File 'lib/redis/distributed.rb', line 541

def sismember(key, member)
  node_for(key).sismember(key, member)
end

#smembers(key) ⇒ Object

Get all the members in a set.



546
547
548
# File 'lib/redis/distributed.rb', line 546

def smembers(key)
  node_for(key).smembers(key)
end

#smove(source, destination, member) ⇒ Object

Move a member from one set to another.



534
535
536
537
538
# File 'lib/redis/distributed.rb', line 534

def smove(source, destination, member)
  ensure_same_node(:smove, [source, destination]) do |node|
    node.smove(source, destination, member)
  end
end

#sort(key, **options) ⇒ Object

Sort the elements in a list, set or sorted set.



238
239
240
241
242
243
244
# File 'lib/redis/distributed.rb', line 238

def sort(key, **options)
  keys = [key, options[:by], options[:store], *Array(options[:get])].compact

  ensure_same_node(:sort, keys) do |node|
    node.sort(key, **options)
  end
end

#spop(key, count = nil) ⇒ Object

Remove and return a random member from a set.



524
525
526
# File 'lib/redis/distributed.rb', line 524

def spop(key, count = nil)
  node_for(key).spop(key, count)
end

#srandmember(key, count = nil) ⇒ Object

Get a random member from a set.



529
530
531
# File 'lib/redis/distributed.rb', line 529

def srandmember(key, count = nil)
  node_for(key).srandmember(key, count)
end

#srem(key, member) ⇒ Object

Remove one or more members from a set.



519
520
521
# File 'lib/redis/distributed.rb', line 519

def srem(key, member)
  node_for(key).srem(key, member)
end

#sscan(key, cursor, **options) ⇒ Object

Scan a set



551
552
553
# File 'lib/redis/distributed.rb', line 551

def sscan(key, cursor, **options)
  node_for(key).sscan(key, cursor, **options)
end

#sscan_each(key, **options, &block) ⇒ Object

Scan a set and return an enumerator



556
557
558
# File 'lib/redis/distributed.rb', line 556

def sscan_each(key, **options, &block)
  node_for(key).sscan_each(key, **options, &block)
end

#strlen(key) ⇒ Object

Get the length of the value stored in a key.



379
380
381
# File 'lib/redis/distributed.rb', line 379

def strlen(key)
  node_for(key).strlen(key)
end

#subscribe(channel, *channels, &block) ⇒ Object

Listen for messages published to the given channels.



775
776
777
778
779
780
781
782
783
784
785
# File 'lib/redis/distributed.rb', line 775

def subscribe(channel, *channels, &block)
  if channels.empty?
    @subscribed_node = node_for(channel)
    @subscribed_node.subscribe(channel, &block)
  else
    ensure_same_node(:subscribe, [channel] + channels) do |node|
      @subscribed_node = node
      node.subscribe(channel, *channels, &block)
    end
  end
end

#subscribed?Boolean

Returns:

  • (Boolean)


770
771
772
# File 'lib/redis/distributed.rb', line 770

def subscribed?
  !!@subscribed_node
end

#sunion(*keys) ⇒ Object

Add multiple sets.



589
590
591
592
593
# File 'lib/redis/distributed.rb', line 589

def sunion(*keys)
  ensure_same_node(:sunion, keys) do |node|
    node.sunion(*keys)
  end
end

#sunionstore(destination, *keys) ⇒ Object

Add multiple sets and store the resulting set in a key.



596
597
598
599
600
# File 'lib/redis/distributed.rb', line 596

def sunionstore(destination, *keys)
  ensure_same_node(:sunionstore, [destination] + keys) do |node|
    node.sunionstore(destination, *keys)
  end
end

#timeObject

Get server time: an UNIX timestamp and the elapsed microseconds in the current second.



108
109
110
# File 'lib/redis/distributed.rb', line 108

def time
  on_each_node :time
end

#ttl(key) ⇒ Object

Get the time to live (in seconds) for a key.



128
129
130
# File 'lib/redis/distributed.rb', line 128

def ttl(key)
  node_for(key).ttl(key)
end

#type(key) ⇒ Object

Determine the type stored at key.



247
248
249
# File 'lib/redis/distributed.rb', line 247

def type(key)
  node_for(key).type(key)
end

Unlink keys.



171
172
173
174
175
176
# File 'lib/redis/distributed.rb', line 171

def unlink(*args)
  keys_per_node = args.group_by { |key| node_for(key) }
  keys_per_node.inject(0) do |sum, (node, keys)|
    sum + node.unlink(*keys)
  end
end

#unsubscribe(*channels) ⇒ Object

Stop listening for messages posted to the given channels.



788
789
790
791
792
# File 'lib/redis/distributed.rb', line 788

def unsubscribe(*channels)
  raise "Can't unsubscribe if not subscribed." unless subscribed?

  @subscribed_node.unsubscribe(*channels)
end

#unwatchObject

Forget about all watched keys.

Raises:



820
821
822
823
824
825
826
# File 'lib/redis/distributed.rb', line 820

def unwatch
  raise CannotDistribute, :unwatch unless @watch_key

  result = node_for(@watch_key).unwatch
  @watch_key = nil
  result
end

#watch(*keys, &block) ⇒ Object

Watch the given keys to determine execution of the MULTI/EXEC block.



806
807
808
809
810
811
812
813
814
815
816
817
# File 'lib/redis/distributed.rb', line 806

def watch(*keys, &block)
  ensure_same_node(:watch, keys) do |node|
    @watch_key = key_tag(keys.first) || keys.first.to_s

    begin
      node.watch(*keys, &block)
    rescue StandardError
      @watch_key = nil
      raise
    end
  end
end

#zadd(key, *args) ⇒ Object

Add one or more members to a sorted set, or update the score for members that already exist.



609
610
611
# File 'lib/redis/distributed.rb', line 609

def zadd(key, *args)
  node_for(key).zadd(key, *args)
end

#zcard(key) ⇒ Object

Get the number of members in a sorted set.



603
604
605
# File 'lib/redis/distributed.rb', line 603

def zcard(key)
  node_for(key).zcard(key)
end

#zcount(key, min, max) ⇒ Object

Get the number of members in a particular score range.



673
674
675
# File 'lib/redis/distributed.rb', line 673

def zcount(key, min, max)
  node_for(key).zcount(key, min, max)
end

#zincrby(key, increment, member) ⇒ Object

Increment the score of a member in a sorted set.



615
616
617
# File 'lib/redis/distributed.rb', line 615

def zincrby(key, increment, member)
  node_for(key).zincrby(key, increment, member)
end

#zinterstore(destination, keys, **options) ⇒ Object

Intersect multiple sorted sets and store the resulting sorted set in a new key.



679
680
681
682
683
# File 'lib/redis/distributed.rb', line 679

def zinterstore(destination, keys, **options)
  ensure_same_node(:zinterstore, [destination] + keys) do |node|
    node.zinterstore(destination, keys, **options)
  end
end

#zrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index.



630
631
632
# File 'lib/redis/distributed.rb', line 630

def zrange(key, start, stop, **options)
  node_for(key).zrange(key, start, stop, **options)
end

#zrangebyscore(key, min, max, **options) ⇒ Object

Return a range of members in a sorted set, by score.



657
658
659
# File 'lib/redis/distributed.rb', line 657

def zrangebyscore(key, min, max, **options)
  node_for(key).zrangebyscore(key, min, max, **options)
end

#zrank(key, member) ⇒ Object

Determine the index of a member in a sorted set.



641
642
643
# File 'lib/redis/distributed.rb', line 641

def zrank(key, member)
  node_for(key).zrank(key, member)
end

#zrem(key, member) ⇒ Object

Remove one or more members from a sorted set.



620
621
622
# File 'lib/redis/distributed.rb', line 620

def zrem(key, member)
  node_for(key).zrem(key, member)
end

#zremrangebyrank(key, start, stop) ⇒ Object

Remove all members in a sorted set within the given indexes.



652
653
654
# File 'lib/redis/distributed.rb', line 652

def zremrangebyrank(key, start, stop)
  node_for(key).zremrangebyrank(key, start, stop)
end

#zremrangebyscore(key, min, max) ⇒ Object

Remove all members in a sorted set within the given scores.



668
669
670
# File 'lib/redis/distributed.rb', line 668

def zremrangebyscore(key, min, max)
  node_for(key).zremrangebyscore(key, min, max)
end

#zrevrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, with scores ordered from high to low.



636
637
638
# File 'lib/redis/distributed.rb', line 636

def zrevrange(key, start, stop, **options)
  node_for(key).zrevrange(key, start, stop, **options)
end

#zrevrangebyscore(key, max, min, **options) ⇒ Object

Return a range of members in a sorted set, by score, with scores ordered from high to low.



663
664
665
# File 'lib/redis/distributed.rb', line 663

def zrevrangebyscore(key, max, min, **options)
  node_for(key).zrevrangebyscore(key, max, min, **options)
end

#zrevrank(key, member) ⇒ Object

Determine the index of a member in a sorted set, with scores ordered from high to low.



647
648
649
# File 'lib/redis/distributed.rb', line 647

def zrevrank(key, member)
  node_for(key).zrevrank(key, member)
end

#zscore(key, member) ⇒ Object

Get the score associated with the given member in a sorted set.



625
626
627
# File 'lib/redis/distributed.rb', line 625

def zscore(key, member)
  node_for(key).zscore(key, member)
end

#zunionstore(destination, keys, **options) ⇒ Object

Add multiple sorted sets and store the resulting sorted set in a new key.



686
687
688
689
690
# File 'lib/redis/distributed.rb', line 686

def zunionstore(destination, keys, **options)
  ensure_same_node(:zunionstore, [destination] + keys) do |node|
    node.zunionstore(destination, keys, **options)
  end
end