Class: RedisClient::Cluster::Node
- Inherits:
-
Object
- Object
- RedisClient::Cluster::Node
show all
- Includes:
- Enumerable
- Defined in:
- lib/redis_client/cluster/node.rb,
lib/redis_client/cluster/node/primary_only.rb,
lib/redis_client/cluster/node/base_topology.rb,
lib/redis_client/cluster/node/random_replica.rb,
lib/redis_client/cluster/node/latency_replica.rb,
lib/redis_client/cluster/node/random_replica_or_primary.rb
Defined Under Namespace
Classes: BaseTopology, CharArray, Config, Info, LatencyReplica, PrimaryOnly, RandomReplica, RandomReplicaOrPrimary
Constant Summary
collapse
- MAX_STARTUP_SAMPLE =
It affects to strike a balance between load and stability in initialization or changed states.
Integer(ENV.fetch('REDIS_CLIENT_MAX_STARTUP_SAMPLE', 3))
- USE_CHAR_ARRAY_SLOT =
less memory consumption, but slow
Integer(ENV.fetch('REDIS_CLIENT_USE_CHAR_ARRAY_SLOT', 1)) == 1
- SLOT_SIZE =
16_384
- MIN_SLOT =
0
- MAX_SLOT =
SLOT_SIZE - 1
- DEAD_FLAGS =
%w[fail? fail handshake noaddr noflags].freeze
- ROLE_FLAGS =
%w[master slave].freeze
- EMPTY_ARRAY =
[].freeze
- EMPTY_HASH =
{}.freeze
- ReloadNeeded =
Class.new(::RedisClient::Error)
Instance Method Summary
collapse
-
#any_primary_node_key(seed: nil) ⇒ Object
-
#any_replica_node_key(seed: nil) ⇒ Object
-
#call_all(method, command, args, &block) ⇒ Object
-
#call_primaries(method, command, args, &block) ⇒ Object
-
#call_replicas(method, command, args, &block) ⇒ Object
-
#clients ⇒ Object
-
#clients_for_scanning(seed: nil) ⇒ Object
-
#each(&block) ⇒ Object
-
#find_by(node_key) ⇒ Object
-
#find_node_key_of_primary(slot) ⇒ Object
-
#find_node_key_of_replica(slot, seed: nil) ⇒ Object
-
#initialize(concurrent_worker, config:, pool: nil, **kwargs) ⇒ Node
constructor
-
#inspect ⇒ Object
-
#node_keys ⇒ Object
-
#owns_error?(err) ⇒ Boolean
-
#primary_clients ⇒ Object
-
#reload! ⇒ Object
-
#replica_clients ⇒ Object
-
#sample ⇒ Object
-
#send_ping(method, command, args, &block) ⇒ Object
-
#update_slot(slot, node_key) ⇒ Object
Constructor Details
#initialize(concurrent_worker, config:, pool: nil, **kwargs) ⇒ Node
Returns a new instance of Node.
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/redis_client/cluster/node.rb', line 98
def initialize(
concurrent_worker,
config:,
pool: nil,
**kwargs
)
@concurrent_worker = concurrent_worker
@slots = build_slot_node_mappings(EMPTY_ARRAY)
@replications = build_replication_mappings(EMPTY_ARRAY)
klass = make_topology_class(config.use_replica?, config.replica_affinity)
@topology = klass.new(pool, @concurrent_worker, **kwargs)
@config = config
@mutex = Mutex.new
end
|
Instance Method Details
#any_primary_node_key(seed: nil) ⇒ Object
187
188
189
|
# File 'lib/redis_client/cluster/node.rb', line 187
def any_primary_node_key(seed: nil)
@topology.any_primary_node_key(seed: seed)
end
|
#any_replica_node_key(seed: nil) ⇒ Object
191
192
193
|
# File 'lib/redis_client/cluster/node.rb', line 191
def any_replica_node_key(seed: nil)
@topology.any_replica_node_key(seed: seed)
end
|
#call_all(method, command, args, &block) ⇒ Object
136
137
138
|
# File 'lib/redis_client/cluster/node.rb', line 136
def call_all(method, command, args, &block)
call_multiple_nodes!(@topology.clients, method, command, args, &block)
end
|
#call_primaries(method, command, args, &block) ⇒ Object
140
141
142
|
# File 'lib/redis_client/cluster/node.rb', line 140
def call_primaries(method, command, args, &block)
call_multiple_nodes!(@topology.primary_clients, method, command, args, &block)
end
|
#call_replicas(method, command, args, &block) ⇒ Object
144
145
146
|
# File 'lib/redis_client/cluster/node.rb', line 144
def call_replicas(method, command, args, &block)
call_multiple_nodes!(@topology.replica_clients, method, command, args, &block)
end
|
#clients ⇒ Object
161
162
163
|
# File 'lib/redis_client/cluster/node.rb', line 161
def clients
@topology.clients.values
end
|
#clients_for_scanning(seed: nil) ⇒ Object
157
158
159
|
# File 'lib/redis_client/cluster/node.rb', line 157
def clients_for_scanning(seed: nil)
@topology.clients_for_scanning(seed: seed).values.sort_by { |c| "#{c.config.host}-#{c.config.port}" }
end
|
#each(&block) ⇒ Object
118
119
120
|
# File 'lib/redis_client/cluster/node.rb', line 118
def each(&block)
@topology.clients.each_value(&block)
end
|
#find_by(node_key) ⇒ Object
130
131
132
133
134
|
# File 'lib/redis_client/cluster/node.rb', line 130
def find_by(node_key)
raise ReloadNeeded if node_key.nil? || !@topology.clients.key?(node_key)
@topology.clients.fetch(node_key)
end
|
#find_node_key_of_primary(slot) ⇒ Object
173
174
175
176
177
178
179
180
|
# File 'lib/redis_client/cluster/node.rb', line 173
def find_node_key_of_primary(slot)
return if slot.nil?
slot = Integer(slot)
return if slot < MIN_SLOT || slot > MAX_SLOT
@slots[slot]
end
|
#find_node_key_of_replica(slot, seed: nil) ⇒ Object
182
183
184
185
|
# File 'lib/redis_client/cluster/node.rb', line 182
def find_node_key_of_replica(slot, seed: nil)
primary_node_key = find_node_key_of_primary(slot)
@topology.find_node_key_of_replica(primary_node_key, seed: seed)
end
|
#inspect ⇒ Object
114
115
116
|
# File 'lib/redis_client/cluster/node.rb', line 114
def inspect
"#<#{self.class.name} #{node_keys.join(', ')}>"
end
|
#node_keys ⇒ Object
126
127
128
|
# File 'lib/redis_client/cluster/node.rb', line 126
def node_keys
@topology.clients.keys.sort
end
|
#owns_error?(err) ⇒ Boolean
#primary_clients ⇒ Object
165
166
167
|
# File 'lib/redis_client/cluster/node.rb', line 165
def primary_clients
@topology.primary_clients.values
end
|
#reload! ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/redis_client/cluster/node.rb', line 206
def reload!
with_reload_lock do
with_startup_clients(MAX_STARTUP_SAMPLE) do |startup_clients|
@node_info = refetch_node_info_list(startup_clients)
@node_configs = @node_info.to_h do |node_info|
[node_info.node_key, @config.client_config_for_node(node_info.node_key)]
end
@slots = build_slot_node_mappings(@node_info)
@replications = build_replication_mappings(@node_info)
@topology.process_topology_update!(@replications, @node_configs)
end
end
end
|
#replica_clients ⇒ Object
169
170
171
|
# File 'lib/redis_client/cluster/node.rb', line 169
def replica_clients
@topology.replica_clients.values
end
|
#sample ⇒ Object
122
123
124
|
# File 'lib/redis_client/cluster/node.rb', line 122
def sample
@topology.clients.values.sample
end
|
#send_ping(method, command, args, &block) ⇒ Object
148
149
150
151
152
153
154
155
|
# File 'lib/redis_client/cluster/node.rb', line 148
def send_ping(method, command, args, &block)
result_values, errors = call_multiple_nodes(@topology.clients, method, command, args, &block)
return result_values if errors.nil? || errors.empty?
raise ReloadNeeded if errors.values.any?(::RedisClient::ConnectionError)
raise ::RedisClient::Cluster::ErrorCollection, errors
end
|
#update_slot(slot, node_key) ⇒ Object
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/redis_client/cluster/node.rb', line 195
def update_slot(slot, node_key)
return if @mutex.locked?
@mutex.synchronize do
@slots[slot] = node_key
rescue RangeError
@slots = Array.new(SLOT_SIZE) { |i| @slots[i] }
@slots[slot] = node_key
end
end
|