Class: Redis::Distributed

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/distributed.rb

Defined Under Namespace

Classes: CannotDistribute

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_configs, options = {}) ⇒ Distributed



20
21
22
23
24
25
26
27
28
# File 'lib/redis/distributed.rb', line 20

# Sets up the distributed client and registers every configured node.
#
# @param node_configs [Array] node settings, each one passed to #add_node
# @param options [Hash] shared defaults; :tag replaces the key-tag pattern
#   and :ring replaces the freshly built HashRing
def initialize(node_configs, options = {})
  @tag = options[:tag] || /^\{(.+?)\}/
  @ring = options[:ring] || HashRing.new
  # Copies are kept so #dup can rebuild an equivalent client later.
  @node_configs = node_configs.dup
  @default_options = options.dup
  node_configs.each do |config|
    add_node(config)
  end
  @subscribed_node = nil
  @watch_key = nil
end

Instance Attribute Details

#ring ⇒ Object (readonly)

Returns the value of attribute ring.



18
19
20
# File 'lib/redis/distributed.rb', line 18

# Reader for the ring assigned in #initialize (options[:ring] or a new HashRing).
def ring
  @ring
end

Instance Method Details

#[](key) ⇒ Object



396
397
398
# File 'lib/redis/distributed.rb', line 396

# Hash-style read access; simply forwards to #get.
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



400
401
402
# File 'lib/redis/distributed.rb', line 400

# Hash-style write access; simply forwards to #set without options.
def []=(key, value)
  set(key, value)
end

#_bpop(cmd, args) ⇒ Object



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/redis/distributed.rb', line 462

# Shared implementation behind the blocking pop commands (#blpop, #brpop).
#
# The timeout comes from a trailing options Hash (:timeout) or, with a
# deprecation warning, from a trailing argument that responds to #to_int.
# Whatever remains in +args+ is flattened into the key list and the command
# is sent to the node yielded by ensure_same_node.
#
# @param cmd [Symbol] command to dispatch on the node
# @param args [Array] keys, optionally followed by the timeout; the trailing
#   timeout element is popped off this array
# @return [Object] whatever the node returns for +cmd+
def _bpop(cmd, args)
  timeout = if args.last.is_a?(Hash)
    args.pop[:timeout]
  elsif args.last.respond_to?(:to_int)
    last_arg = args.pop
    # caller(2, 1) skips this helper and the public wrapper that called it.
    # The space before "(called from" keeps the hint readable instead of
    # gluing it onto the example call.
    ::Redis.deprecate!(
      "Passing the timeout as a positional argument is deprecated, it should be passed as a keyword argument:\n" \
      "  redis.#{cmd}(#{args.map(&:inspect).join(', ')}, timeout: #{last_arg.to_int}) " \
      "(called from: #{caller(2, 1).first})"
    )
    last_arg.to_int
  end

  keys = args.flatten

  ensure_same_node(cmd, keys) do |node|
    if timeout
      node.__send__(cmd, keys, timeout: timeout)
    else
      node.__send__(cmd, keys)
    end
  end
end

#_eval(cmd, args) ⇒ Object



979
980
981
982
983
984
985
986
987
988
989
990
# File 'lib/redis/distributed.rb', line 979

# Shared implementation behind #eval and #evalsha.
#
# +args+ is consumed in place: first the script (or SHA), then an optional
# trailing options Hash, then positional keys and argv. Positional values
# win over options[:keys] / options[:argv]; both default to [].
def _eval(cmd, args)
  script = args.shift
  options = args.last.is_a?(Hash) ? args.pop : {}

  keys = args.shift || options[:keys] || []
  argv = args.shift || options[:argv] || []

  ensure_same_node(cmd, keys) { |node| node.send(cmd, script, keys, argv) }
end

#add_node(options) ⇒ Object



41
42
43
44
45
# File 'lib/redis/distributed.rb', line 41

# Builds a Redis client for one node and adds it to the ring.
#
# @param options [String, Hash] a URL string (wrapped as { url: ... }) or a
#   Hash of client options; merged over the defaults given to #initialize
# @return [Object] whatever @ring.add_node returns
def add_node(options)
  options = { url: options } if options.is_a?(String)
  node_options = @default_options.merge(options)
  # :tag and :ring configure this distributed wrapper (see #initialize), not
  # an individual connection, so they are not forwarded to Redis.new.
  node_options.delete(:tag)
  node_options.delete(:ring)
  @ring.add_node Redis.new(node_options)
end

#append(key, value) ⇒ Object

Append a value to a key.



365
366
367
# File 'lib/redis/distributed.rb', line 365

# Appends +value+ to +key+ on the node that node_for selects.
def append(key, value)
  node = node_for(key)
  node.append(key, value)
end

#bgsave ⇒ Object

Asynchronously save the dataset to disk.



68
69
70
# File 'lib/redis/distributed.rb', line 68

# Issues :bgsave through on_each_node.
def bgsave
  on_each_node(:bgsave)
end

#bitcount(key, start = 0, stop = -1) ⇒ Object

Count the number of set bits in a range of the string value stored at key.



370
371
372
# File 'lib/redis/distributed.rb', line 370

# Counts set bits of +key+ between +start+ and +stop+ on the owning node.
def bitcount(key, start = 0, stop = -1)
  node = node_for(key)
  node.bitcount(key, start, stop)
end

#bitop(operation, destkey, *keys) ⇒ Object

Perform a bitwise operation between strings and store the resulting string in a key.



375
376
377
378
379
# File 'lib/redis/distributed.rb', line 375

# Runs BITOP on the node yielded by ensure_same_node for the destination
# key plus all source keys.
def bitop(operation, destkey, *keys)
  ensure_same_node(:bitop, [destkey, *keys]) { |node| node.bitop(operation, destkey, *keys) }
end

#bitpos(key, bit, start = nil, stop = nil) ⇒ Object

Return the position of the first bit set to 1 or 0 in a string.



382
383
384
# File 'lib/redis/distributed.rb', line 382

# Finds the first bit equal to +bit+ in +key+ on the owning node.
def bitpos(key, bit, start = nil, stop = nil)
  node = node_for(key)
  node.bitpos(key, bit, start, stop)
end

#blmove(source, destination, where_source, where_destination, timeout: 0) ⇒ Object

Remove the first/last element in a list and append/prepend it to another list and return it, or block until one is available.



418
419
420
421
422
# File 'lib/redis/distributed.rb', line 418

# Blocking variant of #lmove: moves an element from +source+ to
# +destination+ on the node yielded by ensure_same_node for both keys.
#
# @param source [String] list to take the element from
# @param destination [String] list to push the element onto
# @param where_source [String, Symbol] end of +source+ to pop from
# @param where_destination [String, Symbol] end of +destination+ to push to
# @param timeout [Numeric] forwarded to the node's blmove (default 0)
# @return [Object] whatever the node's blmove returns
def blmove(source, destination, where_source, where_destination, timeout: 0)
  # Report :blmove (not :lmove) so ensure_same_node names the command that
  # was actually invoked.
  ensure_same_node(:blmove, [source, destination]) do |node|
    node.blmove(source, destination, where_source, where_destination, timeout: timeout)
  end
end

#blpop(*args) ⇒ Object

Remove and get the first element in a list, or block until one is available.



489
490
491
# File 'lib/redis/distributed.rb', line 489

# Blocking left pop; argument handling lives in #_bpop.
def blpop(*args)
  _bpop(:blpop, args)
end

#brpop(*args) ⇒ Object

Remove and get the last element in a list, or block until one is available.



495
496
497
# File 'lib/redis/distributed.rb', line 495

# Blocking right pop; argument handling lives in #_bpop.
def brpop(*args)
  _bpop(:brpop, args)
end

#brpoplpush(source, destination, deprecated_timeout = 0, **options) ⇒ Object

Pop a value from a list, push it to another list and return it; or block until one is available.



501
502
503
504
505
# File 'lib/redis/distributed.rb', line 501

# Runs BRPOPLPUSH on the node yielded by ensure_same_node for both lists,
# forwarding the positional timeout and any options untouched.
def brpoplpush(source, destination, deprecated_timeout = 0, **options)
  lists = [source, destination]
  ensure_same_node(:brpoplpush, lists) { |node| node.brpoplpush(source, destination, deprecated_timeout, **options) }
end

#copy(source, destination, **options) ⇒ Object

Copy a value from one key to another.



215
216
217
218
219
# File 'lib/redis/distributed.rb', line 215

# Copies +source+ to +destination+ on the node yielded by ensure_same_node
# for both keys.
def copy(source, destination, **options)
  pair = [source, destination]
  ensure_same_node(:copy, pair) { |node| node.copy(source, destination, **options) }
end

#dbsize ⇒ Object

Return the number of keys in the selected database.



73
74
75
# File 'lib/redis/distributed.rb', line 73

# Issues :dbsize through on_each_node.
def dbsize
  on_each_node(:dbsize)
end

#decr(key) ⇒ Object

Decrement the integer value of a key by one.



255
256
257
# File 'lib/redis/distributed.rb', line 255

# Decrements +key+ by one on the node that node_for selects.
def decr(key)
  node = node_for(key)
  node.decr(key)
end

#decrby(key, decrement) ⇒ Object

Decrement the integer value of a key by the given number.



260
261
262
# File 'lib/redis/distributed.rb', line 260

# Decrements +key+ by +decrement+ on the node that node_for selects.
def decrby(key, decrement)
  node = node_for(key)
  node.decrby(key, decrement)
end

#del(*args) ⇒ Object

Delete a key.



163
164
165
166
167
168
# File 'lib/redis/distributed.rb', line 163

# Deletes the given keys, grouping them by owning node so each node gets a
# single del call. Returns the sum of the per-node results (0 for no keys).
def del(*args)
  removed = 0
  args.group_by { |key| node_for(key) }.each do |node, keys|
    removed += node.del(*keys)
  end
  removed
end

#discard ⇒ Object

Discard all commands issued after MULTI.

Raises:

  • (CannotDistribute)



946
947
948
949
950
951
952
# File 'lib/redis/distributed.rb', line 946

# Sends DISCARD to the node of the watched key and clears the watch.
# Raises CannotDistribute when no key is being watched.
def discard
  watched = @watch_key
  raise CannotDistribute, :discard unless watched

  node_for(watched).discard.tap { @watch_key = nil }
end

#dump(key) ⇒ Object

Return a serialized version of the value stored at a key.



148
149
150
# File 'lib/redis/distributed.rb', line 148

# Returns the serialized value of +key+ from the node that node_for selects.
def dump(key)
  node = node_for(key)
  node.dump(key)
end

#dup ⇒ Object



1006
1007
1008
# File 'lib/redis/distributed.rb', line 1006

# Builds a new instance of the same class from the node configs and default
# options captured in #initialize.
def dup
  klass = self.class
  klass.new(@node_configs, @default_options)
end

#echo(value) ⇒ Object

Echo the given string.



58
59
60
# File 'lib/redis/distributed.rb', line 58

# Issues :echo with +value+ through on_each_node.
def echo(value)
  on_each_node(:echo, value)
end

#eval(*args) ⇒ Object

Evaluate Lua script.



993
994
995
# File 'lib/redis/distributed.rb', line 993

# Evaluates a Lua script; argument handling lives in #_eval.
def eval(*args)
  _eval(:eval, args)
end

#evalsha(*args) ⇒ Object

Evaluate Lua script by its SHA.



998
999
1000
# File 'lib/redis/distributed.rb', line 998

# Evaluates a Lua script by SHA; argument handling lives in #_eval.
def evalsha(*args)
  _eval(:evalsha, args)
end

#exec ⇒ Object

Execute all commands issued after MULTI.

Raises:

  • (CannotDistribute)



937
938
939
940
941
942
943
# File 'lib/redis/distributed.rb', line 937

# Sends EXEC to the node of the watched key and clears the watch.
# Raises CannotDistribute when no key is being watched.
def exec
  watched = @watch_key
  raise CannotDistribute, :exec unless watched

  node_for(watched).exec.tap { @watch_key = nil }
end

#exists(*args) ⇒ Object

Determine if a key exists.



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/redis/distributed.rb', line 179

# Counts how many of the given keys exist, summing node._exists per node.
#
# Legacy path: with exactly one key and Redis.exists_returns_integer unset,
# a deprecation warning is emitted and the result of #exists? is returned.
def exists(*args)
  legacy_boolean = !Redis.exists_returns_integer && args.size == 1

  unless legacy_boolean
    found = 0
    args.group_by { |key| node_for(key) }.each do |node, keys|
      found += node._exists(*keys)
    end
    return found
  end

  ::Redis.deprecate!(
    "`Redis#exists(key)` will return an Integer in redis-rb 4.3, if you want to keep the old behavior, " \
    "use `exists?` instead. To opt-in to the new behavior now you can set Redis.exists_returns_integer = true. " \
    "(#{::Kernel.caller(1, 1).first})\n"
  )
  exists?(*args)
end

#exists?(*args) ⇒ Boolean

Determine if any of the keys exists.



196
197
198
199
200
201
202
# File 'lib/redis/distributed.rb', line 196

# True as soon as any node reports that one of its keys exists; keys are
# grouped per node first. False when no node does (or no keys are given).
def exists?(*args)
  args.group_by { |key| node_for(key) }.any? do |node, keys|
    node.exists?(*keys)
  end
end

#expire(key, seconds, **kwargs) ⇒ Object

Set a key’s time to live in seconds.



118
119
120
# File 'lib/redis/distributed.rb', line 118

# Sets a TTL in seconds on +key+ via the node that node_for selects.
def expire(key, seconds, **kwargs)
  node = node_for(key)
  node.expire(key, seconds, **kwargs)
end

#expireat(key, unix_time, **kwargs) ⇒ Object

Set the expiration for a key as a UNIX timestamp.



123
124
125
# File 'lib/redis/distributed.rb', line 123

# Sets the expiration of +key+ to +unix_time+ via the owning node.
def expireat(key, unix_time, **kwargs)
  node = node_for(key)
  node.expireat(key, unix_time, **kwargs)
end

#flushall ⇒ Object

Remove all keys from all databases.



78
79
80
# File 'lib/redis/distributed.rb', line 78

# Issues :flushall through on_each_node.
def flushall
  on_each_node(:flushall)
end

#flushdb ⇒ Object

Remove all keys from the current database.



83
84
85
# File 'lib/redis/distributed.rb', line 83

# Issues :flushdb through on_each_node.
def flushdb
  on_each_node(:flushdb)
end

#get(key) ⇒ Object

Get the value of a key.



318
319
320
# File 'lib/redis/distributed.rb', line 318

# Reads the value of +key+ from the node that node_for selects.
def get(key)
  node = node_for(key)
  node.get(key)
end

#getbit(key, offset) ⇒ Object

Returns the bit value at offset in the string value stored at key.



360
361
362
# File 'lib/redis/distributed.rb', line 360

# Reads the bit at +offset+ of +key+ from the owning node.
def getbit(key, offset)
  node = node_for(key)
  node.getbit(key, offset)
end

#getdel(key) ⇒ Object

Get the value of a key and delete it.



323
324
325
# File 'lib/redis/distributed.rb', line 323

# Reads and deletes +key+ on the owning node.
def getdel(key)
  node = node_for(key)
  node.getdel(key)
end

#getex(key, **options) ⇒ Object

Get the value of a key and sets its time to live based on options.



328
329
330
# File 'lib/redis/distributed.rb', line 328

# Reads +key+ on the owning node, forwarding the TTL-related options.
def getex(key, **options)
  node = node_for(key)
  node.getex(key, **options)
end

#getrange(key, start, stop) ⇒ Object

Get a substring of the string stored at a key.



350
351
352
# File 'lib/redis/distributed.rb', line 350

# Reads the substring +start+..+stop+ of +key+ from the owning node.
def getrange(key, start, stop)
  node = node_for(key)
  node.getrange(key, start, stop)
end

#getset(key, value) ⇒ Object

Set the string value of a key and return its old value.



387
388
389
# File 'lib/redis/distributed.rb', line 387

# Sets +key+ to +value+ on the owning node and returns that node's reply.
def getset(key, value)
  node = node_for(key)
  node.getset(key, value)
end

#hdel(key, *fields) ⇒ Object

Delete one or more hash fields.



826
827
828
# File 'lib/redis/distributed.rb', line 826

# Deletes +fields+ from the hash at +key+ on the owning node.
def hdel(key, *fields)
  node = node_for(key)
  node.hdel(key, *fields)
end

#hexists(key, field) ⇒ Object

Determine if a hash field exists.



831
832
833
# File 'lib/redis/distributed.rb', line 831

# Checks whether +field+ exists in the hash at +key+ on the owning node.
def hexists(key, field)
  node = node_for(key)
  node.hexists(key, field)
end

#hget(key, field) ⇒ Object

Get the value of a hash field.



808
809
810
# File 'lib/redis/distributed.rb', line 808

# Reads +field+ of the hash at +key+ from the owning node.
def hget(key, field)
  node = node_for(key)
  node.hget(key, field)
end

#hgetall(key) ⇒ Object

Get all the fields and values in a hash.



856
857
858
# File 'lib/redis/distributed.rb', line 856

# Reads every field and value of the hash at +key+ from the owning node.
def hgetall(key)
  node = node_for(key)
  node.hgetall(key)
end

#hincrby(key, field, increment) ⇒ Object

Increment the integer value of a hash field by the given integer number.



836
837
838
# File 'lib/redis/distributed.rb', line 836

# Increments +field+ of the hash at +key+ by +increment+ on the owning node.
def hincrby(key, field, increment)
  node = node_for(key)
  node.hincrby(key, field, increment)
end

#hincrbyfloat(key, field, increment) ⇒ Object

Increment the numeric value of a hash field by the given float number.



841
842
843
# File 'lib/redis/distributed.rb', line 841

# Float variant of #hincrby, delegated to the owning node.
def hincrbyfloat(key, field, increment)
  node = node_for(key)
  node.hincrbyfloat(key, field, increment)
end

#hkeys(key) ⇒ Object

Get all the fields in a hash.



846
847
848
# File 'lib/redis/distributed.rb', line 846

# Lists the fields of the hash at +key+ from the owning node.
def hkeys(key)
  node = node_for(key)
  node.hkeys(key)
end

#hlen(key) ⇒ Object

Get the number of fields in a hash.



784
785
786
# File 'lib/redis/distributed.rb', line 784

# Returns the field count of the hash at +key+ from the owning node.
def hlen(key)
  node = node_for(key)
  node.hlen(key)
end

#hmget(key, *fields) ⇒ Object

Get the values of all the given hash fields.



813
814
815
# File 'lib/redis/distributed.rb', line 813

# Reads the given +fields+ of the hash at +key+ from the owning node.
def hmget(key, *fields)
  node = node_for(key)
  node.hmget(key, *fields)
end

#hmset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



799
800
801
# File 'lib/redis/distributed.rb', line 799

# Writes the field/value list +attrs+ to the hash at +key+ on the owning node.
def hmset(key, *attrs)
  node = node_for(key)
  node.hmset(key, *attrs)
end

#hrandfield(key, count = nil, **options) ⇒ Object



821
822
823
# File 'lib/redis/distributed.rb', line 821

# Forwards HRANDFIELD for +key+ (with +count+ and options) to the owning node.
def hrandfield(key, count = nil, **options)
  node = node_for(key)
  node.hrandfield(key, count, **options)
end

#hset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



789
790
791
# File 'lib/redis/distributed.rb', line 789

# Forwards HSET for +key+ with +attrs+ to the owning node.
def hset(key, *attrs)
  node = node_for(key)
  node.hset(key, *attrs)
end

#hsetnx(key, field, value) ⇒ Object

Set the value of a hash field, only if the field does not exist.



794
795
796
# File 'lib/redis/distributed.rb', line 794

# Forwards HSETNX for +key+/+field+ to the owning node.
def hsetnx(key, field, value)
  node = node_for(key)
  node.hsetnx(key, field, value)
end

#hvals(key) ⇒ Object

Get all the values in a hash.



851
852
853
# File 'lib/redis/distributed.rb', line 851

# Lists the values of the hash at +key+ from the owning node.
def hvals(key)
  node = node_for(key)
  node.hvals(key)
end

#incr(key) ⇒ Object

Increment the integer value of a key by one.



265
266
267
# File 'lib/redis/distributed.rb', line 265

# Increments +key+ by one on the node that node_for selects.
def incr(key)
  node = node_for(key)
  node.incr(key)
end

#incrby(key, increment) ⇒ Object

Increment the integer value of a key by the given integer number.



270
271
272
# File 'lib/redis/distributed.rb', line 270

# Increments +key+ by +increment+ on the node that node_for selects.
def incrby(key, increment)
  node = node_for(key)
  node.incrby(key, increment)
end

#incrbyfloat(key, increment) ⇒ Object

Increment the numeric value of a key by the given float number.



275
276
277
# File 'lib/redis/distributed.rb', line 275

# Float variant of #incrby, delegated to the owning node.
def incrbyfloat(key, increment)
  node = node_for(key)
  node.incrbyfloat(key, increment)
end

#info(cmd = nil) ⇒ Object

Get information and statistics about the server.



88
89
90
# File 'lib/redis/distributed.rb', line 88

# Issues :info (with the optional +cmd+ argument) through on_each_node.
def info(cmd = nil)
  on_each_node(:info, cmd)
end

#inspect ⇒ Object



1002
1003
1004
# File 'lib/redis/distributed.rb', line 1002

# Human-readable summary: client version plus the comma-separated node ids.
def inspect
  node_ids = nodes.map(&:id).join(', ')
  "#<Redis client v#{Redis::VERSION} for #{node_ids}>"
end

#keys(glob = "*") ⇒ Object

Find all keys matching the given pattern.



205
206
207
# File 'lib/redis/distributed.rb', line 205

# Collects the :keys result of on_each_node for +glob+ and flattens it into
# one list.
def keys(glob = "*")
  per_node = on_each_node(:keys, glob)
  per_node.flatten
end

#lastsave ⇒ Object

Get the UNIX time stamp of the last successful save to disk.



93
94
95
# File 'lib/redis/distributed.rb', line 93

# Issues :lastsave through on_each_node.
def lastsave
  on_each_node(:lastsave)
end

#lindex(key, index) ⇒ Object

Get an element from a list by its index.



508
509
510
# File 'lib/redis/distributed.rb', line 508

# Reads the element at +index+ of the list at +key+ from the owning node.
def lindex(key, index)
  node = node_for(key)
  node.lindex(key, index)
end

#linsert(key, where, pivot, value) ⇒ Object

Insert an element before or after another element in a list.



513
514
515
# File 'lib/redis/distributed.rb', line 513

# Forwards LINSERT (+where+ relative to +pivot+) to the owning node.
def linsert(key, where, pivot, value)
  node = node_for(key)
  node.linsert(key, where, pivot, value)
end

#llen(key) ⇒ Object

Get the length of a list.



405
406
407
# File 'lib/redis/distributed.rb', line 405

# Returns the length of the list at +key+ from the owning node.
def llen(key)
  node = node_for(key)
  node.llen(key)
end

#lmove(source, destination, where_source, where_destination) ⇒ Object

Remove the first/last element in a list, append/prepend it to another list and return it.



410
411
412
413
414
# File 'lib/redis/distributed.rb', line 410

# Runs LMOVE on the node yielded by ensure_same_node for both lists.
def lmove(source, destination, where_source, where_destination)
  lists = [source, destination]
  ensure_same_node(:lmove, lists) { |node| node.lmove(source, destination, where_source, where_destination) }
end

#lpop(key, count = nil) ⇒ Object

Remove and get the first elements in a list.



445
446
447
# File 'lib/redis/distributed.rb', line 445

# Pops from the head of the list at +key+ on the owning node.
def lpop(key, count = nil)
  node = node_for(key)
  node.lpop(key, count)
end

#lpush(key, value) ⇒ Object

Prepend one or more values to a list.



425
426
427
# File 'lib/redis/distributed.rb', line 425

# Prepends +value+ to the list at +key+ on the owning node.
def lpush(key, value)
  node = node_for(key)
  node.lpush(key, value)
end

#lpushx(key, value) ⇒ Object

Prepend a value to a list, only if the list exists.



430
431
432
# File 'lib/redis/distributed.rb', line 430

# Forwards LPUSHX for +key+ to the owning node.
def lpushx(key, value)
  node = node_for(key)
  node.lpushx(key, value)
end

#lrange(key, start, stop) ⇒ Object

Get a range of elements from a list.



518
519
520
# File 'lib/redis/distributed.rb', line 518

# Reads elements +start+..+stop+ of the list at +key+ from the owning node.
def lrange(key, start, stop)
  node = node_for(key)
  node.lrange(key, start, stop)
end

#lrem(key, count, value) ⇒ Object

Remove elements from a list.



523
524
525
# File 'lib/redis/distributed.rb', line 523

# Forwards LREM (+count+ occurrences of +value+) to the owning node.
def lrem(key, count, value)
  node = node_for(key)
  node.lrem(key, count, value)
end

#lset(key, index, value) ⇒ Object

Set the value of an element in a list by its index.



528
529
530
# File 'lib/redis/distributed.rb', line 528

# Sets the element at +index+ of the list at +key+ on the owning node.
def lset(key, index, value)
  node = node_for(key)
  node.lset(key, index, value)
end

#ltrim(key, start, stop) ⇒ Object

Trim a list to the specified range.



533
534
535
# File 'lib/redis/distributed.rb', line 533

# Trims the list at +key+ to +start+..+stop+ on the owning node.
def ltrim(key, start, stop)
  node = node_for(key)
  node.ltrim(key, start, stop)
end

#mapped_hmget(key, *fields) ⇒ Object



817
818
819
# File 'lib/redis/distributed.rb', line 817

# Fetches +fields+ of the hash at +key+ via #hmget and returns them as a
# Hash of field => value.
#
# @param key [String] hash key
# @param fields [Array] field names to look up
# @return [Hash] each requested field mapped to the value #hmget returned
#   at the same position
def mapped_hmget(key, *fields)
  # Pair fields with values directly: no deep flatten (which would split
  # nested values apart) and no splat of the whole pair list into Hash[].
  fields.zip(hmget(key, *fields)).to_h
end

#mapped_hmset(key, hash) ⇒ Object



803
804
805
# File 'lib/redis/distributed.rb', line 803

# Writes +hash+ to the hash at +key+ by flattening it into a field/value
# list and calling hmset on the owning node.
def mapped_hmset(key, hash)
  node = node_for(key)
  node.hmset(key, *hash.to_a.flatten)
end

#mapped_mget(*keys) ⇒ Object

Get the values of all the given keys as a Hash.



338
339
340
341
342
# File 'lib/redis/distributed.rb', line 338

# Fetches +keys+ as one Hash: keys are grouped by owning node, each node's
# mapped_mget is called once, and the per-node hashes are merged.
def mapped_mget(*keys)
  by_node = keys.group_by { |k| node_for(k) }
  by_node.each_with_object({}) do |(node, subkeys), merged|
    merged.merge!(node.mapped_mget(*subkeys))
  end
end

#mapped_mset(_hash) ⇒ Object

Raises:

  • (CannotDistribute)



304
305
306
# File 'lib/redis/distributed.rb', line 304

# Not supported on a distributed client: always raises CannotDistribute.
def mapped_mset(_hash)
  raise(CannotDistribute, :mapped_mset)
end

#mapped_msetnx(_hash) ⇒ Object

Raises:

  • (CannotDistribute)



313
314
315
# File 'lib/redis/distributed.rb', line 313

# Not supported on a distributed client: always raises CannotDistribute.
def mapped_msetnx(_hash)
  raise(CannotDistribute, :mapped_msetnx)
end

#mget(*keys) ⇒ Object

Get the values of all the given keys as an Array.



333
334
335
# File 'lib/redis/distributed.rb', line 333

# Fetches +keys+ through #mapped_mget and returns the values in the order
# the keys were given.
def mget(*keys)
  found = mapped_mget(*keys)
  found.values_at(*keys)
end

#migrate(_key, _options) ⇒ Object

Transfer a key from the connected instance to another instance.

Raises:

  • (CannotDistribute)



158
159
160
# File 'lib/redis/distributed.rb', line 158

# Not supported on a distributed client: always raises CannotDistribute.
def migrate(_key, _options)
  raise(CannotDistribute, :migrate)
end

#monitor ⇒ Object

Listen for all requests received by the server in real time.

Raises:

  • (NotImplementedError)


98
99
100
# File 'lib/redis/distributed.rb', line 98

# Not implemented here: always raises NotImplementedError.
def monitor
  raise(NotImplementedError)
end

#move(key, db) ⇒ Object

Move a key to another database.



210
211
212
# File 'lib/redis/distributed.rb', line 210

# Moves +key+ to database +db+ on the node that node_for selects.
def move(key, db)
  node = node_for(key)
  node.move(key, db)
end

#mset(*_args) ⇒ Object

Set multiple keys to multiple values.

Raises:

  • (CannotDistribute)



300
301
302
# File 'lib/redis/distributed.rb', line 300

# Not supported on a distributed client: always raises CannotDistribute.
def mset(*_args)
  raise(CannotDistribute, :mset)
end

#msetnx(*_args) ⇒ Object

Set multiple keys to multiple values, only if none of the keys exist.

Raises:

  • (CannotDistribute)



309
310
311
# File 'lib/redis/distributed.rb', line 309

# Not supported on a distributed client: always raises CannotDistribute.
def msetnx(*_args)
  raise(CannotDistribute, :msetnx)
end

#multi(&block) ⇒ Object

Mark the start of a transaction block.

Raises:

  • (CannotDistribute)



928
929
930
931
932
933
934
# File 'lib/redis/distributed.rb', line 928

# Starts MULTI on the node of the watched key. The watch is cleared only
# when a block was given. Raises CannotDistribute when no key is being
# watched.
def multi(&block)
  watched = @watch_key
  raise CannotDistribute, :multi unless watched

  outcome = node_for(watched).multi(&block)
  @watch_key = nil if block
  outcome
end

#node_for(key) ⇒ Object

Raises:

  • (CannotDistribute)



30
31
32
33
34
35
# File 'lib/redis/distributed.rb', line 30

# Resolves the ring node responsible for +key+.
#
# The lookup uses the key's tag when key_tag returns one, otherwise the
# stringified key. While a key is being watched, any lookup for a different
# key raises CannotDistribute.
def node_for(key)
  name = key.to_s
  ring_key = key_tag(name) || name
  if @watch_key && @watch_key != ring_key
    raise CannotDistribute, :watch
  end

  @ring.get_node(ring_key)
end

#nodes ⇒ Object



37
38
39
# File 'lib/redis/distributed.rb', line 37

# Returns the nodes currently held by the ring.
def nodes
  ring = @ring
  ring.nodes
end

#persist(key) ⇒ Object

Remove the expiration from a key.



113
114
115
# File 'lib/redis/distributed.rb', line 113

# Removes the expiration from +key+ on the owning node.
def persist(key)
  node = node_for(key)
  node.persist(key)
end

#pexpire(key, milliseconds, **kwarg) ⇒ Object

Set a key’s time to live in milliseconds.



133
134
135
# File 'lib/redis/distributed.rb', line 133

# Sets a TTL in milliseconds on +key+ via the owning node.
def pexpire(key, milliseconds, **kwarg)
  node = node_for(key)
  node.pexpire(key, milliseconds, **kwarg)
end

#pexpireat(key, ms_unix_time, **kwarg) ⇒ Object

Set the expiration for a key as number of milliseconds from UNIX Epoch.



138
139
140
# File 'lib/redis/distributed.rb', line 138

# Sets the expiration of +key+ to +ms_unix_time+ via the owning node.
def pexpireat(key, ms_unix_time, **kwarg)
  node = node_for(key)
  node.pexpireat(key, ms_unix_time, **kwarg)
end

#pfadd(key, member) ⇒ Object

Add one or more members to a HyperLogLog structure.



960
961
962
# File 'lib/redis/distributed.rb', line 960

# Adds +member+ to the HyperLogLog at +key+ on the owning node.
def pfadd(key, member)
  node = node_for(key)
  node.pfadd(key, member)
end

#pfcount(*keys) ⇒ Object

Get the approximate cardinality of members added to HyperLogLog structure.



965
966
967
968
969
# File 'lib/redis/distributed.rb', line 965

# Runs PFCOUNT on the node yielded by ensure_same_node. The same-node check
# sees the keys flattened one level; the node receives +keys+ as given.
def pfcount(*keys)
  ensure_same_node(:pfcount, keys.flatten(1)) { |node| node.pfcount(keys) }
end

#pfmerge(dest_key, *source_key) ⇒ Object

Merge multiple HyperLogLog values into an unique value that will approximate the cardinality of the union of the observed Sets of the source HyperLogLog structures.



973
974
975
976
977
# File 'lib/redis/distributed.rb', line 973

# Runs PFMERGE on the node yielded by ensure_same_node for the destination
# and all source keys.
def pfmerge(dest_key, *source_key)
  involved = [dest_key, *source_key]
  ensure_same_node(:pfmerge, involved) { |node| node.pfmerge(dest_key, *source_key) }
end

#ping ⇒ Object

Ping the server.



53
54
55
# File 'lib/redis/distributed.rb', line 53

# Issues :ping through on_each_node.
def ping
  on_each_node(:ping)
end

#pipelined ⇒ Object

Raises:

  • (CannotDistribute)



923
924
925
# File 'lib/redis/distributed.rb', line 923

# Not supported on a distributed client: always raises CannotDistribute.
def pipelined
  raise(CannotDistribute, :pipelined)
end

#psetex(key, ttl, value) ⇒ Object

Set the time to live in milliseconds of a key.



290
291
292
# File 'lib/redis/distributed.rb', line 290

# Sets +key+ to +value+ with a TTL in milliseconds on the owning node.
def psetex(key, ttl, value)
  node = node_for(key)
  node.psetex(key, ttl, value)
end

#psubscribe(*channels, &block) ⇒ Object

Listen for messages published to channels matching the given patterns.

Raises:

  • (NotImplementedError)


890
891
892
# File 'lib/redis/distributed.rb', line 890

# Not implemented here: always raises NotImplementedError; the arguments
# are ignored.
def psubscribe(*channels, &block)
  raise(NotImplementedError)
end

#pttl(key) ⇒ Object

Get the time to live (in milliseconds) for a key.



143
144
145
# File 'lib/redis/distributed.rb', line 143

# Returns the TTL of +key+ in milliseconds from the owning node.
def pttl(key)
  node = node_for(key)
  node.pttl(key)
end

#publish(channel, message) ⇒ Object

Post a message to a channel.



861
862
863
# File 'lib/redis/distributed.rb', line 861

# Publishes +message+ on the node that node_for selects for +channel+.
def publish(channel, message)
  node = node_for(channel)
  node.publish(channel, message)
end

#punsubscribe(*channels) ⇒ Object

Stop listening for messages posted to channels matching the given patterns.

Raises:

  • (NotImplementedError)


896
897
898
# File 'lib/redis/distributed.rb', line 896

# Not implemented here: always raises NotImplementedError; the arguments
# are ignored.
def punsubscribe(*channels)
  raise(NotImplementedError)
end

#quit ⇒ Object

Close the connection.



63
64
65
# File 'lib/redis/distributed.rb', line 63

# Issues :quit through on_each_node.
def quit
  on_each_node(:quit)
end

#randomkey ⇒ Object

Return a random key from the keyspace.

Raises:

  • (CannotDistribute)



222
223
224
# File 'lib/redis/distributed.rb', line 222

# Not supported on a distributed client: always raises CannotDistribute.
def randomkey
  raise(CannotDistribute, :randomkey)
end

#rename(old_name, new_name) ⇒ Object

Rename a key.



227
228
229
230
231
# File 'lib/redis/distributed.rb', line 227

# Renames a key on the node yielded by ensure_same_node for both names.
def rename(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:rename, names) { |node| node.rename(old_name, new_name) }
end

#renamenx(old_name, new_name) ⇒ Object

Rename a key, only if the new key does not exist.



234
235
236
237
238
# File 'lib/redis/distributed.rb', line 234

# Runs RENAMENX on the node yielded by ensure_same_node for both names.
def renamenx(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:renamenx, names) { |node| node.renamenx(old_name, new_name) }
end

#restore(key, ttl, serialized_value, **options) ⇒ Object

Create a key using the serialized value, previously obtained using DUMP.



153
154
155
# File 'lib/redis/distributed.rb', line 153

# Recreates +key+ from +serialized_value+ on the owning node.
def restore(key, ttl, serialized_value, **options)
  node = node_for(key)
  node.restore(key, ttl, serialized_value, **options)
end

#rpop(key, count = nil) ⇒ Object

Remove and get the last elements in a list.



450
451
452
# File 'lib/redis/distributed.rb', line 450

# Pops from the tail of the list at +key+ on the owning node.
def rpop(key, count = nil)
  node = node_for(key)
  node.rpop(key, count)
end

#rpoplpush(source, destination) ⇒ Object

Remove the last element in a list, append it to another list and return it.



456
457
458
459
460
# File 'lib/redis/distributed.rb', line 456

# Runs RPOPLPUSH on the node yielded by ensure_same_node for both lists.
def rpoplpush(source, destination)
  lists = [source, destination]
  ensure_same_node(:rpoplpush, lists) { |node| node.rpoplpush(source, destination) }
end

#rpush(key, value) ⇒ Object

Append one or more values to a list.



435
436
437
# File 'lib/redis/distributed.rb', line 435

# Appends +value+ to the list at +key+ on the owning node.
def rpush(key, value)
  node = node_for(key)
  node.rpush(key, value)
end

#rpushx(key, value) ⇒ Object

Append a value to a list, only if the list exists.



440
441
442
# File 'lib/redis/distributed.rb', line 440

# Forwards RPUSHX for +key+ to the owning node.
def rpushx(key, value)
  node = node_for(key)
  node.rpushx(key, value)
end

#sadd(key, member) ⇒ Object

Add one or more members to a set.



543
544
545
# File 'lib/redis/distributed.rb', line 543

# Adds +member+ to the set at +key+ on the owning node.
def sadd(key, member)
  node = node_for(key)
  node.sadd(key, member)
end

#sadd?(key, member) ⇒ Boolean

Add one or more members to a set.



548
549
550
# File 'lib/redis/distributed.rb', line 548

# Forwards sadd? for +key+ and +member+ to the owning node.
def sadd?(key, member)
  node = node_for(key)
  node.sadd?(key, member)
end

#save ⇒ Object

Synchronously save the dataset to disk.



103
104
105
# File 'lib/redis/distributed.rb', line 103

# Issues :save through on_each_node.
def save
  on_each_node(:save)
end

#scard(key) ⇒ Object

Get the number of members in a set.



538
539
540
# File 'lib/redis/distributed.rb', line 538

# Returns the member count of the set at +key+ from the owning node.
def scard(key)
  node = node_for(key)
  node.scard(key)
end

#script(subcommand, *args) ⇒ Object

Control remote script registry.



955
956
957
# File 'lib/redis/distributed.rb', line 955

# Issues :script with +subcommand+ and +args+ through on_each_node.
def script(subcommand, *args)
  on_each_node :script, subcommand, *args
end

#sdiff(*keys) ⇒ Object

Subtract multiple sets.



605
606
607
608
609
# File 'lib/redis/distributed.rb', line 605

# Runs SDIFF on the node yielded by ensure_same_node for +keys+.
def sdiff(*keys)
  ensure_same_node(:sdiff, keys) { |node| node.sdiff(*keys) }
end

#sdiffstore(destination, *keys) ⇒ Object

Subtract multiple sets and store the resulting set in a key.



612
613
614
615
616
# File 'lib/redis/distributed.rb', line 612

# Runs SDIFFSTORE on the node yielded by ensure_same_node for the
# destination plus all source keys.
def sdiffstore(destination, *keys)
  ensure_same_node(:sdiffstore, [destination, *keys]) { |node| node.sdiffstore(destination, *keys) }
end

#select(db) ⇒ Object

Change the selected database for the current connection.



48
49
50
# File 'lib/redis/distributed.rb', line 48

# Issues :select with +db+ through on_each_node.
def select(db)
  on_each_node(:select, db)
end

#set(key, value, **options) ⇒ Object

Set the string value of a key.



280
281
282
# File 'lib/redis/distributed.rb', line 280

# Sets +key+ to +value+ on the owning node, forwarding any options.
def set(key, value, **options)
  node = node_for(key)
  node.set(key, value, **options)
end

#setbit(key, offset, value) ⇒ Object

Sets or clears the bit at offset in the string value stored at key.



355
356
357
# File 'lib/redis/distributed.rb', line 355

# Sets the bit at +offset+ of +key+ to +value+ on the owning node.
def setbit(key, offset, value)
  node = node_for(key)
  node.setbit(key, offset, value)
end

#setex(key, ttl, value) ⇒ Object

Set the time to live in seconds of a key.



285
286
287
# File 'lib/redis/distributed.rb', line 285

# Sets +key+ to +value+ with a TTL in seconds on the owning node.
def setex(key, ttl, value)
  node = node_for(key)
  node.setex(key, ttl, value)
end

#setnx(key, value) ⇒ Object

Set the value of a key, only if the key does not exist.



295
296
297
# File 'lib/redis/distributed.rb', line 295

# Forwards SETNX for +key+ to the owning node.
def setnx(key, value)
  node = node_for(key)
  node.setnx(key, value)
end

#setrange(key, offset, value) ⇒ Object

Overwrite part of a string at key starting at the specified offset.



345
346
347
# File 'lib/redis/distributed.rb', line 345

# Overwrites part of the string at +key+ from +offset+ on the owning node.
def setrange(key, offset, value)
  node = node_for(key)
  node.setrange(key, offset, value)
end

#sinter(*keys) ⇒ Object

Intersect multiple sets.



619
620
621
622
623
# File 'lib/redis/distributed.rb', line 619

# Runs SINTER on the node yielded by ensure_same_node for +keys+.
def sinter(*keys)
  ensure_same_node(:sinter, keys) { |node| node.sinter(*keys) }
end

#sinterstore(destination, *keys) ⇒ Object

Intersect multiple sets and store the resulting set in a key.



626
627
628
629
630
# File 'lib/redis/distributed.rb', line 626

# Runs SINTERSTORE on the node yielded by ensure_same_node for the
# destination plus all source keys.
def sinterstore(destination, *keys)
  ensure_same_node(:sinterstore, [destination, *keys]) { |node| node.sinterstore(destination, *keys) }
end

#sismember(key, member) ⇒ Object

Determine if a given value is a member of a set.



580
581
582
# File 'lib/redis/distributed.rb', line 580

# Checks membership of +member+ in the set at +key+ on the owning node.
def sismember(key, member)
  node = node_for(key)
  node.sismember(key, member)
end

#smembers(key) ⇒ Object

Get all the members in a set.



590
591
592
# File 'lib/redis/distributed.rb', line 590

# Lists the members of the set at +key+ from the owning node.
def smembers(key)
  node = node_for(key)
  node.smembers(key)
end

#smismember(key, *members) ⇒ Object

Determine if multiple values are members of a set.



585
586
587
# File 'lib/redis/distributed.rb', line 585

# Checks membership of several +members+ in the set at +key+ on the owning node.
def smismember(key, *members)
  node = node_for(key)
  node.smismember(key, *members)
end

#smove(source, destination, member) ⇒ Object

Move a member from one set to another.



573
574
575
576
577
# File 'lib/redis/distributed.rb', line 573

# Moves +member+ between two sets on the node yielded by ensure_same_node
# for both keys.
def smove(source, destination, member)
  sets = [source, destination]
  ensure_same_node(:smove, sets) { |node| node.smove(source, destination, member) }
end

#sort(key, **options) ⇒ Object

Sort the elements in a list, set or sorted set.



241
242
243
244
245
246
247
# File 'lib/redis/distributed.rb', line 241

# Runs SORT on the node yielded by ensure_same_node. The same-node check
# covers +key+ plus the :by, :store and :get options when present.
def sort(key, **options)
  related = [key, options[:by], options[:store]]
  related.concat(Array(options[:get]))
  related = related.compact

  ensure_same_node(:sort, related) { |node| node.sort(key, **options) }
end

#spop(key, count = nil) ⇒ Object

Remove and return a random member from a set.



563
564
565
# File 'lib/redis/distributed.rb', line 563

# Forwards SPOP for +key+ (with optional +count+) to the owning node.
def spop(key, count = nil)
  node = node_for(key)
  node.spop(key, count)
end

#srandmember(key, count = nil) ⇒ Object

Get a random member from a set.



568
569
570
# File 'lib/redis/distributed.rb', line 568

# Forwards SRANDMEMBER for +key+ (with optional +count+) to the owning node.
def srandmember(key, count = nil)
  node = node_for(key)
  node.srandmember(key, count)
end

#srem(key, member) ⇒ Object

Remove one or more members from a set.



553
554
555
# File 'lib/redis/distributed.rb', line 553

# Removes +member+ from the set at +key+ on the owning node.
def srem(key, member)
  node = node_for(key)
  node.srem(key, member)
end

#srem?(key, member) ⇒ Boolean

Remove one or more members from a set.



558
559
560
# File 'lib/redis/distributed.rb', line 558

# Forwards srem? for +key+ and +member+ to the owning node.
def srem?(key, member)
  node = node_for(key)
  node.srem?(key, member)
end

#sscan(key, cursor, **options) ⇒ Object

Scan a set



595
596
597
# File 'lib/redis/distributed.rb', line 595

# Forwards SSCAN for +key+ at +cursor+ to the owning node.
def sscan(key, cursor, **options)
  node = node_for(key)
  node.sscan(key, cursor, **options)
end

#sscan_each(key, **options, &block) ⇒ Object

Scan a set and return an enumerator



600
601
602
# File 'lib/redis/distributed.rb', line 600

# Forwards sscan_each for +key+ (options and block included) to the owning node.
def sscan_each(key, **options, &block)
  node = node_for(key)
  node.sscan_each(key, **options, &block)
end

#strlen(key) ⇒ Object

Get the length of the value stored in a key.



392
393
394
# File 'lib/redis/distributed.rb', line 392

# Returns the length of the value at +key+ from the owning node.
def strlen(key)
  node = node_for(key)
  node.strlen(key)
end

#subscribe(channel, *channels, &block) ⇒ Object

Listen for messages published to the given channels.



870
871
872
873
874
875
876
877
878
879
880
# File 'lib/redis/distributed.rb', line 870

# Subscribes to one or more channels and remembers the node used, so that
# #unsubscribe can reach it later.
#
# A single channel goes straight to its node via node_for; several channels
# go through ensure_same_node.
def subscribe(channel, *channels, &block)
  if channels.size.positive?
    ensure_same_node(:subscribe, [channel, *channels]) do |node|
      @subscribed_node = node
      node.subscribe(channel, *channels, &block)
    end
  else
    @subscribed_node = node_for(channel)
    @subscribed_node.subscribe(channel, &block)
  end
end

#subscribed? ⇒ Boolean



865
866
867
# File 'lib/redis/distributed.rb', line 865

# True once a subscribed node has been recorded, false otherwise.
def subscribed?
  @subscribed_node ? true : false
end

#sunion(*keys) ⇒ Object

Add multiple sets.



633
634
635
636
637
# File 'lib/redis/distributed.rb', line 633

# Runs `sunion` for all `keys` on the node yielded by `ensure_same_node`
# (presumably that helper rejects keys living on different nodes --
# confirm in its definition).
def sunion(*keys)
  ensure_same_node(:sunion, keys) { |node| node.sunion(*keys) }
end

#sunionstore(destination, *keys) ⇒ Object

Add multiple sets (i.e. compute their union) and store the resulting set in a key.



640
641
642
643
644
# File 'lib/redis/distributed.rb', line 640

# Runs `sunionstore` on the node yielded by `ensure_same_node`. The
# destination is checked together with the source keys, so all of them
# are handed to that helper (presumably it rejects keys on different
# nodes -- confirm in its definition).
def sunionstore(destination, *keys)
  involved = [destination, *keys]
  ensure_same_node(:sunionstore, involved) { |node| node.sunionstore(destination, *keys) }
end

#time ⇒ Object

Get server time: a UNIX timestamp and the elapsed microseconds in the current second.



108
109
110
# File 'lib/redis/distributed.rb', line 108

# Get server time. The work is handed to `on_each_node` with the command
# name :time; the return value is whatever that helper produces
# (presumably one result per node -- confirm in on_each_node).
def time
  on_each_node(:time)
end

#ttl(key) ⇒ Object

Get the time to live (in seconds) for a key.



128
129
130
# File 'lib/redis/distributed.rb', line 128

# Get the time to live (in seconds) for a key by asking the node that
# `node_for` returns for `key`.
def ttl(key)
  owner = node_for(key)
  owner.ttl(key)
end

#type(key) ⇒ Object

Determine the type stored at key.



250
251
252
# File 'lib/redis/distributed.rb', line 250

# Determine the type stored at key by asking the node that `node_for`
# returns for `key`.
def type(key)
  owner = node_for(key)
  owner.type(key)
end

#unlink(*args) ⇒ Object

Unlink keys.



171
172
173
174
175
176
# File 'lib/redis/distributed.rb', line 171

# Unlink keys that may live on different nodes.
#
# Keys are bucketed by the node `node_for` returns for each of them, one
# `unlink` call is issued per node with that node's keys, and the
# per-node results are added up (0 when no keys are given).
def unlink(*args)
  args
    .group_by { |key| node_for(key) }
    .sum { |node, node_keys| node.unlink(*node_keys) }
end

#unsubscribe(*channels) ⇒ Object

Stop listening for messages posted to the given channels.



883
884
885
886
887
# File 'lib/redis/distributed.rb', line 883

# Stop listening for messages posted to the given channels.
#
# Forwards to the node remembered in @subscribed_node. Raises a
# RuntimeError when `subscribed?` is false.
def unsubscribe(*channels)
  return @subscribed_node.unsubscribe(*channels) if subscribed?

  raise "Can't unsubscribe if not subscribed."
end

#unwatch ⇒ Object

Forget about all watched keys.

Raises:

(CannotDistribute)



915
916
917
918
919
920
921
# File 'lib/redis/distributed.rb', line 915

# Forget about all watched keys.
#
# Uses the key remembered in @watch_key to locate the node, calls
# `unwatch` on it, then clears @watch_key and returns the node's result.
# If the node call raises, @watch_key is left as it was.
#
# Raises CannotDistribute when no watch key is recorded.
def unwatch
  watched = @watch_key
  raise CannotDistribute, :unwatch unless watched

  node_for(watched).unwatch.tap { @watch_key = nil }
end

#watch(*keys, &block) ⇒ Object

Watch the given keys to determine execution of the MULTI/EXEC block.



901
902
903
904
905
906
907
908
909
910
911
912
# File 'lib/redis/distributed.rb', line 901

# Watch the given keys to determine execution of the MULTI/EXEC block.
#
# Runs on the node yielded by `ensure_same_node`. Before calling the
# node, @watch_key is set to `key_tag` of the first key, or to the first
# key as a string when `key_tag` returns a falsy value.
# If the node-level `watch` raises a StandardError, @watch_key is reset
# to nil and the error is re-raised.
def watch(*keys, &block)
  ensure_same_node(:watch, keys) do |node|
    first_key = keys.first
    @watch_key = key_tag(first_key) || first_key.to_s

    begin
      node.watch(*keys, &block)
    rescue StandardError
      # The watch did not take effect; forget the remembered key.
      @watch_key = nil
      raise
    end
  end
end

#zadd(key, *args) ⇒ Object

Add one or more members to a sorted set, or update the score for members that already exist.



653
654
655
# File 'lib/redis/distributed.rb', line 653

# Add one or more members to a sorted set, or update the score for
# members that already exist. Forwards `key` and every extra argument to
# the node that `node_for` returns for `key`.
#
# NOTE(review): trailing keyword options arrive inside `args`; confirm
# that keyword forwarding (e.g. ruby2_keywords) is set up where this
# method is defined.
def zadd(key, *args)
  owner = node_for(key)
  owner.zadd(key, *args)
end

#zcard(key) ⇒ Object

Get the number of members in a sorted set.



647
648
649
# File 'lib/redis/distributed.rb', line 647

# Get the number of members in a sorted set by asking the node that
# `node_for` returns for `key`.
def zcard(key)
  owner = node_for(key)
  owner.zcard(key)
end

#zcount(key, min, max) ⇒ Object

Get the number of members in a particular score range.



735
736
737
# File 'lib/redis/distributed.rb', line 735

# Get the number of members in a particular score range. `min` and `max`
# are forwarded unchanged to the node that `node_for` returns for `key`.
def zcount(key, min, max)
  owner = node_for(key)
  owner.zcount(key, min, max)
end

#zdiff(*keys, **options) ⇒ Object

Return the difference between the first and all successive input sorted sets.



769
770
771
772
773
# File 'lib/redis/distributed.rb', line 769

# Return the difference between the first and all successive input
# sorted sets. Runs on the node yielded by `ensure_same_node` for `keys`
# (presumably that helper rejects keys on different nodes -- confirm in
# its definition); keyword options are forwarded as given.
def zdiff(*keys, **options)
  ensure_same_node(:zdiff, keys) { |node| node.zdiff(*keys, **options) }
end

#zdiffstore(destination, keys, **options) ⇒ Object

Compute the difference between the first and all successive input sorted sets and store the resulting sorted set in a new key.



777
778
779
780
781
# File 'lib/redis/distributed.rb', line 777

# Compute the difference between the first and all successive input
# sorted sets and store the result in `destination`.
#
# `destination` and `keys` (an Array) are handed together to
# `ensure_same_node`, which yields the node to run on (presumably it
# rejects keys on different nodes -- confirm in its definition).
def zdiffstore(destination, keys, **options)
  involved = [destination] + keys
  ensure_same_node(:zdiffstore, involved) { |node| node.zdiffstore(destination, keys, **options) }
end

#zincrby(key, increment, member) ⇒ Object

Increment the score of a member in a sorted set.



659
660
661
# File 'lib/redis/distributed.rb', line 659

# Increment the score of a member in a sorted set. `increment` and
# `member` are forwarded unchanged to the node that `node_for` returns
# for `key`.
def zincrby(key, increment, member)
  owner = node_for(key)
  owner.zincrby(key, increment, member)
end

#zinter(*keys, **options) ⇒ Object

Get the intersection of multiple sorted sets.



740
741
742
743
744
# File 'lib/redis/distributed.rb', line 740

# Get the intersection of multiple sorted sets. Runs on the node yielded
# by `ensure_same_node` for `keys` (presumably that helper rejects keys
# on different nodes -- confirm in its definition); keyword options are
# forwarded as given.
def zinter(*keys, **options)
  ensure_same_node(:zinter, keys) { |node| node.zinter(*keys, **options) }
end

#zinterstore(destination, keys, **options) ⇒ Object

Intersect multiple sorted sets and store the resulting sorted set in a new key.



748
749
750
751
752
# File 'lib/redis/distributed.rb', line 748

# Intersect multiple sorted sets and store the resulting sorted set in
# `destination`.
#
# `destination` and `keys` (an Array) are handed together to
# `ensure_same_node`, which yields the node to run on (presumably it
# rejects keys on different nodes -- confirm in its definition).
def zinterstore(destination, keys, **options)
  involved = [destination] + keys
  ensure_same_node(:zinterstore, involved) { |node| node.zinterstore(destination, keys, **options) }
end

#zmscore(key, *members) ⇒ Object

Get the scores associated with the given members in a sorted set.



679
680
681
# File 'lib/redis/distributed.rb', line 679

# Get the scores associated with the given members in a sorted set.
# `key` and all `members` are forwarded to the node that `node_for`
# returns for `key`.
def zmscore(key, *members)
  owner = node_for(key)
  owner.zmscore(key, *members)
end

#zrandmember(key, count = nil, **options) ⇒ Object

Get one or more random members from a sorted set.



674
675
676
# File 'lib/redis/distributed.rb', line 674

# Get one or more random members from a sorted set. `count` (nil by
# default) and keyword options are forwarded unchanged to the node that
# `node_for` returns for `key`.
def zrandmember(key, count = nil, **options)
  owner = node_for(key)
  owner.zrandmember(key, count, **options)
end

#zrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, score or lexicographical ordering.



684
685
686
# File 'lib/redis/distributed.rb', line 684

# Return a range of members in a sorted set. `start`, `stop` and keyword
# options are forwarded unchanged to the node that `node_for` returns
# for `key`.
def zrange(key, start, stop, **options)
  owner = node_for(key)
  owner.zrange(key, start, stop, **options)
end

#zrangebyscore(key, min, max, **options) ⇒ Object

Return a range of members in a sorted set, by score.



719
720
721
# File 'lib/redis/distributed.rb', line 719

# Return a range of members in a sorted set, by score. `min`, `max` and
# keyword options are forwarded unchanged to the node that `node_for`
# returns for `key`.
def zrangebyscore(key, min, max, **options)
  owner = node_for(key)
  owner.zrangebyscore(key, min, max, **options)
end

#zrangestore(dest_key, src_key, start, stop, **options) ⇒ Object

Select a range of members in a sorted set, by index, score or lexicographical ordering and store the resulting sorted set in a new key.



690
691
692
693
694
# File 'lib/redis/distributed.rb', line 690

# Select a range of members in a sorted set and store the resulting
# sorted set in `dest_key`.
#
# Both `dest_key` and `src_key` are handed to `ensure_same_node`, which
# yields the node to run on (presumably it rejects keys on different
# nodes -- confirm in its definition).
def zrangestore(dest_key, src_key, start, stop, **options)
  involved = [dest_key, src_key]
  ensure_same_node(:zrangestore, involved) do |target|
    target.zrangestore(*involved, start, stop, **options)
  end
end

#zrank(key, member) ⇒ Object

Determine the index of a member in a sorted set.



703
704
705
# File 'lib/redis/distributed.rb', line 703

# Determine the index of a member in a sorted set by asking the node
# that `node_for` returns for `key`.
def zrank(key, member)
  owner = node_for(key)
  owner.zrank(key, member)
end

#zrem(key, member) ⇒ Object

Remove one or more members from a sorted set.



664
665
666
# File 'lib/redis/distributed.rb', line 664

# Remove one or more members from a sorted set: forwards `zrem` with
# `key` and `member` to the node that `node_for` returns for `key`.
def zrem(key, member)
  target = node_for(key)
  target.zrem(key, member)
end

#zremrangebyrank(key, start, stop) ⇒ Object

Remove all members in a sorted set within the given indexes.



714
715
716
# File 'lib/redis/distributed.rb', line 714

# Remove all members in a sorted set within the given indexes. `start`
# and `stop` are forwarded unchanged to the node that `node_for` returns
# for `key`.
def zremrangebyrank(key, start, stop)
  target = node_for(key)
  target.zremrangebyrank(key, start, stop)
end

#zremrangebyscore(key, min, max) ⇒ Object

Remove all members in a sorted set within the given scores.



730
731
732
# File 'lib/redis/distributed.rb', line 730

# Remove all members in a sorted set within the given scores. `min` and
# `max` are forwarded unchanged to the node that `node_for` returns for
# `key`.
def zremrangebyscore(key, min, max)
  target = node_for(key)
  target.zremrangebyscore(key, min, max)
end

#zrevrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, with scores ordered from high to low.



698
699
700
# File 'lib/redis/distributed.rb', line 698

# Return a range of members in a sorted set, by index, ordered from high
# to low. `start`, `stop` and keyword options are forwarded unchanged to
# the node that `node_for` returns for `key`.
def zrevrange(key, start, stop, **options)
  owner = node_for(key)
  owner.zrevrange(key, start, stop, **options)
end

#zrevrangebyscore(key, max, min, **options) ⇒ Object

Return a range of members in a sorted set, by score, with scores ordered from high to low.



725
726
727
# File 'lib/redis/distributed.rb', line 725

# Return a range of members in a sorted set, by score, ordered from high
# to low. Note the argument order: `max` comes before `min`. Both, plus
# keyword options, are forwarded unchanged to the node that `node_for`
# returns for `key`.
def zrevrangebyscore(key, max, min, **options)
  owner = node_for(key)
  owner.zrevrangebyscore(key, max, min, **options)
end

#zrevrank(key, member) ⇒ Object

Determine the index of a member in a sorted set, with scores ordered from high to low.



709
710
711
# File 'lib/redis/distributed.rb', line 709

# Determine the index of a member in a sorted set, with scores ordered
# from high to low, by asking the node that `node_for` returns for `key`.
def zrevrank(key, member)
  owner = node_for(key)
  owner.zrevrank(key, member)
end

#zscore(key, member) ⇒ Object

Get the score associated with the given member in a sorted set.



669
670
671
# File 'lib/redis/distributed.rb', line 669

# Get the score associated with the given member in a sorted set by
# asking the node that `node_for` returns for `key`.
def zscore(key, member)
  owner = node_for(key)
  owner.zscore(key, member)
end

#zunion(*keys, **options) ⇒ Object

Return the union of multiple sorted sets.



755
756
757
758
759
# File 'lib/redis/distributed.rb', line 755

# Return the union of multiple sorted sets. Runs on the node yielded by
# `ensure_same_node` for `keys` (presumably that helper rejects keys on
# different nodes -- confirm in its definition); keyword options are
# forwarded as given.
def zunion(*keys, **options)
  ensure_same_node(:zunion, keys) { |node| node.zunion(*keys, **options) }
end

#zunionstore(destination, keys, **options) ⇒ Object

Add multiple sorted sets (i.e. compute their union) and store the resulting sorted set in a new key.



762
763
764
765
766
# File 'lib/redis/distributed.rb', line 762

# Add multiple sorted sets and store the resulting sorted set in
# `destination`.
#
# `destination` and `keys` (an Array) are handed together to
# `ensure_same_node`, which yields the node to run on (presumably it
# rejects keys on different nodes -- confirm in its definition).
def zunionstore(destination, keys, **options)
  involved = [destination] + keys
  ensure_same_node(:zunionstore, involved) { |node| node.zunionstore(destination, keys, **options) }
end