Class: RedisClient::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_client/cluster.rb,
lib/redis_client/cluster/node.rb,
lib/redis_client/cluster/errors.rb,
lib/redis_client/cluster/router.rb,
lib/redis_client/cluster/command.rb,
lib/redis_client/cluster/pub_sub.rb,
lib/redis_client/cluster/node_key.rb,
lib/redis_client/cluster/pipeline.rb,
lib/redis_client/cluster/transaction.rb,
lib/redis_client/cluster/concurrent_worker.rb,
lib/redis_client/cluster/node/primary_only.rb,
lib/redis_client/cluster/key_slot_converter.rb,
lib/redis_client/cluster/node/base_topology.rb,
lib/redis_client/cluster/optimistic_locking.rb,
lib/redis_client/cluster/node/random_replica.rb,
lib/redis_client/cluster/error_identification.rb,
lib/redis_client/cluster/node/latency_replica.rb,
lib/redis_client/cluster/noop_command_builder.rb,
lib/redis_client/cluster/concurrent_worker/none.rb,
lib/redis_client/cluster/concurrent_worker/pooled.rb,
lib/redis_client/cluster/concurrent_worker/on_demand.rb,
lib/redis_client/cluster/node/random_replica_or_primary.rb

Defined Under Namespace

Modules: ConcurrentWorker, ErrorIdentification, KeySlotConverter, NodeKey, NoopCommandBuilder Classes: AmbiguousNodeError, Command, Error, ErrorCollection, InitialSetupError, Node, NodeMightBeDown, OptimisticLocking, OrchestrationCommandNotSupported, Pipeline, PubSub, Router, Transaction

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = nil, pool: nil, concurrency: nil, **kwargs) ⇒ Cluster

Returns a new instance of Cluster.



18
19
20
21
22
23
24
25
26
27
# File 'lib/redis_client/cluster.rb', line 18

def initialize(config = nil, pool: nil, concurrency: nil, **kwargs)
  @config = config.nil? ? ClusterConfig.new(**kwargs) : config
  @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
  @command_builder = @config.command_builder

  @pool = pool
  @kwargs = kwargs
  @router = nil
  @mutex = Mutex.new
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args, **kwargs, &block) ⇒ Object (private)



154
155
156
157
158
159
160
161
162
163
# File 'lib/redis_client/cluster.rb', line 154

def method_missing(name, *args, **kwargs, &block)
  cmd = name.respond_to?(:name) ? name.name : name.to_s
  if router.command_exists?(cmd)
    args.unshift(cmd)
    command = @command_builder.generate(args, kwargs)
    return router.send_command(:call_v, command, &block)
  end

  super
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



16
17
18
# File 'lib/redis_client/cluster.rb', line 16

def config
  @config
end

Instance Method Details

#blocking_call(timeout, *args, **kwargs, &block) ⇒ Object



54
55
56
57
# File 'lib/redis_client/cluster.rb', line 54

def blocking_call(timeout, *args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  router.send_command(:blocking_call_v, command, timeout, &block)
end

#blocking_call_v(timeout, command, &block) ⇒ Object



59
60
61
62
# File 'lib/redis_client/cluster.rb', line 59

def blocking_call_v(timeout, command, &block)
  command = @command_builder.generate(command)
  router.send_command(:blocking_call_v, command, timeout, &block)
end

#call(*args, **kwargs, &block) ⇒ Object



34
35
36
37
# File 'lib/redis_client/cluster.rb', line 34

def call(*args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  router.send_command(:call_v, command, &block)
end

#call_once(*args, **kwargs, &block) ⇒ Object



44
45
46
47
# File 'lib/redis_client/cluster.rb', line 44

def call_once(*args, **kwargs, &block)
  command = @command_builder.generate(args, kwargs)
  router.send_command(:call_once_v, command, &block)
end

#call_once_v(command, &block) ⇒ Object



49
50
51
52
# File 'lib/redis_client/cluster.rb', line 49

def call_once_v(command, &block)
  command = @command_builder.generate(command)
  router.send_command(:call_once_v, command, &block)
end

#call_v(command, &block) ⇒ Object



39
40
41
42
# File 'lib/redis_client/cluster.rb', line 39

def call_v(command, &block)
  command = @command_builder.generate(command)
  router.send_command(:call_v, command, &block)
end

#closeObject



138
139
140
141
142
# File 'lib/redis_client/cluster.rb', line 138

def close
  @router&.close
  @concurrent_worker.close
  nil
end

#hscan(key, *args, **kwargs, &block) ⇒ Object



84
85
86
87
88
89
# File 'lib/redis_client/cluster.rb', line 84

def hscan(key, *args, **kwargs, &block)
  return to_enum(__callee__, key, *args, **kwargs) unless block_given?

  command = @command_builder.generate(['hscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
  router.scan_single_key(command, arity: 2, &block)
end

#inspectObject



29
30
31
32
# File 'lib/redis_client/cluster.rb', line 29

def inspect
  node_keys = @router.nil? ? @config.startup_nodes.keys : router.node_keys
  "#<#{self.class.name} #{node_keys.join(', ')}>"
end

#multi(watch: nil) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/redis_client/cluster.rb', line 114

def multi(watch: nil)
  if watch.nil? || watch.empty?
    transaction = ::RedisClient::Cluster::Transaction.new(router, @command_builder)
    yield transaction
    return transaction.execute
  end

  ::RedisClient::Cluster::OptimisticLocking.new(router).watch(watch) do |c, slot, asking|
    transaction = ::RedisClient::Cluster::Transaction.new(
      router, @command_builder, node: c, slot: slot, asking: asking
    )
    yield transaction
    transaction.execute
  end
end

#pipelined(exception: true) {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/redis_client/cluster.rb', line 98

def pipelined(exception: true)
  seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
  pipeline = ::RedisClient::Cluster::Pipeline.new(
    router,
    @command_builder,
    @concurrent_worker,
    exception: exception,
    seed: seed
  )

  yield pipeline
  return [] if pipeline.empty?

  pipeline.execute
end

#pubsubObject



130
131
132
# File 'lib/redis_client/cluster.rb', line 130

def pubsub
  ::RedisClient::Cluster::PubSub.new(router, @command_builder)
end

#scan(*args, **kwargs, &block) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/redis_client/cluster.rb', line 64

def scan(*args, **kwargs, &block)
  return to_enum(__callee__, *args, **kwargs) unless block_given?

  command = @command_builder.generate(['scan', ZERO_CURSOR_FOR_SCAN] + args, kwargs)
  seed = Random.new_seed
  loop do
    cursor, keys = router.scan(command, seed: seed)
    command[1] = cursor
    keys.each(&block)
    break if cursor == ZERO_CURSOR_FOR_SCAN
  end
end

#sscan(key, *args, **kwargs, &block) ⇒ Object



77
78
79
80
81
82
# File 'lib/redis_client/cluster.rb', line 77

def sscan(key, *args, **kwargs, &block)
  return to_enum(__callee__, key, *args, **kwargs) unless block_given?

  command = @command_builder.generate(['sscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
  router.scan_single_key(command, arity: 1, &block)
end

#withObject

Raises:

  • (NotImplementedError)


134
135
136
# File 'lib/redis_client/cluster.rb', line 134

def with(...)
  raise NotImplementedError, 'No way to use'
end

#zscan(key, *args, **kwargs, &block) ⇒ Object



91
92
93
94
95
96
# File 'lib/redis_client/cluster.rb', line 91

def zscan(key, *args, **kwargs, &block)
  return to_enum(__callee__, key, *args, **kwargs) unless block_given?

  command = @command_builder.generate(['zscan', key, ZERO_CURSOR_FOR_SCAN] + args, kwargs)
  router.scan_single_key(command, arity: 2, &block)
end