Class: RedisClient::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_client/cluster.rb,
lib/redis_client/cluster/node.rb,
lib/redis_client/cluster/errors.rb,
lib/redis_client/cluster/router.rb,
lib/redis_client/cluster/command.rb,
lib/redis_client/cluster/pub_sub.rb,
lib/redis_client/cluster/node_key.rb,
lib/redis_client/cluster/pipeline.rb,
lib/redis_client/cluster/transaction.rb,
lib/redis_client/cluster/pinning_node.rb,
lib/redis_client/cluster/concurrent_worker.rb,
lib/redis_client/cluster/node/primary_only.rb,
lib/redis_client/cluster/key_slot_converter.rb,
lib/redis_client/cluster/node/base_topology.rb,
lib/redis_client/cluster/optimistic_locking.rb,
lib/redis_client/cluster/node/random_replica.rb,
lib/redis_client/cluster/normalized_cmd_name.rb,
lib/redis_client/cluster/error_identification.rb,
lib/redis_client/cluster/node/latency_replica.rb,
lib/redis_client/cluster/concurrent_worker/none.rb,
lib/redis_client/cluster/concurrent_worker/pooled.rb,
lib/redis_client/cluster/concurrent_worker/on_demand.rb,
lib/redis_client/cluster/node/random_replica_or_primary.rb

Defined Under Namespace

Modules: ConcurrentWorker, ErrorIdentification, KeySlotConverter, NodeKey Classes: AmbiguousNodeError, Command, ErrorCollection, InitialSetupError, Node, NodeMightBeDown, NormalizedCmdName, OptimisticLocking, OrchestrationCommandNotSupported, PinningNode, Pipeline, PubSub, Router, Transaction

Constant Summary collapse

ZERO_CURSOR_FOR_SCAN =
'0'
ERR_ARG_NORMALIZATION =
->(arg) { Array[arg].flatten.reject { |e| e.nil? || (e.respond_to?(:empty?) && e.empty?) } }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, pool: nil, concurrency: nil, **kwargs) ⇒ Cluster

Returns a new instance of Cluster.



17
18
19
20
21
22
# File 'lib/redis_client/cluster.rb', line 17

def initialize(config, pool: nil, concurrency: nil, **kwargs)
  @config = config
  @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
  @router = ::RedisClient::Cluster::Router.new(config, @concurrent_worker, pool: pool, **kwargs)
  @command_builder = config.command_builder
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, **kwargs, &block) ⇒ Object (private)



144
145
146
147
148
149
150
151
152
# File 'lib/redis_client/cluster.rb', line 144

def method_missing(name, *args, **kwargs, &block)
  if @router.command_exists?(name)
    args.unshift(name)
    command = @command_builder.generate(args, kwargs)
    return @router.send_command(:call_v, command, &block)
  end

  super
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



15
16
17
# File 'lib/redis_client/cluster.rb', line 15

def config
  @config
end

Instance Method Details

#blocking_call(timeout, *args, **kwargs, &block) ⇒ Object



48
49
50
51
# File 'lib/redis_client/cluster.rb', line 48

def blocking_call(timeout, *args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  @router.send_command(:blocking_call_v, command, timeout, &block)
end

#blocking_call_v(timeout, command, &block) ⇒ Object



53
54
55
56
# File 'lib/redis_client/cluster.rb', line 53

def blocking_call_v(timeout, command, &block)
  command = @command_builder.generate(command)
  @router.send_command(:blocking_call_v, command, timeout, &block)
end

#call(*args, **kwargs, &block) ⇒ Object



28
29
30
31
# File 'lib/redis_client/cluster.rb', line 28

def call(*args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  @router.send_command(:call_v, command, &block)
end

#call_once(*args, **kwargs, &block) ⇒ Object



38
39
40
41
# File 'lib/redis_client/cluster.rb', line 38

def call_once(*args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  @router.send_command(:call_once_v, command, &block)
end

#call_once_v(command, &block) ⇒ Object



43
44
45
46
# File 'lib/redis_client/cluster.rb', line 43

def call_once_v(command, &block)
  command = @command_builder.generate(command)
  @router.send_command(:call_once_v, command, &block)
end

#call_v(command, &block) ⇒ Object



33
34
35
36
# File 'lib/redis_client/cluster.rb', line 33

def call_v(command, &block)
  command = @command_builder.generate(command)
  @router.send_command(:call_v, command, &block)
end

#closeObject



123
124
125
126
127
# File 'lib/redis_client/cluster.rb', line 123

def close
  @concurrent_worker.close
  @router.close
  nil
end

#hscan(key, *args, **kwargs, &block) ⇒ Object



75
76
77
78
# File 'lib/redis_client/cluster.rb', line 75

def hscan(key, *args, **kwargs, &block)
  node = @router.assign_node(['HSCAN', key])
  @router.try_delegate(node, :hscan, key, *args, **kwargs, &block)
end

#inspectObject



24
25
26
# File 'lib/redis_client/cluster.rb', line 24

def inspect
  "#<#{self.class.name} #{@router.node_keys.join(', ')}>"
end

#multi(watch: nil) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/redis_client/cluster.rb', line 94

def multi(watch: nil)
  if watch.nil? || watch.empty?
    transaction = ::RedisClient::Cluster::Transaction.new(@router, @command_builder)
    yield transaction
    return transaction.execute
  end

  ::RedisClient::Cluster::OptimisticLocking.new(@router).watch(watch) do |c, slot|
    transaction = ::RedisClient::Cluster::Transaction.new(
      @router, @command_builder, node: c, slot: slot
    )
    yield transaction
    transaction.execute
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


85
86
87
88
89
90
91
92
# File 'lib/redis_client/cluster.rb', line 85

def pipelined
  seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
  pipeline = ::RedisClient::Cluster::Pipeline.new(@router, @command_builder, @concurrent_worker, seed: seed)
  yield pipeline
  return [] if pipeline.empty?

  pipeline.execute
end

#pubsubObject



110
111
112
# File 'lib/redis_client/cluster.rb', line 110

def pubsub
  ::RedisClient::Cluster::PubSub.new(@router, @command_builder)
end

#scan(*args, **kwargs, &block) ⇒ Object

Raises:

  • (ArgumentError)


58
59
60
61
62
63
64
65
66
67
68
# File 'lib/redis_client/cluster.rb', line 58

def scan(*args, **kwargs, &block)
  raise ArgumentError, 'block required' unless block

  seed = Random.new_seed
  cursor = ZERO_CURSOR_FOR_SCAN
  loop do
    cursor, keys = @router.scan('SCAN', cursor, *args, seed: seed, **kwargs)
    keys.each(&block)
    break if cursor == ZERO_CURSOR_FOR_SCAN
  end
end

#sscan(key, *args, **kwargs, &block) ⇒ Object



70
71
72
73
# File 'lib/redis_client/cluster.rb', line 70

def sscan(key, *args, **kwargs, &block)
  node = @router.assign_node(['SSCAN', key])
  @router.try_delegate(node, :sscan, key, *args, **kwargs, &block)
end

#with(key: nil, hashtag: nil, write: true) ⇒ Object

TODO: This isn’t an official public interface yet. Don’t use in your production environment.



116
117
118
119
120
121
# File 'lib/redis_client/cluster.rb', line 116

def with(key: nil, hashtag: nil, write: true)
  key = process_with_arguments(key, hashtag)
  node_key = @router.find_node_key_by_key(key, primary: write)
  node = @router.find_node(node_key)
  node.with { |c| yield ::RedisClient::Cluster::PinningNode.new(c) }
end

#zscan(key, *args, **kwargs, &block) ⇒ Object



80
81
82
83
# File 'lib/redis_client/cluster.rb', line 80

def zscan(key, *args, **kwargs, &block)
  node = @router.assign_node(['ZSCAN', key])
  @router.try_delegate(node, :zscan, key, *args, **kwargs, &block)
end