Class: Mongo::Cluster
- Inherits:
-
Object
- Object
- Mongo::Cluster
- Extended by:
- Forwardable
- Includes:
- Event::Subscriber, Loggable, Monitoring::Publishable
- Defined in:
- lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/cursor_reaper.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/topology/replica_set.rb
Overview
Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.
Defined Under Namespace
Modules: Topology Classes: AppMetadata, CursorReaper
Constant Summary collapse
- MAX_READ_RETRIES =
The default number of mongos read retries.
1- READ_RETRY_INTERVAL =
The default mongos read retry interval, in seconds.
5- IDLE_WRITE_PERIOD_SECONDS =
How often an idle primary writes a no-op to the oplog.
10
Constants included from Loggable
Instance Attribute Summary collapse
-
#app_metadata ⇒ Mongo::Cluster::AppMetadata
readonly
The application metadata, used for connection handshakes.
-
#monitoring ⇒ Monitoring
readonly
Monitoring The monitoring.
-
#options ⇒ Hash
readonly
The options hash.
-
#topology ⇒ Object
readonly
The cluster topology.
Attributes included from Event::Subscriber
Class Method Summary collapse
-
.create(client) ⇒ Cluster
private
Create a cluster for the provided client, for use when we don’t want the client’s original cluster instance to be the same.
-
.finalize(pools) ⇒ Proc
Finalize the cluster for garbage collection.
Instance Method Summary collapse
-
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object.
-
#add(host) ⇒ Server
Add a server to the cluster with the provided address.
-
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
-
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
-
#disconnect! ⇒ true
Disconnect all servers.
-
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
-
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
-
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
-
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
constructor
private
Instantiate the new cluster.
-
#inspect ⇒ String
Get the nicer formatted string for use in inspection.
-
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
-
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
-
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
-
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
-
#reconnect! ⇒ true
Reconnect all servers.
-
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
-
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
-
#scan! ⇒ true
Force a scan of all known servers in the cluster.
-
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
-
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Event::Subscriber
Methods included from Monitoring::Publishable
#publish_command, #publish_event, #publish_sdam_event
Constructor Details
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cluster should never be directly instantiated outside of a Client.
Instantiate the new cluster.
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/mongo/cluster.rb', line 147 def initialize(seeds, monitoring, = Options::Redacted.new) @addresses = [] @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new @options = .freeze @app_metadata ||= AppMetadata.new(self) @update_lock = Mutex.new @pool_lock = Mutex.new @topology = Topology.initial(seeds, monitoring, ) publish_sdam_event( Monitoring::TOPOLOGY_OPENING, Monitoring::Event::TopologyOpening.new(@topology) ) subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self)) subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self)) subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self)) seeds.each{ |seed| add(seed) } publish_sdam_event( Monitoring::TOPOLOGY_CHANGED, Monitoring::Event::TopologyChanged.new(@topology, @topology) ) if @servers.size > 1 @cursor_reaper = CursorReaper.new @cursor_reaper.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools)) end |
Instance Attribute Details
#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)
Returns The application metadata, used for connection handshakes.
59 60 61 |
# File 'lib/mongo/cluster.rb', line 59 def @app_metadata end |
#monitoring ⇒ Monitoring (readonly)
Returns monitoring The monitoring.
50 51 52 |
# File 'lib/mongo/cluster.rb', line 50 def monitoring @monitoring end |
#options ⇒ Hash (readonly)
Returns The options hash.
47 48 49 |
# File 'lib/mongo/cluster.rb', line 47 def @options end |
#topology ⇒ Object (readonly)
Returns The cluster topology.
53 54 55 |
# File 'lib/mongo/cluster.rb', line 53 def topology @topology end |
Class Method Details
.create(client) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a cluster for the provided client, for use when we don’t want the client’s original cluster instance to be the same.
419 420 421 422 423 424 425 426 |
# File 'lib/mongo/cluster.rb', line 419 def self.create(client) cluster = Cluster.new( client.cluster.addresses.map(&:to_s), client.instance_variable_get(:@monitoring).dup, client. ) client.instance_variable_set(:@cluster, cluster) end |
.finalize(pools) ⇒ Proc
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
192 193 194 195 196 197 198 199 200 |
# File 'lib/mongo/cluster.rb', line 192 def self.finalize(pools) proc do begin; @cursor_reaper.kill_cursors; rescue; end @cursor_reaper.stop! pools.values.each do |pool| pool.disconnect! end end end |
Instance Method Details
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
76 77 78 79 |
# File 'lib/mongo/cluster.rb', line 76 def ==(other) return false unless other.is_a?(Cluster) addresses == other.addresses && == other. end |
#add(host) ⇒ Server
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/mongo/cluster.rb', line 93 def add(host) address = Address.new(host) if !addresses.include?(address) if addition_allowed?(address) @update_lock.synchronize { @addresses.push(address) } server = Server.new(address, self, @monitoring, event_listeners, ) @update_lock.synchronize { @servers.push(server) } server end end end |
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
384 385 386 387 388 |
# File 'lib/mongo/cluster.rb', line 384 def add_hosts(description) if topology.add_hosts?(description, servers_list) description.servers.each { |s| add(s) } end end |
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
436 437 438 |
# File 'lib/mongo/cluster.rb', line 436 def addresses addresses_list end |
#disconnect! ⇒ true
Disconnect all servers.
356 357 358 359 360 |
# File 'lib/mongo/cluster.rb', line 356 def disconnect! begin; @cursor_reaper.kill_cursors; rescue; end @cursor_reaper.stop! @servers.each { |server| server.disconnect! } and true end |
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
240 241 242 |
# File 'lib/mongo/cluster.rb', line 240 def elect_primary!(description) @topology = topology.elect_primary(description, servers_list) end |
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
117 118 119 |
# File 'lib/mongo/cluster.rb', line 117 def has_readable_server?(server_selector = nil) topology.has_readable_server?(self, server_selector) end |
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
129 130 131 |
# File 'lib/mongo/cluster.rb', line 129 def has_writable_server? topology.has_writable_server?(self) end |
#inspect ⇒ String
Get the nicer formatted string for use in inspection.
210 211 212 |
# File 'lib/mongo/cluster.rb', line 210 def inspect "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>" end |
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
253 254 255 |
# File 'lib/mongo/cluster.rb', line 253 def max_read_retries [:max_read_retries] || MAX_READ_RETRIES end |
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
224 225 226 227 |
# File 'lib/mongo/cluster.rb', line 224 def next_primary(ping = true) @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY) @primary_selector.select_server(self, ping) end |
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
267 268 269 270 271 |
# File 'lib/mongo/cluster.rb', line 267 def pool(server) @pool_lock.synchronize do pools[server.address] ||= Server::ConnectionPool.get(server) end end |
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
282 283 284 |
# File 'lib/mongo/cluster.rb', line 282 def read_retry_interval [:read_retry_interval] || READ_RETRY_INTERVAL end |
#reconnect! ⇒ true
Reconnect all servers.
370 371 372 373 374 |
# File 'lib/mongo/cluster.rb', line 370 def reconnect! scan! servers.each { |server| server.reconnect! } @cursor_reaper.restart! and true end |
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/mongo/cluster.rb', line 308 def remove(host) address = Address.new(host) removed_servers = @servers.select { |s| s.address == address } @update_lock.synchronize { @servers = @servers - removed_servers } removed_servers.each{ |server| server.disconnect! } if removed_servers publish_sdam_event( Monitoring::SERVER_CLOSED, Monitoring::Event::ServerClosed.new(address, topology) ) @update_lock.synchronize { @addresses.reject! { |addr| addr == address } } end |
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
398 399 400 401 402 403 404 |
# File 'lib/mongo/cluster.rb', line 398 def remove_hosts(description) if topology.remove_hosts?(description) servers_list.each do |s| remove(s.address.to_s) if topology.remove_server?(description, s) end end end |
#scan! ⇒ true
This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.
Force a scan of all known servers in the cluster.
331 332 333 |
# File 'lib/mongo/cluster.rb', line 331 def scan! servers_list.each{ |server| server.scan! } and true end |
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
344 345 346 |
# File 'lib/mongo/cluster.rb', line 344 def servers topology.servers(servers_list.compact).compact end |
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
295 296 297 |
# File 'lib/mongo/cluster.rb', line 295 def standalone_discovered @topology = topology.standalone_discovered end |