Class: RedisClient::Cluster::Node

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/redis_client/cluster/node.rb,
lib/redis_client/cluster/node/primary_only.rb,
lib/redis_client/cluster/node/base_topology.rb,
lib/redis_client/cluster/node/random_replica.rb,
lib/redis_client/cluster/node/latency_replica.rb,
lib/redis_client/cluster/node/random_replica_or_primary.rb

Defined Under Namespace

Classes: BaseTopology, CharArray, Config, Info, LatencyReplica, PrimaryOnly, RandomReplica, RandomReplicaOrPrimary

Constant Summary collapse

MAX_STARTUP_SAMPLE =

It affects to strike a balance between load and stability in initialization or changed states.

Integer(ENV.fetch('REDIS_CLIENT_MAX_STARTUP_SAMPLE', 3))
USE_CHAR_ARRAY_SLOT =

less memory consumption, but slow

Integer(ENV.fetch('REDIS_CLIENT_USE_CHAR_ARRAY_SLOT', 1)) == 1
SLOT_SIZE =
16_384
MIN_SLOT =
0
MAX_SLOT =
SLOT_SIZE - 1
DEAD_FLAGS =
%w[fail? fail handshake noaddr noflags].freeze
ROLE_FLAGS =
%w[master slave].freeze
EMPTY_ARRAY =
[].freeze
EMPTY_HASH =
{}.freeze
ReloadNeeded =
Class.new(::RedisClient::Error)

Instance Method Summary collapse

Constructor Details

#initialize(concurrent_worker, config:, pool: nil, **kwargs) ⇒ Node

Returns a new instance of Node.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/redis_client/cluster/node.rb', line 98

def initialize(
  concurrent_worker,
  config:,
  pool: nil,
  **kwargs
)

  @concurrent_worker = concurrent_worker
  @slots = build_slot_node_mappings(EMPTY_ARRAY)
  @replications = build_replication_mappings(EMPTY_ARRAY)
  klass = make_topology_class(config.use_replica?, config.replica_affinity)
  @topology = klass.new(pool, @concurrent_worker, **kwargs)
  @config = config
  @mutex = Mutex.new
end

Instance Method Details

#any_primary_node_key(seed: nil) ⇒ Object



187
188
189
# File 'lib/redis_client/cluster/node.rb', line 187

def any_primary_node_key(seed: nil)
  @topology.any_primary_node_key(seed: seed)
end

#any_replica_node_key(seed: nil) ⇒ Object



191
192
193
# File 'lib/redis_client/cluster/node.rb', line 191

def any_replica_node_key(seed: nil)
  @topology.any_replica_node_key(seed: seed)
end

#call_all(method, command, args, &block) ⇒ Object



136
137
138
# File 'lib/redis_client/cluster/node.rb', line 136

def call_all(method, command, args, &block)
  call_multiple_nodes!(@topology.clients, method, command, args, &block)
end

#call_primaries(method, command, args, &block) ⇒ Object



140
141
142
# File 'lib/redis_client/cluster/node.rb', line 140

def call_primaries(method, command, args, &block)
  call_multiple_nodes!(@topology.primary_clients, method, command, args, &block)
end

#call_replicas(method, command, args, &block) ⇒ Object



144
145
146
# File 'lib/redis_client/cluster/node.rb', line 144

def call_replicas(method, command, args, &block)
  call_multiple_nodes!(@topology.replica_clients, method, command, args, &block)
end

#clientsObject



161
162
163
# File 'lib/redis_client/cluster/node.rb', line 161

def clients
  @topology.clients.values
end

#clients_for_scanning(seed: nil) ⇒ Object



157
158
159
# File 'lib/redis_client/cluster/node.rb', line 157

def clients_for_scanning(seed: nil)
  @topology.clients_for_scanning(seed: seed).values.sort_by { |c| "#{c.config.host}-#{c.config.port}" }
end

#each(&block) ⇒ Object



118
119
120
# File 'lib/redis_client/cluster/node.rb', line 118

def each(&block)
  @topology.clients.each_value(&block)
end

#find_by(node_key) ⇒ Object

Raises:



130
131
132
133
134
# File 'lib/redis_client/cluster/node.rb', line 130

def find_by(node_key)
  raise ReloadNeeded if node_key.nil? || !@topology.clients.key?(node_key)

  @topology.clients.fetch(node_key)
end

#find_node_key_of_primary(slot) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/redis_client/cluster/node.rb', line 173

def find_node_key_of_primary(slot)
  return if slot.nil?

  slot = Integer(slot)
  return if slot < MIN_SLOT || slot > MAX_SLOT

  @slots[slot]
end

#find_node_key_of_replica(slot, seed: nil) ⇒ Object



182
183
184
185
# File 'lib/redis_client/cluster/node.rb', line 182

def find_node_key_of_replica(slot, seed: nil)
  primary_node_key = find_node_key_of_primary(slot)
  @topology.find_node_key_of_replica(primary_node_key, seed: seed)
end

#inspectObject



114
115
116
# File 'lib/redis_client/cluster/node.rb', line 114

def inspect
  "#<#{self.class.name} #{node_keys.join(', ')}>"
end

#node_keysObject



126
127
128
# File 'lib/redis_client/cluster/node.rb', line 126

def node_keys
  @topology.clients.keys.sort
end

#owns_error?(err) ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/redis_client/cluster/node.rb', line 220

def owns_error?(err)
  any? { |c| ErrorIdentification.client_owns_error?(err, c) }
end

#primary_clientsObject



165
166
167
# File 'lib/redis_client/cluster/node.rb', line 165

def primary_clients
  @topology.primary_clients.values
end

#reload!Object



206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/redis_client/cluster/node.rb', line 206

def reload!
  with_reload_lock do
    with_startup_clients(MAX_STARTUP_SAMPLE) do |startup_clients|
      @node_info = refetch_node_info_list(startup_clients)
      @node_configs = @node_info.to_h do |node_info|
        [node_info.node_key, @config.client_config_for_node(node_info.node_key)]
      end
      @slots = build_slot_node_mappings(@node_info)
      @replications = build_replication_mappings(@node_info)
      @topology.process_topology_update!(@replications, @node_configs)
    end
  end
end

#replica_clientsObject



169
170
171
# File 'lib/redis_client/cluster/node.rb', line 169

def replica_clients
  @topology.replica_clients.values
end

#sampleObject



122
123
124
# File 'lib/redis_client/cluster/node.rb', line 122

def sample
  @topology.clients.values.sample
end

#send_ping(method, command, args, &block) ⇒ Object

Raises:



148
149
150
151
152
153
154
155
# File 'lib/redis_client/cluster/node.rb', line 148

def send_ping(method, command, args, &block)
  result_values, errors = call_multiple_nodes(@topology.clients, method, command, args, &block)
  return result_values if errors.nil? || errors.empty?

  raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)

  raise ::RedisClient::Cluster::ErrorCollection, errors
end

#update_slot(slot, node_key) ⇒ Object



195
196
197
198
199
200
201
202
203
204
# File 'lib/redis_client/cluster/node.rb', line 195

def update_slot(slot, node_key)
  return if @mutex.locked?

  @mutex.synchronize do
    @slots[slot] = node_key
  rescue RangeError
    @slots = Array.new(SLOT_SIZE) { |i| @slots[i] }
    @slots[slot] = node_key
  end
end