Class: Mongo::Cluster
- Inherits:
- Object
- Object
- Mongo::Cluster
- Extended by:
- Forwardable
- Includes:
- Event::Subscriber, Loggable
- Defined in:
- lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/topology/replica_set.rb
Overview
Represents a group of servers on the server side: a single server, a replica set, or one or more mongos instances.
Defined Under Namespace
Modules: Topology
Constant Summary
- MAX_READ_RETRIES = 1
The default number of mongos read retries.
- READ_RETRY_INTERVAL = 5
The default mongos read retry interval, in seconds.
Constants included from Loggable
Instance Attribute Summary
-
#options ⇒ Hash
readonly
The options hash.
-
#topology ⇒ Object
readonly
The cluster topology.
Attributes included from Event::Subscriber
Class Method Summary
-
.create(client) ⇒ Cluster
private
Create a new cluster for the provided client, for use when the client should not keep sharing its original cluster instance.
-
.finalize(pools) ⇒ Proc
Finalize the cluster for garbage collection.
Instance Method Summary
-
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object.
-
#add(host) ⇒ Server
Add a server to the cluster with the provided address.
-
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
-
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
-
#disconnect! ⇒ true
Disconnect all servers.
-
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
-
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
constructor
private
Instantiate the new cluster.
-
#inspect ⇒ String
Get a more readable string for use in inspection.
-
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
-
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
-
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
-
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
-
#reconnect! ⇒ true
Reconnect all servers.
-
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
-
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
-
#scan! ⇒ true
Force a scan of all known servers in the cluster.
-
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
-
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Event::Subscriber
Constructor Details
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cluster should never be directly instantiated outside of a Client.
Instantiate the new cluster.
    # File 'lib/mongo/cluster.rb', line 101
    def initialize(seeds, monitoring, options = Options::Redacted.new)
      @addresses = []
      @servers = []
      @monitoring = monitoring
      @event_listeners = Event::Listeners.new
      @options = options.freeze
      @topology = Topology.initial(seeds, options)
      @update_lock = Mutex.new
      @pool_lock = Mutex.new

      subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
      subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
      subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))

      seeds.each{ |seed| add(seed) }
      ObjectSpace.define_finalizer(self, self.class.finalize(pools))
    end
Instance Attribute Details
#options ⇒ Hash (readonly)
Returns the options hash.
    # File 'lib/mongo/cluster.rb', line 39
    def options
      @options
    end
#topology ⇒ Object (readonly)
Returns the cluster topology.
    # File 'lib/mongo/cluster.rb', line 42
    def topology
      @topology
    end
Class Method Details
.create(client) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a new cluster for the provided client, for use when the client should not keep sharing its original cluster instance.
    # File 'lib/mongo/cluster.rb', line 349
    def self.create(client)
      cluster = Cluster.new(
        client.cluster.addresses.map(&:to_s),
        client.instance_variable_get(:@monitoring).dup,
        client.options
      )
      client.instance_variable_set(:@cluster, cluster)
    end
.finalize(pools) ⇒ Proc
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
    # File 'lib/mongo/cluster.rb', line 131
    def self.finalize(pools)
      proc do
        pools.values.each do |pool|
          pool.disconnect!
        end
      end
    end
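The finalizer proc is built by a class method, so it closes over the pools hash only and does not capture the cluster instance; a finalizer that referenced the object itself would keep it reachable and prevent collection. The following is a minimal sketch of that pattern, not the driver's code. `StubPool` and `CleanupRegistry` are hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for a connection pool; not part of the driver.
class StubPool
  attr_reader :connected

  def initialize
    @connected = true
  end

  def disconnect!
    @connected = false
    true
  end
end

# Hypothetical owner object that registers its finalizer the same way.
class CleanupRegistry
  attr_reader :pools

  def initialize
    @pools = {}
    # The proc comes from a class method, so it references only the pools
    # hash and never captures this instance.
    ObjectSpace.define_finalizer(self, self.class.finalize(@pools))
  end

  def self.finalize(pools)
    proc do
      pools.values.each(&:disconnect!)
    end
  end
end

registry = CleanupRegistry.new
registry.pools['127.0.0.1:27017'] = StubPool.new

# Calling the proc directly shows the work the garbage collector would trigger.
cleanup = CleanupRegistry.finalize(registry.pools)
cleanup.call
puts registry.pools.values.none?(&:connected)
```

Pools added to the hash after the finalizer is registered are still cleaned up, because the proc holds the hash itself rather than a snapshot of its contents.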
Instance Method Details
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
    # File 'lib/mongo/cluster.rb', line 57
    def ==(other)
      return false unless other.is_a?(Cluster)
      addresses == other.addresses && options == other.options
    end
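Equality here is by value: two clusters compare equal when their current address lists and their options match, and any non-cluster object compares unequal. A minimal sketch of the same comparison, using a hypothetical `ComparableCluster` class with plain strings in place of `Mongo::Address` objects:

```ruby
# Hypothetical class mirroring the comparison above; not the driver's Cluster.
class ComparableCluster
  attr_reader :addresses, :options

  def initialize(addresses, options = {})
    @addresses = addresses
    @options = options
  end

  def ==(other)
    return false unless other.is_a?(ComparableCluster)
    addresses == other.addresses && options == other.options
  end
end

a = ComparableCluster.new(['127.0.0.1:27017'], max_read_retries: 2)
b = ComparableCluster.new(['127.0.0.1:27017'], max_read_retries: 2)
c = ComparableCluster.new(['127.0.0.1:27018'], max_read_retries: 2)

puts a == b
puts a == c
puts a == 'not a cluster'
```

Because the address lists in this sketch are arrays, the comparison is order-sensitive: the same addresses in a different order compare unequal.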
#add(host) ⇒ Server
Add a server to the cluster with the provided address. Useful for auto-discovery of new servers, when an existing server runs an ismaster command and the response includes servers that were not in the original configuration.
    # File 'lib/mongo/cluster.rb', line 74
    def add(host)
      address = Address.new(host)
      if !addresses.include?(address)
        if addition_allowed?(address)
          log_debug("Adding #{address.to_s} to the cluster.")
          @update_lock.synchronize { @addresses.push(address) }
          server = Server.new(address, self, @monitoring, event_listeners, options)
          @update_lock.synchronize { @servers.push(server) }
          server
        end
      end
    end
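As written, `add` returns the new server only when the address is both unknown and allowed; otherwise it falls through and returns `nil`. The sketch below reproduces that control flow in a self-contained form. `MiniCluster` and its allow-list are hypothetical: the allow-list stands in for `addition_allowed?`, whose real logic is not shown on this page, and strings stand in for `Address` and `Server` objects.

```ruby
# Hypothetical miniature of the add logic; not the driver's Cluster.
class MiniCluster
  attr_reader :addresses, :servers

  def initialize(allowed)
    @allowed = allowed # stand-in for addition_allowed?
    @addresses = []
    @servers = []
    @update_lock = Mutex.new
  end

  def add(host)
    if !@addresses.include?(host)
      if @allowed.include?(host)
        # Mutations of the shared arrays happen under the lock.
        @update_lock.synchronize { @addresses.push(host) }
        server = "server@#{host}"
        @update_lock.synchronize { @servers.push(server) }
        server
      end
    end
  end
end

cluster = MiniCluster.new(['127.0.0.1:27017'])
first = cluster.add('127.0.0.1:27017')   # new and allowed
second = cluster.add('127.0.0.1:27017')  # already known
third = cluster.add('10.0.0.1:27017')    # not allowed

puts first.inspect
puts second.inspect
puts third.inspect
```

Callers that need to distinguish "already present" from "rejected" cannot do so from the return value alone, since both cases yield `nil`.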
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
    # File 'lib/mongo/cluster.rb', line 314
    def add_hosts(description)
      if topology.add_hosts?(description, servers_list)
        description.servers.each { |s| add(s) }
      end
    end
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
    # File 'lib/mongo/cluster.rb', line 366
    def addresses
      addresses_list
    end
#disconnect! ⇒ true
Disconnect all servers.
    # File 'lib/mongo/cluster.rb', line 289
    def disconnect!
      @servers.each { |server| server.disconnect! } and true
    end
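The `each { ... } and true` idiom is what makes the method return `true`: `Array#each` with a block returns the receiver, which is truthy even when empty, so the `and` expression always evaluates to `true`. A small self-contained sketch, with `StubServer` and `disconnect_all` as hypothetical names:

```ruby
# Hypothetical server stand-in that records whether it was disconnected.
StubServer = Struct.new(:connected) do
  def disconnect!
    self.connected = false
  end
end

# Same shape as the method body above, applied to any list of servers.
def disconnect_all(servers)
  servers.each { |server| server.disconnect! } and true
end

servers = [StubServer.new(true), StubServer.new(true)]
result = disconnect_all(servers)
empty_result = disconnect_all([])

puts result
puts empty_result
puts servers.none?(&:connected)
```

The return value therefore says nothing about whether any individual `disconnect!` call succeeded; it only signals that the iteration completed without raising.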
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
    # File 'lib/mongo/cluster.rb', line 176
    def elect_primary!(description)
      @topology = topology.elect_primary(description, servers_list)
    end
#inspect ⇒ String
Get a more readable string for use in inspection.
    # File 'lib/mongo/cluster.rb', line 147
    def inspect
      "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
    end
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
    # File 'lib/mongo/cluster.rb', line 189
    def max_read_retries
      options[:max_read_retries] || MAX_READ_RETRIES
    end
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
    # File 'lib/mongo/cluster.rb', line 161
    def next_primary(ping = true)
      ServerSelector.get(ServerSelector::PRIMARY.merge(options)).select_server(self, ping)
    end
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
    # File 'lib/mongo/cluster.rb', line 203
    def pool(server)
      @pool_lock.synchronize do
        pools[server.address] ||= Server::ConnectionPool.get(server)
      end
    end
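Wrapping the `||=` in `@pool_lock.synchronize` means that concurrent callers asking for the same address all receive one shared pool, created at most once. The sketch below illustrates that with threads. `PoolCache` and `CachedPool` are hypothetical stand-ins, and the pool is keyed by a plain string rather than a server's address object.

```ruby
# Hypothetical pool type standing in for Server::ConnectionPool.
CachedPool = Struct.new(:address)

# Hypothetical cache mirroring the lock-plus-memoize shape above.
class PoolCache
  def initialize
    @pools = {}
    @pool_lock = Mutex.new
  end

  # Return the cached pool for the address, creating it at most once.
  def pool(address)
    @pool_lock.synchronize do
      @pools[address] ||= CachedPool.new(address)
    end
  end
end

cache = PoolCache.new
threads = 8.times.map { Thread.new { cache.pool('127.0.0.1:27017') } }
pools = threads.map(&:value)

# Every thread should have received the very same object.
puts pools.all? { |pool| pool.equal?(pools.first) }
```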
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
    # File 'lib/mongo/cluster.rb', line 218
    def read_retry_interval
      options[:read_retry_interval] || READ_RETRY_INTERVAL
    end
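Both `max_read_retries` and `read_retry_interval` use the same fallback: take the value from the options hash, otherwise use the class constant. A minimal sketch of that lookup follows. `RetrySettings` is a hypothetical class, and the constants are redefined locally with the values listed in the constant summary.

```ruby
MAX_READ_RETRIES = 1
READ_RETRY_INTERVAL = 5

# Hypothetical settings holder mirroring the fallback expressions above.
class RetrySettings
  def initialize(options = {})
    @options = options.freeze
  end

  def max_read_retries
    @options[:max_read_retries] || MAX_READ_RETRIES
  end

  def read_retry_interval
    @options[:read_retry_interval] || READ_RETRY_INTERVAL
  end
end

defaults = RetrySettings.new
custom = RetrySettings.new(max_read_retries: 0, read_retry_interval: 0.5)

puts defaults.max_read_retries
puts defaults.read_retry_interval
puts custom.max_read_retries
puts custom.read_retry_interval
```

Because `0` is truthy in Ruby, an explicit `max_read_retries: 0` is honored rather than replaced by the default; only `nil` or `false` fall back to the constant.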
#reconnect! ⇒ true
Reconnect all servers.
    # File 'lib/mongo/cluster.rb', line 301
    def reconnect!
      scan!
      servers.each { |server| server.reconnect! } and true
    end
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
    # File 'lib/mongo/cluster.rb', line 244
    def remove(host)
      log_debug("#{host} being removed from the cluster.")
      address = Address.new(host)
      removed_servers = @servers.select { |s| s.address == address }
      @update_lock.synchronize { @servers = @servers - removed_servers }
      removed_servers.each{ |server| server.disconnect! } if removed_servers
      @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
    end
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
    # File 'lib/mongo/cluster.rb', line 328
    def remove_hosts(description)
      if topology.remove_hosts?(description)
        servers_list.each do |s|
          remove(s.address.to_s) if topology.remove_server?(description, s)
        end
      end
    end
#scan! ⇒ true
This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.
Force a scan of all known servers in the cluster.
    # File 'lib/mongo/cluster.rb', line 264
    def scan!
      servers_list.each{ |server| server.scan! } and true
    end
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
    # File 'lib/mongo/cluster.rb', line 277
    def servers
      topology.servers(servers_list.compact).compact
    end
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
    # File 'lib/mongo/cluster.rb', line 231
    def standalone_discovered
      @topology = topology.standalone_discovered
    end
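The cluster does not decide the new topology itself; it asks the current topology object for its successor and stores the result. The sketch below shows that delegation in isolation. All class names are hypothetical, and the transition rule (an unknown topology becomes a single-server one, a single-server topology stays as it is) is an assumption made for illustration; the driver's topology classes may behave differently.

```ruby
# Hypothetical topology classes; the driver's real classes carry more state.
class SingleTopology
  # Assumption: already single, so nothing changes.
  def standalone_discovered
    self
  end
end

class UnknownTopology
  # Assumption: an unknown topology becomes a single-server topology.
  def standalone_discovered
    SingleTopology.new
  end
end

# Hypothetical holder playing the role of the cluster.
class TopologyHolder
  attr_reader :topology

  def initialize
    @topology = UnknownTopology.new
  end

  # Replace the current topology with whatever the old one transitions to.
  def standalone_discovered
    @topology = topology.standalone_discovered
  end
end

holder = TopologyHolder.new
holder.standalone_discovered
puts holder.topology.class
```

Keeping the transition inside the topology objects means the holder needs no conditional on the topology type.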