Class: Mongo::Cluster
- Inherits:
-
Object
- Object
- Mongo::Cluster
- Extended by:
- Forwardable
- Includes:
- Event::Subscriber, Loggable, Monitoring::Publishable
- Defined in:
- lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/periodic_executor.rb,
lib/mongo/cluster/topology/replica_set.rb,
lib/mongo/cluster/reapers/cursor_reaper.rb,
lib/mongo/cluster/reapers/socket_reaper.rb
Overview
Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.
Defined Under Namespace
Modules: Topology Classes: AppMetadata, CursorReaper, PeriodicExecutor, SocketReaper
Constant Summary collapse
- MAX_READ_RETRIES =
The default number of mongos read retries.
1- MAX_WRITE_RETRIES =
The default number of mongos write retries.
1- READ_RETRY_INTERVAL =
The default mongos read retry interval, in seconds.
5- IDLE_WRITE_PERIOD_SECONDS =
How often an idle primary writes a no-op to the oplog.
10- CLUSTER_TIME =
The cluster time key in responses from mongos servers.
'clusterTime'.freeze
Constants included from Loggable
Instance Attribute Summary collapse
-
#app_metadata ⇒ Mongo::Cluster::AppMetadata
readonly
The application metadata, used for connection handshakes.
-
#cluster_time ⇒ BSON::Document
readonly
The latest cluster time seen.
-
#monitoring ⇒ Monitoring
readonly
Monitoring The monitoring.
-
#options ⇒ Hash
readonly
The options hash.
- #session_pool ⇒ Object readonly
-
#topology ⇒ Object
readonly
The cluster topology.
Attributes included from Event::Subscriber
Class Method Summary collapse
-
.create(client) ⇒ Cluster
private
Create a cluster for the provided client, for use when we don’t want the client’s original cluster instance to be the same.
-
.finalize(pools, periodic_executor, session_pool) ⇒ Proc
Finalize the cluster for garbage collection.
Instance Method Summary collapse
-
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object.
-
#add(host) ⇒ Server
Add a server to the cluster with the provided address.
-
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
-
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
-
#disconnect! ⇒ true
Disconnect all servers.
-
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
-
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
-
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
-
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
constructor
private
Instantiate the new cluster.
-
#inspect ⇒ String
Get the nicer formatted string for use in inspection.
-
#logical_session_timeout ⇒ Integer?
The logical session timeout value in minutes.
-
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
-
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
-
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
-
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
-
#reconnect! ⇒ true
Reconnect all servers.
-
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
-
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
-
#scan! ⇒ true
Force a scan of all known servers in the cluster.
-
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
-
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
-
#update_cluster_time(result) ⇒ Object
Update the max cluster time seen in a response.
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Event::Subscriber
Methods included from Monitoring::Publishable
#publish_command, #publish_event, #publish_sdam_event
Constructor Details
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cluster should never be directly instantiated outside of a Client.
When connecting to a mongodb+srv:// URI, the client expands such a URI into a list of servers and passes that list to the Cluster constructor. When connecting to a standalone mongod, the Cluster constructor receives the corresponding address as an array of one string.
Instantiate the new cluster.
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/mongo/cluster.rb', line 176 def initialize(seeds, monitoring, = Options::Redacted.new) @addresses = [] @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new @options = .freeze @app_metadata = AppMetadata.new(self) @update_lock = Mutex.new @pool_lock = Mutex.new @cluster_time = nil @cluster_time_lock = Mutex.new @topology = Topology.initial(seeds, monitoring, ) Session::SessionPool.create(self) publish_sdam_event( Monitoring::TOPOLOGY_OPENING, Monitoring::Event::TopologyOpening.new(@topology) ) subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self)) seeds.each{ |seed| add(seed) } publish_sdam_event( Monitoring::TOPOLOGY_CHANGED, Monitoring::Event::TopologyChanged.new(@topology, @topology) ) if @servers.size > 1 @cursor_reaper = CursorReaper.new @socket_reaper = SocketReaper.new(self) @periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper) @periodic_executor.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools, @periodic_executor, @session_pool)) end |
Instance Attribute Details
#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)
Returns The application metadata, used for connection handshakes.
71 72 73 |
# File 'lib/mongo/cluster.rb', line 71 def @app_metadata end |
#cluster_time ⇒ BSON::Document (readonly)
Returns The latest cluster time seen.
76 77 78 |
# File 'lib/mongo/cluster.rb', line 76 def cluster_time @cluster_time end |
#monitoring ⇒ Monitoring (readonly)
Returns monitoring The monitoring.
62 63 64 |
# File 'lib/mongo/cluster.rb', line 62 def monitoring @monitoring end |
#options ⇒ Hash (readonly)
Returns The options hash.
59 60 61 |
# File 'lib/mongo/cluster.rb', line 59 def @options end |
#session_pool ⇒ Object (readonly)
81 82 83 |
# File 'lib/mongo/cluster.rb', line 81 def session_pool @session_pool end |
#topology ⇒ Object (readonly)
Returns The cluster topology.
65 66 67 |
# File 'lib/mongo/cluster.rb', line 65 def topology @topology end |
Class Method Details
.create(client) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a cluster for the provided client, for use when we don’t want the client’s original cluster instance to be the same.
455 456 457 458 459 460 461 462 |
# File 'lib/mongo/cluster.rb', line 455 def self.create(client) cluster = Cluster.new( client.cluster.addresses.map(&:to_s), Monitoring.new, client. ) client.instance_variable_set(:@cluster, cluster) end |
.finalize(pools, periodic_executor, session_pool) ⇒ Proc
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
227 228 229 230 231 232 233 234 235 |
# File 'lib/mongo/cluster.rb', line 227 def self.finalize(pools, periodic_executor, session_pool) proc do session_pool.end_sessions periodic_executor.stop! pools.values.each do |pool| pool.disconnect! end end end |
Instance Method Details
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
98 99 100 101 |
# File 'lib/mongo/cluster.rb', line 98 def ==(other) return false unless other.is_a?(Cluster) addresses == other.addresses && == other. end |
#add(host) ⇒ Server
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.
115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/mongo/cluster.rb', line 115 def add(host) address = Address.new(host, ) if !addresses.include?(address) if addition_allowed?(address) @update_lock.synchronize { @addresses.push(address) } server = Server.new(address, self, @monitoring, event_listeners, ) @update_lock.synchronize { @servers.push(server) } server end end end |
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
420 421 422 423 424 |
# File 'lib/mongo/cluster.rb', line 420 def add_hosts(description) if topology.add_hosts?(description, servers_list) description.servers.each { |s| add(s) } end end |
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
472 473 474 |
# File 'lib/mongo/cluster.rb', line 472 def addresses addresses_list end |
#disconnect! ⇒ true
Disconnect all servers.
393 394 395 396 |
# File 'lib/mongo/cluster.rb', line 393 def disconnect! @periodic_executor.stop! @servers.each { |server| server.disconnect! } and true end |
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
277 278 279 |
# File 'lib/mongo/cluster.rb', line 277 def elect_primary!(description) @topology = topology.elect_primary(description, servers_list) end |
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
139 140 141 |
# File 'lib/mongo/cluster.rb', line 139 def has_readable_server?(server_selector = nil) topology.has_readable_server?(self, server_selector) end |
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
151 152 153 |
# File 'lib/mongo/cluster.rb', line 151 def has_writable_server? topology.has_writable_server?(self) end |
#inspect ⇒ String
Get the nicer formatted string for use in inspection.
245 246 247 |
# File 'lib/mongo/cluster.rb', line 245 def inspect "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>" end |
#logical_session_timeout ⇒ Integer?
The logical session timeout value in minutes.
484 485 486 487 488 489 |
# File 'lib/mongo/cluster.rb', line 484 def logical_session_timeout servers.inject(nil) do |min, server| break unless timeout = server.logical_session_timeout [timeout, (min || timeout)].min end end |
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
290 291 292 |
# File 'lib/mongo/cluster.rb', line 290 def max_read_retries [:max_read_retries] || MAX_READ_RETRIES end |
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
261 262 263 264 |
# File 'lib/mongo/cluster.rb', line 261 def next_primary(ping = true) @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) @primary_selector.select_server(self) end |
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
304 305 306 307 308 |
# File 'lib/mongo/cluster.rb', line 304 def pool(server) @pool_lock.synchronize do pools[server.address] ||= Server::ConnectionPool.get(server) end end |
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
319 320 321 |
# File 'lib/mongo/cluster.rb', line 319 def read_retry_interval [:read_retry_interval] || READ_RETRY_INTERVAL end |
#reconnect! ⇒ true
Reconnect all servers.
406 407 408 409 410 |
# File 'lib/mongo/cluster.rb', line 406 def reconnect! scan! servers.each { |server| server.reconnect! } @periodic_executor.restart! and true end |
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/mongo/cluster.rb', line 345 def remove(host) address = Address.new(host) removed_servers = @servers.select { |s| s.address == address } @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers publish_sdam_event( Monitoring::SERVER_CLOSED, Monitoring::Event::ServerClosed.new(address, topology) ) @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end |
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
434 435 436 437 438 439 440 |
# File 'lib/mongo/cluster.rb', line 434 def remove_hosts(description) if topology.remove_hosts?(description) servers_list.each do |s| remove(s.address.to_s) if topology.remove_server?(description, s) end end end |
#scan! ⇒ true
This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.
Force a scan of all known servers in the cluster.
368 369 370 |
# File 'lib/mongo/cluster.rb', line 368 def scan! servers_list.each{ |server| server.scan! } and true end |
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
381 382 383 |
# File 'lib/mongo/cluster.rb', line 381 def servers topology.servers(servers_list.compact).compact end |
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
332 333 334 |
# File 'lib/mongo/cluster.rb', line 332 def standalone_discovered @topology = topology.standalone_discovered end |
#update_cluster_time(result) ⇒ Object
Update the max cluster time seen in a response.
501 502 503 504 505 506 507 508 509 510 511 |
# File 'lib/mongo/cluster.rb', line 501 def update_cluster_time(result) if cluster_time_doc = result.cluster_time @cluster_time_lock.synchronize do if @cluster_time.nil? @cluster_time = cluster_time_doc elsif cluster_time_doc[CLUSTER_TIME] > @cluster_time[CLUSTER_TIME] @cluster_time = cluster_time_doc end end end end |