Class: Redis::Distributed

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/distributed.rb

Defined Under Namespace

Classes: CannotDistribute

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_configs, options = {}) ⇒ Distributed

Returns a new instance of Distributed.



20
21
22
23
24
25
26
27
28
# File 'lib/redis/distributed.rb', line 20

# Builds a distributed client over the given node configurations.
#
# @param node_configs [Array] per-node settings; each one is handed to #add_node
# @param options [Hash] shared defaults; :tag overrides the key-tag pattern and
#   :ring supplies a ring to use in place of a fresh HashRing
def initialize(node_configs, options = {})
  @tag = options[:tag] || /^\{(.+?)\}/
  @ring = options[:ring] || HashRing.new
  # Private copies are kept so #dup can build a new client from them later.
  @node_configs = node_configs.map { |config| config.dup }
  @default_options = options.dup
  node_configs.each do |config|
    add_node(config)
  end
  @subscribed_node = nil
  @watch_key = nil
end

Instance Attribute Details

#ring ⇒ Object (readonly)

Returns the value of attribute ring.



18
19
20
# File 'lib/redis/distributed.rb', line 18

# Read-only accessor for the ring this client uses to look up nodes.
def ring
  @ring
end

Instance Method Details

#[](key) ⇒ Object



400
401
402
# File 'lib/redis/distributed.rb', line 400

# Hash-style read; shorthand for #get.
def [](key)
  value = get(key)
  value
end

#[]=(key, value) ⇒ Object



404
405
406
# File 'lib/redis/distributed.rb', line 404

# Hash-style write; shorthand for #set.
def []=(key, value)
  reply = set(key, value)
  reply
end

#_bpop(cmd, args) ⇒ Object



466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# File 'lib/redis/distributed.rb', line 466

# Shared implementation behind the blocking pop commands.
#
# @param cmd [Symbol] name of the command to invoke on the node
# @param args [Array] keys, optionally followed by an options Hash whose
#   :timeout entry is forwarded to the node; the array is modified in place
def _bpop(cmd, args)
  timeout = nil
  if args.last.is_a?(Hash)
    timeout = args.pop[:timeout]
  end

  args.flatten!(1)

  ensure_same_node(cmd, args) do |node|
    # Only pass timeout: along when the caller actually supplied one.
    next node.__send__(cmd, args, timeout: timeout) if timeout

    node.__send__(cmd, args)
  end
end

#_eval(cmd, args) ⇒ Object



1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
# File 'lib/redis/distributed.rb', line 1000

# Shared implementation behind #eval and #evalsha.
#
# @param cmd [Symbol] name of the command to invoke on the node
# @param args [Array] the script (or SHA) first, then optional positional
#   keys and argv, then an optional Hash with :keys / :argv; positional
#   values win over the Hash entries. The array is consumed in place.
def _eval(cmd, args)
  script = args.shift
  options = args.last.is_a?(Hash) ? args.pop : {}

  keys = args.shift || options[:keys] || []
  argv = args.shift || options[:argv] || []

  ensure_same_node(cmd, keys) do |node|
    # __send__ (as in _bpop) keeps dispatch working even if the node
    # defines its own #send.
    node.__send__(cmd, script, keys, argv)
  end
end

#add_node(options) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/redis/distributed.rb', line 41

# Registers one more node on the ring.
#
# @param options [String, Hash] a URL string or a hash of connection options;
#   the client-wide default options are merged underneath it
def add_node(options)
  node_options = options.is_a?(String) ? { url: options } : options
  node_options = @default_options.merge(node_options)
  # :tag and :ring are stripped before the options reach Redis.new.
  %i[tag ring].each { |name| node_options.delete(name) }
  @ring.add_node Redis.new(node_options)
end

#append(key, value) ⇒ Object

Append a value to a key.



368
369
370
# File 'lib/redis/distributed.rb', line 368

# Append a value to a key on the node that owns it.
def append(key, value)
  node = node_for(key)
  node.append(key, value)
end

#bgsave ⇒ Object

Asynchronously save the dataset to disk.



74
75
76
# File 'lib/redis/distributed.rb', line 74

# Asynchronously save the dataset to disk; issued through on_each_node.
def bgsave
  on_each_node(:bgsave)
end

#bitcount(key, start = 0, stop = -1) ⇒ Object

Count the number of set bits in a range of the string value stored at key.



373
374
375
# File 'lib/redis/distributed.rb', line 373

# Count the number of set bits in a range of the string value stored at key.
# With the defaults (0, -1) the range passed on is start = 0, stop = -1.
def bitcount(key, start = 0, stop = -1)
  node = node_for(key)
  node.bitcount(key, start, stop)
end

#bitop(operation, destkey, *keys) ⇒ Object

Perform a bitwise operation between strings and store the resulting string in a key.



378
379
380
381
382
383
# File 'lib/redis/distributed.rb', line 378

# Perform a bitwise operation between strings and store the result in destkey.
# The destination and every source key are passed to ensure_same_node together.
def bitop(operation, destkey, *keys)
  keys.flatten!(1)
  involved = [destkey, *keys]
  ensure_same_node(:bitop, involved) { |node| node.bitop(operation, destkey, keys) }
end

#bitpos(key, bit, start = nil, stop = nil) ⇒ Object

Return the position of the first bit set to 1 or 0 in a string.



386
387
388
# File 'lib/redis/distributed.rb', line 386

# Return the position of the first bit set to 1 or 0 in a string.
def bitpos(key, bit, start = nil, stop = nil)
  node = node_for(key)
  node.bitpos(key, bit, start, stop)
end

#blmove(source, destination, where_source, where_destination, timeout: 0) ⇒ Object

Remove the first/last element in a list and append/prepend it to another list and return it, or block until one is available.



422
423
424
425
426
# File 'lib/redis/distributed.rb', line 422

# Remove the first/last element in a list and append/prepend it to another
# list and return it, or block until one is available.
#
# Source and destination are checked together via ensure_same_node. The
# command name handed to that check is :blmove (it previously said :lmove,
# the non-blocking sibling).
#
# @param timeout [Numeric] forwarded unchanged to the node (default 0)
def blmove(source, destination, where_source, where_destination, timeout: 0)
  ensure_same_node(:blmove, [source, destination]) do |node|
    node.blmove(source, destination, where_source, where_destination, timeout: timeout)
  end
end

#blpop(*args) ⇒ Object

Remove and get the first element in a list, or block until one is available.



485
486
487
# File 'lib/redis/distributed.rb', line 485

# Remove and get the first element in a list, or block until one is
# available. Argument handling lives in _bpop.
def blpop(*args)
  command = :blpop
  _bpop(command, args)
end

#brpop(*args) ⇒ Object

Remove and get the last element in a list, or block until one is available.



503
504
505
# File 'lib/redis/distributed.rb', line 503

# Remove and get the last element in a list, or block until one is
# available. Argument handling lives in _bpop.
def brpop(*args)
  command = :brpop
  _bpop(command, args)
end

#brpoplpush(source, destination, **options) ⇒ Object

Pop a value from a list, push it to another list and return it; or block until one is available.



509
510
511
512
513
# File 'lib/redis/distributed.rb', line 509

# Pop a value from a list, push it to another list and return it; or block
# until one is available. Both lists go through ensure_same_node.
def brpoplpush(source, destination, **options)
  lists = [source, destination]
  ensure_same_node(:brpoplpush, lists) { |node| node.brpoplpush(source, destination, **options) }
end

#bzpopmax(*args) ⇒ Object



489
490
491
492
493
# File 'lib/redis/distributed.rb', line 489

# Blocking pop of the highest-scored member; argument handling lives in _bpop.
#
# No block is passed: _bpop neither declares nor yields to one, so a block
# given here would never run. The reply is returned exactly as the node
# produced it.
def bzpopmax(*args)
  _bpop(:bzpopmax, args)
end

#bzpopmin(*args) ⇒ Object



495
496
497
498
499
# File 'lib/redis/distributed.rb', line 495

# Blocking pop of the lowest-scored member; argument handling lives in _bpop.
#
# No block is passed: _bpop neither declares nor yields to one, so a block
# given here would never run. The reply is returned exactly as the node
# produced it.
def bzpopmin(*args)
  _bpop(:bzpopmin, args)
end

#close ⇒ Object



69
70
71
# File 'lib/redis/distributed.rb', line 69

# Issues :close through on_each_node.
def close
  on_each_node(:close)
end

#copy(source, destination, **options) ⇒ Object

Copy a value from one key to another.



216
217
218
219
220
# File 'lib/redis/distributed.rb', line 216

# Copy a value from one key to another. Both keys go through
# ensure_same_node before the node is asked to copy.
def copy(source, destination, **options)
  pair = [source, destination]
  ensure_same_node(:copy, pair) { |node| node.copy(source, destination, **options) }
end

#dbsize ⇒ Object

Return the number of keys in the selected database.



79
80
81
# File 'lib/redis/distributed.rb', line 79

# Return the number of keys in the selected database; issued through
# on_each_node.
def dbsize
  on_each_node(:dbsize)
end

#decr(key) ⇒ Object

Decrement the integer value of a key by one.



256
257
258
# File 'lib/redis/distributed.rb', line 256

# Decrement the integer value of a key by one.
def decr(key)
  node = node_for(key)
  node.decr(key)
end

#decrby(key, decrement) ⇒ Object

Decrement the integer value of a key by the given number.



261
262
263
# File 'lib/redis/distributed.rb', line 261

# Decrement the integer value of a key by the given number.
def decrby(key, decrement)
  node = node_for(key)
  node.decrby(key, decrement)
end

#del(*args) ⇒ Object

Delete a key.



169
170
171
172
173
174
175
# File 'lib/redis/distributed.rb', line 169

# Delete one or more keys. Keys are grouped by owning node, each node
# deletes its share, and the per-node replies are added up.
def del(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }
      .sum { |node, keys| node.del(*keys) }
end

#discard ⇒ Object

Discard all commands issued after MULTI.

Raises:



967
968
969
970
971
972
973
# File 'lib/redis/distributed.rb', line 967

# Discard all commands issued after MULTI.
#
# Runs on the node of the currently watched key and clears the watch
# afterwards.
#
# @raise [CannotDistribute] when no key is being watched
def discard
  raise CannotDistribute, :discard unless @watch_key

  node_for(@watch_key).discard.tap { @watch_key = nil }
end

#dump(key) ⇒ Object

Return a serialized version of the value stored at a key.



154
155
156
# File 'lib/redis/distributed.rb', line 154

# Return a serialized version of the value stored at a key.
def dump(key)
  node = node_for(key)
  node.dump(key)
end

#dup ⇒ Object



1027
1028
1029
# File 'lib/redis/distributed.rb', line 1027

# Creates a new client of the same class from the stored node
# configurations and default options.
def dup
  klass = self.class
  klass.new(@node_configs, @default_options)
end

#echo(value) ⇒ Object

Echo the given string.



60
61
62
# File 'lib/redis/distributed.rb', line 60

# Echo the given string; issued through on_each_node.
def echo(value)
  on_each_node(:echo, value)
end

#eval(*args) ⇒ Object

Evaluate Lua script.



1014
1015
1016
# File 'lib/redis/distributed.rb', line 1014

# Evaluate Lua script. Argument parsing and dispatch live in _eval.
def eval(*args)
  command = :eval
  _eval(command, args)
end

#evalsha(*args) ⇒ Object

Evaluate Lua script by its SHA.



1019
1020
1021
# File 'lib/redis/distributed.rb', line 1019

# Evaluate Lua script by its SHA. Argument parsing and dispatch live in _eval.
def evalsha(*args)
  command = :evalsha
  _eval(command, args)
end

#exec ⇒ Object

Execute all commands issued after MULTI.

Raises:



958
959
960
961
962
963
964
# File 'lib/redis/distributed.rb', line 958

# Execute all commands issued after MULTI.
#
# Runs on the node of the currently watched key and clears the watch
# afterwards.
#
# @raise [CannotDistribute] when no key is being watched
def exec
  raise CannotDistribute, :exec unless @watch_key

  node_for(@watch_key).exec.tap { @watch_key = nil }
end

#exists(*args) ⇒ Object

Determine if a key exists.



187
188
189
190
191
192
193
# File 'lib/redis/distributed.rb', line 187

# Determine if keys exist. Keys are grouped by owning node and the per-node
# replies are added up.
def exists(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }
      .sum { |node, keys| node.exists(*keys) }
end

#exists?(*args) ⇒ Boolean

Determine if any of the keys exists.

Returns:

  • (Boolean)


196
197
198
199
200
201
202
203
# File 'lib/redis/distributed.rb', line 196

# Determine if any of the keys exists.
#
# Keys are grouped by owning node; nodes are asked in turn and the search
# stops at the first one that answers truthy.
#
# @return [Boolean]
def exists?(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }
      .any? { |node, keys| node.exists?(*keys) }
end

#expire(key, seconds, **kwargs) ⇒ Object

Set a key’s time to live in seconds.



124
125
126
# File 'lib/redis/distributed.rb', line 124

# Set a key's time to live in seconds. Extra keyword options are passed
# through to the node untouched.
def expire(key, seconds, **kwargs)
  node = node_for(key)
  node.expire(key, seconds, **kwargs)
end

#expireat(key, unix_time, **kwargs) ⇒ Object

Set the expiration for a key as a UNIX timestamp.



129
130
131
# File 'lib/redis/distributed.rb', line 129

# Set the expiration for a key as a UNIX timestamp. Extra keyword options
# are passed through to the node untouched.
def expireat(key, unix_time, **kwargs)
  node = node_for(key)
  node.expireat(key, unix_time, **kwargs)
end

#flushall ⇒ Object

Remove all keys from all databases.



84
85
86
# File 'lib/redis/distributed.rb', line 84

# Remove all keys from all databases; issued through on_each_node.
def flushall
  on_each_node(:flushall)
end

#flushdb ⇒ Object

Remove all keys from the current database.



89
90
91
# File 'lib/redis/distributed.rb', line 89

# Remove all keys from the current database; issued through on_each_node.
def flushdb
  on_each_node(:flushdb)
end

#get(key) ⇒ Object

Get the value of a key.



319
320
321
# File 'lib/redis/distributed.rb', line 319

# Get the value of a key from the node that owns it.
def get(key)
  node = node_for(key)
  node.get(key)
end

#getbit(key, offset) ⇒ Object

Returns the bit value at offset in the string value stored at key.



363
364
365
# File 'lib/redis/distributed.rb', line 363

# Returns the bit value at offset in the string value stored at key.
def getbit(key, offset)
  node = node_for(key)
  node.getbit(key, offset)
end

#getdel(key) ⇒ Object

Get the value of a key and delete it.



324
325
326
# File 'lib/redis/distributed.rb', line 324

# Get the value of a key and delete it.
def getdel(key)
  node = node_for(key)
  node.getdel(key)
end

#getex(key, **options) ⇒ Object

Get the value of a key and sets its time to live based on options.



329
330
331
# File 'lib/redis/distributed.rb', line 329

# Get the value of a key and set its time to live based on options, which
# are passed through to the node untouched.
def getex(key, **options)
  node = node_for(key)
  node.getex(key, **options)
end

#getrange(key, start, stop) ⇒ Object

Get a substring of the string stored at a key.



353
354
355
# File 'lib/redis/distributed.rb', line 353

# Get a substring of the string stored at a key.
def getrange(key, start, stop)
  node = node_for(key)
  node.getrange(key, start, stop)
end

#getset(key, value) ⇒ Object

Set the string value of a key and return its old value.



391
392
393
# File 'lib/redis/distributed.rb', line 391

# Set the string value of a key and return its old value.
def getset(key, value)
  node = node_for(key)
  node.getset(key, value)
end

#hdel(key, *fields) ⇒ Object

Delete one or more hash fields.



848
849
850
851
# File 'lib/redis/distributed.rb', line 848

# Delete one or more hash fields. Fields may be given as a list or as a
# single array; one level of nesting is flattened before delegating.
def hdel(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.hdel(key, fields)
end

#hexists(key, field) ⇒ Object

Determine if a hash field exists.



854
855
856
# File 'lib/redis/distributed.rb', line 854

# Determine if a hash field exists.
def hexists(key, field)
  node = node_for(key)
  node.hexists(key, field)
end

#hget(key, field) ⇒ Object

Get the value of a hash field.



828
829
830
# File 'lib/redis/distributed.rb', line 828

# Get the value of a hash field.
def hget(key, field)
  node = node_for(key)
  node.hget(key, field)
end

#hgetall(key) ⇒ Object

Get all the fields and values in a hash.



879
880
881
# File 'lib/redis/distributed.rb', line 879

# Get all the fields and values in a hash.
def hgetall(key)
  node = node_for(key)
  node.hgetall(key)
end

#hincrby(key, field, increment) ⇒ Object

Increment the integer value of a hash field by the given integer number.



859
860
861
# File 'lib/redis/distributed.rb', line 859

# Increment the integer value of a hash field by the given integer number.
def hincrby(key, field, increment)
  node = node_for(key)
  node.hincrby(key, field, increment)
end

#hincrbyfloat(key, field, increment) ⇒ Object

Increment the numeric value of a hash field by the given float number.



864
865
866
# File 'lib/redis/distributed.rb', line 864

# Increment the numeric value of a hash field by the given float number.
def hincrbyfloat(key, field, increment)
  node = node_for(key)
  node.hincrbyfloat(key, field, increment)
end

#hkeys(key) ⇒ Object

Get all the fields in a hash.



869
870
871
# File 'lib/redis/distributed.rb', line 869

# Get all the fields in a hash.
def hkeys(key)
  node = node_for(key)
  node.hkeys(key)
end

#hlen(key) ⇒ Object

Get the number of fields in a hash.



804
805
806
# File 'lib/redis/distributed.rb', line 804

# Get the number of fields in a hash.
def hlen(key)
  node = node_for(key)
  node.hlen(key)
end

#hmget(key, *fields) ⇒ Object

Get the values of all the given hash fields.



833
834
835
836
# File 'lib/redis/distributed.rb', line 833

# Get the values of all the given hash fields. One level of nesting in
# fields is flattened before delegating.
def hmget(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.hmget(key, fields)
end

#hmset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



819
820
821
# File 'lib/redis/distributed.rb', line 819

# Set multiple hash fields to multiple values.
def hmset(key, *attrs)
  node = node_for(key)
  node.hmset(key, *attrs)
end

#hrandfield(key, count = nil, **options) ⇒ Object



843
844
845
# File 'lib/redis/distributed.rb', line 843

# Delegates HRANDFIELD to the node that owns the hash; count and keyword
# options are passed through unchanged.
def hrandfield(key, count = nil, **options)
  node = node_for(key)
  node.hrandfield(key, count, **options)
end

#hset(key, *attrs) ⇒ Object

Set multiple hash fields to multiple values.



809
810
811
# File 'lib/redis/distributed.rb', line 809

# Set multiple hash fields to multiple values.
def hset(key, *attrs)
  node = node_for(key)
  node.hset(key, *attrs)
end

#hsetnx(key, field, value) ⇒ Object

Set the value of a hash field, only if the field does not exist.



814
815
816
# File 'lib/redis/distributed.rb', line 814

# Set the value of a hash field, only if the field does not exist.
def hsetnx(key, field, value)
  node = node_for(key)
  node.hsetnx(key, field, value)
end

#hvals(key) ⇒ Object

Get all the values in a hash.



874
875
876
# File 'lib/redis/distributed.rb', line 874

# Get all the values in a hash.
def hvals(key)
  node = node_for(key)
  node.hvals(key)
end

#incr(key) ⇒ Object

Increment the integer value of a key by one.



266
267
268
# File 'lib/redis/distributed.rb', line 266

# Increment the integer value of a key by one.
def incr(key)
  node = node_for(key)
  node.incr(key)
end

#incrby(key, increment) ⇒ Object

Increment the integer value of a key by the given integer number.



271
272
273
# File 'lib/redis/distributed.rb', line 271

# Increment the integer value of a key by the given integer number.
def incrby(key, increment)
  node = node_for(key)
  node.incrby(key, increment)
end

#incrbyfloat(key, increment) ⇒ Object

Increment the numeric value of a key by the given float number.



276
277
278
# File 'lib/redis/distributed.rb', line 276

# Increment the numeric value of a key by the given float number.
def incrbyfloat(key, increment)
  node = node_for(key)
  node.incrbyfloat(key, increment)
end

#info(cmd = nil) ⇒ Object

Get information and statistics about the server.



94
95
96
# File 'lib/redis/distributed.rb', line 94

# Get information and statistics about the server; issued through
# on_each_node. cmd is forwarded as given, including the nil default.
def info(cmd = nil)
  on_each_node(:info, cmd)
end

#inspect ⇒ Object



1023
1024
1025
# File 'lib/redis/distributed.rb', line 1023

# Human-readable summary: the library version followed by the comma-separated
# ids of all nodes.
def inspect
  node_ids = nodes.map { |node| node.id }.join(', ')
  "#<Redis client v#{Redis::VERSION} for #{node_ids}>"
end

#keys(glob = "*") ⇒ Object

Find all keys matching the given pattern.



206
207
208
# File 'lib/redis/distributed.rb', line 206

# Find all keys matching the given pattern. The result of on_each_node is
# flattened into a single array.
def keys(glob = "*")
  per_node = on_each_node(:keys, glob)
  per_node.flatten
end

#lastsave ⇒ Object

Get the UNIX time stamp of the last successful save to disk.



99
100
101
# File 'lib/redis/distributed.rb', line 99

# Get the UNIX time stamp of the last successful save to disk; issued
# through on_each_node.
def lastsave
  on_each_node(:lastsave)
end

#lindex(key, index) ⇒ Object

Get an element from a list by its index.



516
517
518
# File 'lib/redis/distributed.rb', line 516

# Get an element from a list by its index.
def lindex(key, index)
  node = node_for(key)
  node.lindex(key, index)
end

#linsert(key, where, pivot, value) ⇒ Object

Insert an element before or after another element in a list.



521
522
523
# File 'lib/redis/distributed.rb', line 521

# Insert an element before or after another element in a list.
def linsert(key, where, pivot, value)
  node = node_for(key)
  node.linsert(key, where, pivot, value)
end

#llen(key) ⇒ Object

Get the length of a list.



409
410
411
# File 'lib/redis/distributed.rb', line 409

# Get the length of a list.
def llen(key)
  node = node_for(key)
  node.llen(key)
end

#lmove(source, destination, where_source, where_destination) ⇒ Object

Remove the first/last element in a list, append/prepend it to another list and return it.



414
415
416
417
418
# File 'lib/redis/distributed.rb', line 414

# Remove the first/last element in a list, append/prepend it to another list
# and return it. Both lists go through ensure_same_node.
def lmove(source, destination, where_source, where_destination)
  lists = [source, destination]
  ensure_same_node(:lmove, lists) { |node| node.lmove(source, destination, where_source, where_destination) }
end

#lpop(key, count = nil) ⇒ Object

Remove and get the first elements in a list.



449
450
451
# File 'lib/redis/distributed.rb', line 449

# Remove and get the first elements in a list. count is forwarded as given,
# including the nil default.
def lpop(key, count = nil)
  node = node_for(key)
  node.lpop(key, count)
end

#lpush(key, value) ⇒ Object

Prepend one or more values to a list.



429
430
431
# File 'lib/redis/distributed.rb', line 429

# Prepend one or more values to a list.
def lpush(key, value)
  node = node_for(key)
  node.lpush(key, value)
end

#lpushx(key, value) ⇒ Object

Prepend a value to a list, only if the list exists.



434
435
436
# File 'lib/redis/distributed.rb', line 434

# Prepend a value to a list, only if the list exists.
def lpushx(key, value)
  node = node_for(key)
  node.lpushx(key, value)
end

#lrange(key, start, stop) ⇒ Object

Get a range of elements from a list.



526
527
528
# File 'lib/redis/distributed.rb', line 526

# Get a range of elements from a list.
def lrange(key, start, stop)
  node = node_for(key)
  node.lrange(key, start, stop)
end

#lrem(key, count, value) ⇒ Object

Remove elements from a list.



531
532
533
# File 'lib/redis/distributed.rb', line 531

# Remove elements from a list.
def lrem(key, count, value)
  node = node_for(key)
  node.lrem(key, count, value)
end

#lset(key, index, value) ⇒ Object

Set the value of an element in a list by its index.



536
537
538
# File 'lib/redis/distributed.rb', line 536

# Set the value of an element in a list by its index.
def lset(key, index, value)
  node = node_for(key)
  node.lset(key, index, value)
end

#ltrim(key, start, stop) ⇒ Object

Trim a list to the specified range.



541
542
543
# File 'lib/redis/distributed.rb', line 541

# Trim a list to the specified range.
def ltrim(key, start, stop)
  node = node_for(key)
  node.ltrim(key, start, stop)
end

#mapped_hmget(key, *fields) ⇒ Object



838
839
840
841
# File 'lib/redis/distributed.rb', line 838

# Delegates to the owning node's mapped_hmget after flattening one level of
# nesting in fields.
def mapped_hmget(key, *fields)
  fields.flatten!(1)
  node = node_for(key)
  node.mapped_hmget(key, fields)
end

#mapped_hmset(key, hash) ⇒ Object



823
824
825
# File 'lib/redis/distributed.rb', line 823

# Sets hash fields from a Ruby Hash of field => value pairs.
#
# Delegates to the owning node's mapped_hmset, the same way mapped_hmget
# delegates to the node's mapped_hmget, instead of handing the raw Hash to
# the node's hmset.
#
# @param key [String] the hash's key
# @param hash [Hash] fields and their values
def mapped_hmset(key, hash)
  node_for(key).mapped_hmset(key, hash)
end

#mapped_mget(*keys) ⇒ Object

Get the values of all the given keys as a Hash.



340
341
342
343
344
345
# File 'lib/redis/distributed.rb', line 340

# Get the values of all the given keys as a Hash.
#
# Keys are grouped by owning node; each node answers for its own keys and
# the partial hashes are merged into one.
def mapped_mget(*keys)
  keys.flatten!(1)
  keys.group_by { |k| node_for k }.each_with_object({}) do |(node, subkeys), results|
    results.merge! node.mapped_mget(*subkeys)
  end
end

#mapped_mset(_hash) ⇒ Object

Raises:



305
306
307
# File 'lib/redis/distributed.rb', line 305

# Not available on a distributed client.
#
# @raise [CannotDistribute] always
def mapped_mset(_hash)
  raise(CannotDistribute, :mapped_mset)
end

#mapped_msetnx(_hash) ⇒ Object

Raises:



314
315
316
# File 'lib/redis/distributed.rb', line 314

# Not available on a distributed client.
#
# @raise [CannotDistribute] always
def mapped_msetnx(_hash)
  raise(CannotDistribute, :mapped_msetnx)
end

#mget(*keys) ⇒ Object

Get the values of all the given keys as an Array.



334
335
336
337
# File 'lib/redis/distributed.rb', line 334

# Get the values of all the given keys as an Array, in the order the keys
# were given. Built on top of #mapped_mget.
def mget(*keys)
  keys.flatten!(1)
  values_by_key = mapped_mget(*keys)
  values_by_key.values_at(*keys)
end

#migrate(_key, _options) ⇒ Object

Transfer a key from the connected instance to another instance.

Raises:



164
165
166
# File 'lib/redis/distributed.rb', line 164

# Transfer a key from the connected instance to another instance.
# Not available on a distributed client.
#
# @raise [CannotDistribute] always
def migrate(_key, _options)
  raise(CannotDistribute, :migrate)
end

#monitor ⇒ Object

Listen for all requests received by the server in real time.

Raises:

  • (NotImplementedError)


104
105
106
# File 'lib/redis/distributed.rb', line 104

# Listen for all requests received by the server in real time.
# Not implemented on a distributed client.
#
# @raise [NotImplementedError] always
def monitor
  raise(NotImplementedError)
end

#move(key, db) ⇒ Object

Move a key to another database.



211
212
213
# File 'lib/redis/distributed.rb', line 211

# Move a key to another database.
def move(key, db)
  node = node_for(key)
  node.move(key, db)
end

#mset ⇒ Object

Set multiple keys to multiple values.

Raises:



301
302
303
# File 'lib/redis/distributed.rb', line 301

# Set multiple keys to multiple values.
# Not available on a distributed client; any arguments are ignored.
#
# @raise [CannotDistribute] always
def mset(*)
  raise(CannotDistribute, :mset)
end

#msetnx ⇒ Object

Set multiple keys to multiple values, only if none of the keys exist.

Raises:



310
311
312
# File 'lib/redis/distributed.rb', line 310

# Set multiple keys to multiple values, only if none of the keys exist.
# Not available on a distributed client; any arguments are ignored.
#
# @raise [CannotDistribute] always
def msetnx(*)
  raise(CannotDistribute, :msetnx)
end

#multi(&block) ⇒ Object

Mark the start of a transaction block.

Raises:



951
952
953
954
955
# File 'lib/redis/distributed.rb', line 951

# Mark the start of a transaction block.
#
# The transaction runs on the node of the currently watched key; the block
# is handed to that node unchanged.
#
# @raise [CannotDistribute] when no key is being watched
def multi(&block)
  watched = @watch_key
  raise CannotDistribute, :multi unless watched

  node_for(watched).multi(&block)
end

#node_for(key) ⇒ Object

Raises:



30
31
32
33
34
35
# File 'lib/redis/distributed.rb', line 30

# Looks up the node responsible for a key.
#
# The key's tag (per key_tag) is used for the lookup when there is one,
# otherwise the key's string form.
#
# @raise [CannotDistribute] when a key is being watched and this key
#   resolves to something other than the watched key
def node_for(key)
  tagged = key_tag(key.to_s)
  key = tagged || key.to_s
  raise CannotDistribute, :watch if @watch_key && @watch_key != key

  @ring.get_node(key)
end

#nodes ⇒ Object



37
38
39
# File 'lib/redis/distributed.rb', line 37

# The nodes currently held by the ring.
def nodes
  ring_nodes = @ring.nodes
  ring_nodes
end

#persist(key) ⇒ Object

Remove the expiration from a key.



119
120
121
# File 'lib/redis/distributed.rb', line 119

# Remove the expiration from a key.
def persist(key)
  node = node_for(key)
  node.persist(key)
end

#pexpire(key, milliseconds, **kwarg) ⇒ Object

Set a key’s time to live in milliseconds.



139
140
141
# File 'lib/redis/distributed.rb', line 139

# Set a key's time to live in milliseconds. Extra keyword options are
# passed through to the node untouched.
def pexpire(key, milliseconds, **kwarg)
  node = node_for(key)
  node.pexpire(key, milliseconds, **kwarg)
end

#pexpireat(key, ms_unix_time, **kwarg) ⇒ Object

Set the expiration for a key as number of milliseconds from UNIX Epoch.



144
145
146
# File 'lib/redis/distributed.rb', line 144

# Set the expiration for a key as number of milliseconds from UNIX Epoch.
# Extra keyword options are passed through to the node untouched.
def pexpireat(key, ms_unix_time, **kwarg)
  node = node_for(key)
  node.pexpireat(key, ms_unix_time, **kwarg)
end

#pfadd(key, member) ⇒ Object

Add one or more members to a HyperLogLog structure.



981
982
983
# File 'lib/redis/distributed.rb', line 981

# Add one or more members to a HyperLogLog structure.
def pfadd(key, member)
  node = node_for(key)
  node.pfadd(key, member)
end

#pfcount(*keys) ⇒ Object

Get the approximate cardinality of members added to HyperLogLog structure.



986
987
988
989
990
# File 'lib/redis/distributed.rb', line 986

# Get the approximate cardinality of members added to HyperLogLog structure.
#
# The same-node check sees the keys flattened by one level; the node itself
# receives the keys array exactly as collected from the arguments.
def pfcount(*keys)
  flat_keys = keys.flatten(1)
  ensure_same_node(:pfcount, flat_keys) { |node| node.pfcount(keys) }
end

#pfmerge(dest_key, *source_key) ⇒ Object

Merge multiple HyperLogLog values into a unique value that will approximate the cardinality of the union of the observed Sets of the source HyperLogLog structures.



994
995
996
997
998
# File 'lib/redis/distributed.rb', line 994

# Merge multiple HyperLogLog values into one stored at dest_key. The
# destination and all sources go through ensure_same_node together.
def pfmerge(dest_key, *source_key)
  involved = [dest_key] + source_key
  ensure_same_node(:pfmerge, involved) { |node| node.pfmerge(dest_key, *source_key) }
end

#ping ⇒ Object

Ping the server.



55
56
57
# File 'lib/redis/distributed.rb', line 55

# Ping the server; issued through on_each_node.
def ping
  on_each_node(:ping)
end

#pipelined ⇒ Object

Raises:



946
947
948
# File 'lib/redis/distributed.rb', line 946

# Not available on a distributed client.
#
# @raise [CannotDistribute] always
def pipelined
  raise(CannotDistribute, :pipelined)
end

#psetex(key, ttl, value) ⇒ Object

Set the time to live in milliseconds of a key.



291
292
293
# File 'lib/redis/distributed.rb', line 291

# Set the time to live in milliseconds of a key.
def psetex(key, ttl, value)
  node = node_for(key)
  node.psetex(key, ttl, value)
end

#psubscribe(*channels, &block) ⇒ Object

Listen for messages published to channels matching the given patterns.

Raises:

  • (NotImplementedError)


913
914
915
# File 'lib/redis/distributed.rb', line 913

# Listen for messages published to channels matching the given patterns.
# Not implemented on a distributed client; the arguments are ignored.
#
# @raise [NotImplementedError] always
def psubscribe(*channels, &block)
  raise(NotImplementedError)
end

#pttl(key) ⇒ Object

Get the time to live (in milliseconds) for a key.



149
150
151
# File 'lib/redis/distributed.rb', line 149

# Get the time to live (in milliseconds) for a key.
def pttl(key)
  node = node_for(key)
  node.pttl(key)
end

#publish(channel, message) ⇒ Object

Post a message to a channel.



884
885
886
# File 'lib/redis/distributed.rb', line 884

# Post a message to a channel. The channel name is used like a key to pick
# the node.
def publish(channel, message)
  node = node_for(channel)
  node.publish(channel, message)
end

#punsubscribe(*channels) ⇒ Object

Stop listening for messages posted to channels matching the given patterns.

Raises:

  • (NotImplementedError)


919
920
921
# File 'lib/redis/distributed.rb', line 919

# Stop listening for messages posted to channels matching the given patterns.
# Not implemented on a distributed client; the arguments are ignored.
#
# @raise [NotImplementedError] always
def punsubscribe(*channels)
  raise(NotImplementedError)
end

#quit ⇒ Object

Close the connection.



65
66
67
# File 'lib/redis/distributed.rb', line 65

# Close the connection; issued through on_each_node.
def quit
  on_each_node(:quit)
end

#randomkey ⇒ Object

Return a random key from the keyspace.

Raises:



223
224
225
# File 'lib/redis/distributed.rb', line 223

# Return a random key from the keyspace.
# Not available on a distributed client.
#
# @raise [CannotDistribute] always
def randomkey
  raise(CannotDistribute, :randomkey)
end

#rename(old_name, new_name) ⇒ Object

Rename a key.



228
229
230
231
232
# File 'lib/redis/distributed.rb', line 228

# Rename a key. Old and new names go through ensure_same_node together.
def rename(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:rename, names) { |node| node.rename(old_name, new_name) }
end

#renamenx(old_name, new_name) ⇒ Object

Rename a key, only if the new key does not exist.



235
236
237
238
239
# File 'lib/redis/distributed.rb', line 235

# Rename a key, only if the new key does not exist. Old and new names go
# through ensure_same_node together.
def renamenx(old_name, new_name)
  names = [old_name, new_name]
  ensure_same_node(:renamenx, names) { |node| node.renamenx(old_name, new_name) }
end

#restore(key, ttl, serialized_value, **options) ⇒ Object

Create a key using the serialized value, previously obtained using DUMP.



159
160
161
# File 'lib/redis/distributed.rb', line 159

# Create a key using the serialized value, previously obtained using DUMP.
# Keyword options are passed through to the node untouched.
def restore(key, ttl, serialized_value, **options)
  node = node_for(key)
  node.restore(key, ttl, serialized_value, **options)
end

#rpop(key, count = nil) ⇒ Object

Remove and get the last elements in a list.



454
455
456
# File 'lib/redis/distributed.rb', line 454

# Remove and get the last elements in a list. count is forwarded as given,
# including the nil default.
def rpop(key, count = nil)
  node = node_for(key)
  node.rpop(key, count)
end

#rpoplpush(source, destination) ⇒ Object

Remove the last element in a list, append it to another list and return it.



460
461
462
463
464
# File 'lib/redis/distributed.rb', line 460

# Remove the last element in a list, append it to another list and return
# it. Both lists go through ensure_same_node.
def rpoplpush(source, destination)
  lists = [source, destination]
  ensure_same_node(:rpoplpush, lists) { |node| node.rpoplpush(source, destination) }
end

#rpush(key, value) ⇒ Object

Append one or more values to a list.



439
440
441
# File 'lib/redis/distributed.rb', line 439

# Append one or more values to a list.
def rpush(key, value)
  node = node_for(key)
  node.rpush(key, value)
end

#rpushx(key, value) ⇒ Object

Append a value to a list, only if the list exists.



444
445
446
# File 'lib/redis/distributed.rb', line 444

# Append a value to a list, only if the list exists.
def rpushx(key, value)
  node = node_for(key)
  node.rpushx(key, value)
end

#sadd(key, *members) ⇒ Object

Add one or more members to a set.



551
552
553
# File 'lib/redis/distributed.rb', line 551

# Add one or more members to a set.
def sadd(key, *members)
  node = node_for(key)
  node.sadd(key, *members)
end

#sadd?(key, *members) ⇒ Boolean

Add one or more members to a set.

Returns:

  • (Boolean)


556
557
558
# File 'lib/redis/distributed.rb', line 556

# Add one or more members to a set; delegates to the node's sadd?.
def sadd?(key, *members)
  node = node_for(key)
  node.sadd?(key, *members)
end

#save ⇒ Object

Synchronously save the dataset to disk.



109
110
111
# File 'lib/redis/distributed.rb', line 109

# Synchronously save the dataset to disk; issued through on_each_node.
def save
  on_each_node(:save)
end

#scard(key) ⇒ Object

Get the number of members in a set.



546
547
548
# File 'lib/redis/distributed.rb', line 546

# Get the number of members in a set.
def scard(key)
  node = node_for(key)
  node.scard(key)
end

#script(subcommand, *args) ⇒ Object

Control remote script registry.



976
977
978
# File 'lib/redis/distributed.rb', line 976

# Control remote script registry; the subcommand and its arguments are
# issued through on_each_node.
def script(subcommand, *args)
  on_each_node :script, subcommand, *args
end

#sdiff(*keys) ⇒ Object

Subtract multiple sets.



613
614
615
616
617
618
# File 'lib/redis/distributed.rb', line 613

# Subtract multiple sets. All keys go through ensure_same_node together.
def sdiff(*keys)
  keys.flatten!(1)
  ensure_same_node(:sdiff, keys) { |node| node.sdiff(keys) }
end

#sdiffstore(destination, *keys) ⇒ Object

Subtract multiple sets and store the resulting set in a key.



621
622
623
624
625
626
# File 'lib/redis/distributed.rb', line 621

# Subtract multiple sets and store the resulting set in a key. The
# destination and all sources go through ensure_same_node together.
def sdiffstore(destination, *keys)
  keys.flatten!(1)
  involved = [destination, *keys]
  ensure_same_node(:sdiffstore, involved) { |node| node.sdiffstore(destination, keys) }
end

#select(db) ⇒ Object

Change the selected database for the current connection.



50
51
52
# File 'lib/redis/distributed.rb', line 50

# Change the selected database for the current connection; issued through
# on_each_node.
def select(db)
  on_each_node(:select, db)
end

#set(key, value, **options) ⇒ Object

Set the string value of a key.



281
282
283
# File 'lib/redis/distributed.rb', line 281

# Set the string value of a key. Keyword options are passed through to the
# node untouched.
def set(key, value, **options)
  node = node_for(key)
  node.set(key, value, **options)
end

#setbit(key, offset, value) ⇒ Object

Sets or clears the bit at offset in the string value stored at key.



358
359
360
# File 'lib/redis/distributed.rb', line 358

# Sets or clears the bit at offset in the string value stored at key.
def setbit(key, offset, value)
  node = node_for(key)
  node.setbit(key, offset, value)
end

#setex(key, ttl, value) ⇒ Object

Set the time to live in seconds of a key.



286
287
288
# File 'lib/redis/distributed.rb', line 286

# Set the time to live in seconds of a key.
def setex(key, ttl, value)
  node = node_for(key)
  node.setex(key, ttl, value)
end

#setnx(key, value) ⇒ Object

Set the value of a key, only if the key does not exist.



296
297
298
# File 'lib/redis/distributed.rb', line 296

# Set the value of a key, only if the key does not exist.
def setnx(key, value)
  node = node_for(key)
  node.setnx(key, value)
end

#setrange(key, offset, value) ⇒ Object

Overwrite part of a string at key starting at the specified offset.



348
349
350
# File 'lib/redis/distributed.rb', line 348

# Overwrite part of a string at key starting at the specified offset.
def setrange(key, offset, value)
  node = node_for(key)
  node.setrange(key, offset, value)
end

#sinter(*keys) ⇒ Object

Intersect multiple sets.



629
630
631
632
633
634
# File 'lib/redis/distributed.rb', line 629

# Intersect multiple sets. All keys go through ensure_same_node together.
def sinter(*keys)
  keys.flatten!(1)
  ensure_same_node(:sinter, keys) { |node| node.sinter(keys) }
end

#sinterstore(destination, *keys) ⇒ Object

Intersect multiple sets and store the resulting set in a key.



637
638
639
640
641
642
# File 'lib/redis/distributed.rb', line 637

# Intersect multiple sets and store the resulting set in a key. The
# destination and all sources go through ensure_same_node together.
def sinterstore(destination, *keys)
  keys.flatten!(1)
  involved = [destination, *keys]
  ensure_same_node(:sinterstore, involved) { |node| node.sinterstore(destination, keys) }
end

#sismember(key, member) ⇒ Object

Determine if a given value is a member of a set.



588
589
590
# File 'lib/redis/distributed.rb', line 588

# Determine if a given value is a member of a set.
def sismember(key, member)
  node = node_for(key)
  node.sismember(key, member)
end

#smembers(key) ⇒ Object

Get all the members in a set.



598
599
600
# File 'lib/redis/distributed.rb', line 598

# Get all the members in a set.
def smembers(key)
  node = node_for(key)
  node.smembers(key)
end

#smismember(key, *members) ⇒ Object

Determine if multiple values are members of a set.



593
594
595
# File 'lib/redis/distributed.rb', line 593

# Determine if multiple values are members of a set.
def smismember(key, *members)
  node = node_for(key)
  node.smismember(key, *members)
end

#smove(source, destination, member) ⇒ Object

Move a member from one set to another.



581
582
583
584
585
# File 'lib/redis/distributed.rb', line 581

# Move a member from one set to another. Both sets go through
# ensure_same_node.
def smove(source, destination, member)
  sets = [source, destination]
  ensure_same_node(:smove, sets) { |node| node.smove(source, destination, member) }
end

#sort(key, **options) ⇒ Object

Sort the elements in a list, set or sorted set.



242
243
244
245
246
247
248
# File 'lib/redis/distributed.rb', line 242

# Sort the elements in a list, set or sorted set.
#
# The key itself plus whatever is named by the :by, :store and :get options
# are passed to ensure_same_node; options that are absent are skipped.
def sort(key, **options)
  related = [options[:by], options[:store]].concat(Array(options[:get]))
  keys = [key, *related].compact

  ensure_same_node(:sort, keys) { |node| node.sort(key, **options) }
end

#spop(key, count = nil) ⇒ Object

Remove and return a random member from a set.



571
572
573
# File 'lib/redis/distributed.rb', line 571

# Remove and return a random member from a set. count is forwarded as given,
# including the nil default.
def spop(key, count = nil)
  node = node_for(key)
  node.spop(key, count)
end

#srandmember(key, count = nil) ⇒ Object

Get a random member from a set.



576
577
578
# File 'lib/redis/distributed.rb', line 576

# Get a random member from a set. count is forwarded as given, including
# the nil default.
def srandmember(key, count = nil)
  node = node_for(key)
  node.srandmember(key, count)
end

#srem(key, *members) ⇒ Object

Remove one or more members from a set.



561
562
563
# File 'lib/redis/distributed.rb', line 561

# Remove one or more members from a set.
def srem(key, *members)
  node = node_for(key)
  node.srem(key, *members)
end

#srem?(key, *members) ⇒ Boolean

Remove one or more members from a set.

Returns:

  • (Boolean)


566
567
568
# File 'lib/redis/distributed.rb', line 566

# Remove one or more members from a set; delegates to the node's srem?.
def srem?(key, *members)
  node = node_for(key)
  node.srem?(key, *members)
end

#sscan(key, cursor, **options) ⇒ Object

Scan a set



603
604
605
# File 'lib/redis/distributed.rb', line 603

# Scan a set. Cursor and keyword options are passed through to the node.
def sscan(key, cursor, **options)
  node = node_for(key)
  node.sscan(key, cursor, **options)
end

#sscan_each(key, **options, &block) ⇒ Object

Scan a set and return an enumerator



608
609
610
# File 'lib/redis/distributed.rb', line 608

# Scan a set and return an enumerator. Keyword options and any block are
# passed through to the node.
def sscan_each(key, **options, &block)
  node = node_for(key)
  node.sscan_each(key, **options, &block)
end

#strlen(key) ⇒ Object

Get the length of the value stored in a key.



396
397
398
# File 'lib/redis/distributed.rb', line 396

# Get the length of the value stored in a key.
def strlen(key)
  node = node_for(key)
  node.strlen(key)
end

#subscribe(channel, *channels, &block) ⇒ Object

Listen for messages published to the given channels.



893
894
895
896
897
898
899
900
901
902
903
# File 'lib/redis/distributed.rb', line 893

# Listen for messages published to the given channels.
#
# With a single channel the node is looked up directly; with several, the
# channels go through ensure_same_node. Either way the chosen node is
# remembered in @subscribed_node before subscribing on it.
def subscribe(channel, *channels, &block)
  if channels.empty?
    @subscribed_node = node_for(channel)
    return @subscribed_node.subscribe(channel, &block)
  end

  ensure_same_node(:subscribe, [channel, *channels]) do |node|
    @subscribed_node = node
    node.subscribe(channel, *channels, &block)
  end
end

#subscribed? ⇒ Boolean

Returns:

  • (Boolean)


888
889
890
# File 'lib/redis/distributed.rb', line 888

# Whether a node has been recorded by #subscribe.
#
# @return [Boolean]
def subscribed?
  @subscribed_node ? true : false
end

#sunion(*keys) ⇒ Object

Return the union of multiple sets.



645
646
647
648
649
650
# File 'lib/redis/distributed.rb', line 645

# Union of multiple sets.
#
# keys - set keys; one level of array nesting is flattened, so both
#        sunion("a", "b") and sunion(["a", "b"]) are accepted.
#
# The flattened keys are handed to ensure_same_node, which yields the node
# the command is sent to.
#
# Returns whatever the node's sunion returns.
def sunion(*keys)
  set_keys = keys.flatten(1)
  ensure_same_node(:sunion, set_keys) { |node| node.sunion(set_keys) }
end

#sunionstore(destination, *keys) ⇒ Object

Compute the union of multiple sets and store the resulting set in a key.



653
654
655
656
657
658
# File 'lib/redis/distributed.rb', line 653

# Union multiple sets and store the resulting set in a key.
#
# destination - key that receives the result.
# keys        - source set keys; one level of array nesting is flattened.
#
# destination and the source keys together are handed to ensure_same_node,
# which yields the node the command is sent to.
#
# Returns whatever the node's sunionstore returns.
def sunionstore(destination, *keys)
  source_keys = keys.flatten(1)
  ensure_same_node(:sunionstore, [destination, *source_keys]) do |node|
    node.sunionstore(destination, source_keys)
  end
end

#time ⇒ Object

Get server time: an UNIX timestamp and the elapsed microseconds in the current second.



114
115
116
# File 'lib/redis/distributed.rb', line 114

# Get server time: a UNIX timestamp and the elapsed microseconds in the
# current second.
#
# Delegates to on_each_node with the :time command and returns its result.
def time
  on_each_node(:time)
end

#ttl(key) ⇒ Object

Get the time to live (in seconds) for a key.



134
135
136
# File 'lib/redis/distributed.rb', line 134

# Get the time to live (in seconds) for a key.
#
# key - the key to inspect; also used to pick the node via node_for.
#
# Returns whatever the node's ttl returns.
def ttl(key)
  target = node_for(key)
  target.ttl(key)
end

#type(key) ⇒ Object

Determine the type stored at key.



251
252
253
# File 'lib/redis/distributed.rb', line 251

# Determine the type stored at key.
#
# key - the key to inspect; also used to pick the node via node_for.
#
# Returns whatever the node's type returns.
def type(key)
  target = node_for(key)
  target.type(key)
end

#unlink(*args) ⇒ Object

Unlink keys.



178
179
180
181
182
183
184
# File 'lib/redis/distributed.rb', line 178

# Unlink keys.
#
# args - keys to unlink; one level of array nesting is flattened.
#
# Keys are grouped by the node node_for returns for each of them, every
# node receives a single unlink call with its own keys, and the per-node
# results are added together.
#
# Returns the sum of the nodes' unlink results (0 when no keys are given).
def unlink(*args)
  args.flatten!(1)
  args.group_by { |key| node_for(key) }
      .sum { |node, node_keys| node.unlink(*node_keys) }
end

#unsubscribe(*channels) ⇒ Object

Stop listening for messages posted to the given channels.

Raises:

  • (SubscriptionError)


906
907
908
909
910
# File 'lib/redis/distributed.rb', line 906

# Stop listening for messages posted to the given channels.
#
# channels - channel names, splatted through to the subscribed node.
#
# Returns whatever the subscribed node's unsubscribe returns.
# Raises SubscriptionError when subscribed? is false.
def unsubscribe(*channels)
  unless subscribed?
    raise SubscriptionError, "Can't unsubscribe if not subscribed."
  end

  @subscribed_node.unsubscribe(*channels)
end

#unwatch ⇒ Object

Forget about all watched keys.

Raises:

  • (CannotDistribute)


938
939
940
941
942
943
944
# File 'lib/redis/distributed.rb', line 938

# Forget about all watched keys.
#
# Sends unwatch to the node returned by node_for(@watch_key) and then
# clears @watch_key. If the node's unwatch raises, @watch_key is left as is.
#
# Returns whatever the node's unwatch returns.
# Raises CannotDistribute when no watch key is recorded.
def unwatch
  raise CannotDistribute, :unwatch unless @watch_key

  node_for(@watch_key).unwatch.tap { @watch_key = nil }
end

#watch(*keys, &block) ⇒ Object

Watch the given keys to determine execution of the MULTI/EXEC block.



924
925
926
927
928
929
930
931
932
933
934
935
# File 'lib/redis/distributed.rb', line 924

# Watch the given keys to determine execution of the MULTI/EXEC block.
#
# keys  - keys to watch; handed to ensure_same_node, which yields the node.
# block - forwarded to the node's watch when given.
#
# Before calling the node, @watch_key is set to key_tag(first key) or, when
# that is nil/false, to the first key converted to a String. If the node's
# watch raises a StandardError, @watch_key is cleared and the error is
# re-raised.
#
# Returns whatever the node's watch returns.
def watch(*keys, &block)
  ensure_same_node(:watch, keys) do |node|
    first_key = keys.first
    @watch_key = key_tag(first_key) || first_key.to_s

    begin
      node.watch(*keys, &block)
    rescue StandardError
      # Do not remember a key whose watch did not go through.
      @watch_key = nil
      raise
    end
  end
end

#zadd(key, *args) ⇒ Object

Add one or more members to a sorted set, or update the score for members that already exist.



667
668
669
# File 'lib/redis/distributed.rb', line 667

# Add one or more members to a sorted set, or update the score for members
# that already exist.
#
# key  - the sorted set key; also used to pick the node via node_for.
# args - remaining arguments, splatted through to the node's zadd as given.
#
# Returns whatever the node's zadd returns.
def zadd(key, *args)
  target = node_for(key)
  target.zadd(key, *args)
end

#zcard(key) ⇒ Object

Get the number of members in a sorted set.



661
662
663
# File 'lib/redis/distributed.rb', line 661

# Get the number of members in a sorted set.
#
# key - the sorted set key; also used to pick the node via node_for.
#
# Returns whatever the node's zcard returns.
def zcard(key)
  target = node_for(key)
  target.zcard(key)
end

#zcount(key, min, max) ⇒ Object

Get the number of members in a particular score range.



749
750
751
# File 'lib/redis/distributed.rb', line 749

# Get the number of members in a particular score range.
#
# key      - the sorted set key; also used to pick the node via node_for.
# min, max - range bounds, forwarded to the node unchanged.
#
# Returns whatever the node's zcount returns.
def zcount(key, min, max)
  target = node_for(key)
  target.zcount(key, min, max)
end

#zdiff(*keys, **options) ⇒ Object

Return the difference between the first and all successive input sorted sets.



787
788
789
790
791
792
# File 'lib/redis/distributed.rb', line 787

# Return the difference between the first and all successive input sorted
# sets.
#
# keys    - sorted set keys; one level of array nesting is flattened.
# options - keyword options forwarded to the node unchanged.
#
# The flattened keys are handed to ensure_same_node, which yields the node
# the command is sent to.
#
# Returns whatever the node's zdiff returns.
def zdiff(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zdiff, zset_keys) { |node| node.zdiff(zset_keys, **options) }
end

#zdiffstore(destination, *keys, **options) ⇒ Object

Compute the difference between the first and all successive input sorted sets and store the resulting sorted set in a new key.



796
797
798
799
800
801
# File 'lib/redis/distributed.rb', line 796

# Compute the difference between the first and all successive input sorted
# sets and store the resulting sorted set in a new key.
#
# destination - key that receives the result.
# keys        - source keys; one level of array nesting is flattened.
# options     - keyword options forwarded to the node unchanged.
#
# destination and the source keys together are handed to ensure_same_node,
# which yields the node the command is sent to.
#
# Returns whatever the node's zdiffstore returns.
def zdiffstore(destination, *keys, **options)
  source_keys = keys.flatten(1)
  ensure_same_node(:zdiffstore, [destination, *source_keys]) do |node|
    node.zdiffstore(destination, source_keys, **options)
  end
end

#zincrby(key, increment, member) ⇒ Object

Increment the score of a member in a sorted set.



673
674
675
# File 'lib/redis/distributed.rb', line 673

# Increment the score of a member in a sorted set.
#
# key       - the sorted set key; also used to pick the node via node_for.
# increment - forwarded to the node unchanged.
# member    - forwarded to the node unchanged.
#
# Returns whatever the node's zincrby returns.
def zincrby(key, increment, member)
  target = node_for(key)
  target.zincrby(key, increment, member)
end

#zinter(*keys, **options) ⇒ Object

Get the intersection of multiple sorted sets



754
755
756
757
758
759
# File 'lib/redis/distributed.rb', line 754

# Get the intersection of multiple sorted sets.
#
# keys    - sorted set keys; one level of array nesting is flattened.
# options - keyword options forwarded to the node unchanged.
#
# The flattened keys are handed to ensure_same_node, which yields the node
# the command is sent to.
#
# Returns whatever the node's zinter returns.
def zinter(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zinter, zset_keys) { |node| node.zinter(zset_keys, **options) }
end

#zinterstore(destination, *keys, **options) ⇒ Object

Intersect multiple sorted sets and store the resulting sorted set in a new key.



763
764
765
766
767
768
# File 'lib/redis/distributed.rb', line 763

# Intersect multiple sorted sets and store the resulting sorted set in a
# new key.
#
# destination - key that receives the result.
# keys        - source keys; one level of array nesting is flattened.
# options     - keyword options forwarded to the node unchanged.
#
# destination and the source keys together are handed to ensure_same_node,
# which yields the node the command is sent to.
#
# Returns whatever the node's zinterstore returns.
def zinterstore(destination, *keys, **options)
  source_keys = keys.flatten(1)
  ensure_same_node(:zinterstore, [destination, *source_keys]) do |node|
    node.zinterstore(destination, source_keys, **options)
  end
end

#zmscore(key, *members) ⇒ Object

Get the scores associated with the given members in a sorted set.



693
694
695
# File 'lib/redis/distributed.rb', line 693

# Get the scores associated with the given members in a sorted set.
#
# key     - the sorted set key; also used to pick the node via node_for.
# members - splatted through to the node's zmscore as given.
#
# Returns whatever the node's zmscore returns.
def zmscore(key, *members)
  target = node_for(key)
  target.zmscore(key, *members)
end

#zrandmember(key, count = nil, **options) ⇒ Object

Get one or more random members from a sorted set.



688
689
690
# File 'lib/redis/distributed.rb', line 688

# Get one or more random members from a sorted set.
#
# key     - the sorted set key; also used to pick the node via node_for.
# count   - passed through to the node unchanged (nil when omitted).
# options - keyword options forwarded to the node unchanged.
#
# Returns whatever the node's zrandmember returns.
def zrandmember(key, count = nil, **options)
  shard = node_for(key)
  shard.zrandmember(key, count, **options)
end

#zrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, score or lexicographical ordering.



698
699
700
# File 'lib/redis/distributed.rb', line 698

# Return a range of members in a sorted set, by index, score or
# lexicographical ordering.
#
# key         - the sorted set key; also used to pick the node via node_for.
# start, stop - range bounds, forwarded to the node unchanged.
# options     - keyword options forwarded to the node unchanged.
#
# Returns whatever the node's zrange returns.
def zrange(key, start, stop, **options)
  shard = node_for(key)
  shard.zrange(key, start, stop, **options)
end

#zrangebyscore(key, min, max, **options) ⇒ Object

Return a range of members in a sorted set, by score.



733
734
735
# File 'lib/redis/distributed.rb', line 733

# Return a range of members in a sorted set, by score.
#
# key      - the sorted set key; also used to pick the node via node_for.
# min, max - score bounds, forwarded to the node unchanged.
# options  - keyword options forwarded to the node unchanged.
#
# Returns whatever the node's zrangebyscore returns.
def zrangebyscore(key, min, max, **options)
  shard = node_for(key)
  shard.zrangebyscore(key, min, max, **options)
end

#zrangestore(dest_key, src_key, start, stop, **options) ⇒ Object

Select a range of members in a sorted set, by index, score or lexicographical ordering and store the resulting sorted set in a new key.



704
705
706
707
708
# File 'lib/redis/distributed.rb', line 704

# Select a range of members in a sorted set, by index, score or
# lexicographical ordering and store the resulting sorted set in a new key.
#
# dest_key    - key that receives the result.
# src_key     - sorted set to read from.
# start, stop - range bounds, forwarded to the node unchanged.
# options     - keyword options forwarded to the node unchanged.
#
# Both keys are handed to ensure_same_node, which yields the node the
# command is sent to.
#
# Returns whatever the node's zrangestore returns.
def zrangestore(dest_key, src_key, start, stop, **options)
  involved_keys = [dest_key, src_key]
  ensure_same_node(:zrangestore, involved_keys) do |shard|
    shard.zrangestore(dest_key, src_key, start, stop, **options)
  end
end

#zrank(key, member) ⇒ Object

Determine the index of a member in a sorted set.



717
718
719
# File 'lib/redis/distributed.rb', line 717

# Determine the index of a member in a sorted set.
#
# key    - the sorted set key; also used to pick the node via node_for.
# member - forwarded to the node unchanged.
#
# Returns whatever the node's zrank returns.
def zrank(key, member)
  target = node_for(key)
  target.zrank(key, member)
end

#zrem(key, member) ⇒ Object

Remove one or more members from a sorted set.



678
679
680
# File 'lib/redis/distributed.rb', line 678

# Remove one or more members from a sorted set.
#
# key    - the sorted set key; also used to pick the node via node_for.
# member - forwarded to the node's zrem unchanged.
#
# Returns whatever the node's zrem returns.
def zrem(key, member)
  target = node_for(key)
  target.zrem(key, member)
end

#zremrangebyrank(key, start, stop) ⇒ Object

Remove all members in a sorted set within the given indexes.



728
729
730
# File 'lib/redis/distributed.rb', line 728

# Remove all members in a sorted set within the given indexes.
#
# key         - the sorted set key; also used to pick the node via node_for.
# start, stop - index bounds, forwarded to the node unchanged.
#
# Returns whatever the node's zremrangebyrank returns.
def zremrangebyrank(key, start, stop)
  target = node_for(key)
  target.zremrangebyrank(key, start, stop)
end

#zremrangebyscore(key, min, max) ⇒ Object

Remove all members in a sorted set within the given scores.



744
745
746
# File 'lib/redis/distributed.rb', line 744

# Remove all members in a sorted set within the given scores.
#
# key      - the sorted set key; also used to pick the node via node_for.
# min, max - score bounds, forwarded to the node unchanged.
#
# Returns whatever the node's zremrangebyscore returns.
def zremrangebyscore(key, min, max)
  target = node_for(key)
  target.zremrangebyscore(key, min, max)
end

#zrevrange(key, start, stop, **options) ⇒ Object

Return a range of members in a sorted set, by index, with scores ordered from high to low.



712
713
714
# File 'lib/redis/distributed.rb', line 712

# Return a range of members in a sorted set, by index, with scores ordered
# from high to low.
#
# key         - the sorted set key; also used to pick the node via node_for.
# start, stop - index bounds, forwarded to the node unchanged.
# options     - keyword options forwarded to the node unchanged.
#
# Returns whatever the node's zrevrange returns.
def zrevrange(key, start, stop, **options)
  shard = node_for(key)
  shard.zrevrange(key, start, stop, **options)
end

#zrevrangebyscore(key, max, min, **options) ⇒ Object

Return a range of members in a sorted set, by score, with scores ordered from high to low.



739
740
741
# File 'lib/redis/distributed.rb', line 739

# Return a range of members in a sorted set, by score, with scores ordered
# from high to low.
#
# key      - the sorted set key; also used to pick the node via node_for.
# max, min - score bounds (max first), forwarded to the node in that order.
# options  - keyword options forwarded to the node unchanged.
#
# Returns whatever the node's zrevrangebyscore returns.
def zrevrangebyscore(key, max, min, **options)
  shard = node_for(key)
  shard.zrevrangebyscore(key, max, min, **options)
end

#zrevrank(key, member) ⇒ Object

Determine the index of a member in a sorted set, with scores ordered from high to low.



723
724
725
# File 'lib/redis/distributed.rb', line 723

# Determine the index of a member in a sorted set, with scores ordered from
# high to low.
#
# key    - the sorted set key; also used to pick the node via node_for.
# member - forwarded to the node unchanged.
#
# Returns whatever the node's zrevrank returns.
def zrevrank(key, member)
  target = node_for(key)
  target.zrevrank(key, member)
end

#zscore(key, member) ⇒ Object

Get the score associated with the given member in a sorted set.



683
684
685
# File 'lib/redis/distributed.rb', line 683

# Get the score associated with the given member in a sorted set.
#
# key    - the sorted set key; also used to pick the node via node_for.
# member - forwarded to the node unchanged.
#
# Returns whatever the node's zscore returns.
def zscore(key, member)
  target = node_for(key)
  target.zscore(key, member)
end

#zunion(*keys, **options) ⇒ Object

Return the union of multiple sorted sets.



771
772
773
774
775
776
# File 'lib/redis/distributed.rb', line 771

# Return the union of multiple sorted sets.
#
# keys    - sorted set keys; one level of array nesting is flattened.
# options - keyword options forwarded to the node unchanged.
#
# The flattened keys are handed to ensure_same_node, which yields the node
# the command is sent to.
#
# Returns whatever the node's zunion returns.
def zunion(*keys, **options)
  zset_keys = keys.flatten(1)
  ensure_same_node(:zunion, zset_keys) { |node| node.zunion(zset_keys, **options) }
end

#zunionstore(destination, *keys, **options) ⇒ Object

Compute the union of multiple sorted sets and store the resulting sorted set in a new key.



779
780
781
782
783
784
# File 'lib/redis/distributed.rb', line 779

# Union multiple sorted sets and store the resulting sorted set in a new
# key.
#
# destination - key that receives the result.
# keys        - source keys; one level of array nesting is flattened.
# options     - keyword options forwarded to the node unchanged.
#
# destination and the source keys together are handed to ensure_same_node,
# which yields the node the command is sent to.
#
# Returns whatever the node's zunionstore returns.
def zunionstore(destination, *keys, **options)
  source_keys = keys.flatten(1)
  ensure_same_node(:zunionstore, [destination, *source_keys]) do |node|
    node.zunionstore(destination, source_keys, **options)
  end
end