Class: Mongo::Cluster
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Event::Subscriber, Loggable, Monitoring::Publishable
- Defined in:
- lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/cursor_reaper.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/topology/replica_set.rb
Overview
Represents a group of servers on the server side: a single server, a replica set, or one or more mongos instances.
Defined Under Namespace
Modules: Topology
Classes: AppMetadata, CursorReaper
Constant Summary
- MAX_READ_RETRIES = 1
The default number of mongos read retries.
- MAX_WRITE_RETRIES = 1
The default number of mongos write retries.
- READ_RETRY_INTERVAL = 5
The default mongos read retry interval, in seconds.
- IDLE_WRITE_PERIOD_SECONDS = 10
How often an idle primary writes a no-op to the oplog.
Constants included from Loggable
Instance Attribute Summary
-
#app_metadata ⇒ Mongo::Cluster::AppMetadata
readonly
The application metadata, used for connection handshakes.
-
#monitoring ⇒ Monitoring
readonly
The monitoring.
-
#options ⇒ Hash
readonly
The options hash.
-
#topology ⇒ Object
readonly
The cluster topology.
Attributes included from Event::Subscriber
Class Method Summary
-
.create(client) ⇒ Cluster
private
Create a new cluster for the provided client, for use when the client must not reuse its original cluster instance.
-
.finalize(pools, cursor_reaper) ⇒ Proc
Finalize the cluster for garbage collection.
Instance Method Summary
-
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object.
-
#add(host) ⇒ Server
Add a server to the cluster with the provided address.
-
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
-
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
-
#disconnect! ⇒ true
Disconnect all servers.
-
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
-
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
-
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
-
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
constructor
private
Instantiate the new cluster.
-
#inspect ⇒ String
Get a nicely formatted string for use in inspection.
-
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
-
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
-
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
-
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
-
#reconnect! ⇒ true
Reconnect all servers.
-
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
-
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
-
#scan! ⇒ true
Force a scan of all known servers in the cluster.
-
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
-
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
Methods included from Loggable
#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger
Methods included from Event::Subscriber
Methods included from Monitoring::Publishable
#publish_command, #publish_event, #publish_sdam_event
Constructor Details
#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Cluster should never be directly instantiated outside of a Client.
Instantiate the new cluster.
# File 'lib/mongo/cluster.rb', line 152
def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata ||= AppMetadata.new(self)
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)
  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )
  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))
  seeds.each{ |seed| add(seed) }
  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1
  @cursor_reaper = CursorReaper.new
  @cursor_reaper.run!
  ObjectSpace.define_finalizer(self, self.class.finalize(pools, @cursor_reaper))
end
Instance Attribute Details
#app_metadata ⇒ Mongo::Cluster::AppMetadata (readonly)
Returns The application metadata, used for connection handshakes.
# File 'lib/mongo/cluster.rb', line 64
def app_metadata
  @app_metadata
end
#monitoring ⇒ Monitoring (readonly)
Returns The monitoring.
# File 'lib/mongo/cluster.rb', line 55
def monitoring
  @monitoring
end
#options ⇒ Hash (readonly)
Returns The options hash.
# File 'lib/mongo/cluster.rb', line 52
def options
  @options
end
#topology ⇒ Object (readonly)
Returns The cluster topology.
# File 'lib/mongo/cluster.rb', line 58
def topology
  @topology
end
Class Method Details
.create(client) ⇒ Cluster
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a new cluster for the provided client, for use when the client must not reuse its original cluster instance.
# File 'lib/mongo/cluster.rb', line 424
def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    client.instance_variable_get(:@monitoring).dup,
    client.options
  )
  client.instance_variable_set(:@cluster, cluster)
end
.finalize(pools, cursor_reaper) ⇒ Proc
Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.
# File 'lib/mongo/cluster.rb', line 197
def self.finalize(pools, cursor_reaper)
  proc do
    begin; cursor_reaper.kill_cursors; rescue; end
    cursor_reaper.stop!
    pools.values.each do |pool|
      pool.disconnect!
    end
  end
end
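The finalizer above is built by a class method, so the returned proc closes over `pools` and `cursor_reaper` only and never references the cluster itself. A proc that captured `self` would keep the object reachable and the finalizer would not run until the process exits. The following is a minimal sketch of that pattern, not the driver's code; `FakePool` and `Resource` are hypothetical names introduced for illustration.

```ruby
# Hypothetical stand-in for a connection pool; not part of the driver.
class FakePool
  attr_reader :disconnected

  def initialize
    @disconnected = false
  end

  def disconnect!
    @disconnected = true
  end
end

# Hypothetical resource owner that mirrors the finalizer pattern.
class Resource
  def initialize
    @pools = { 'localhost:27017' => FakePool.new }
    # The proc comes from a class method, so it closes over the pools hash
    # only and never references `self`.
    ObjectSpace.define_finalizer(self, self.class.finalize(@pools))
  end

  def self.finalize(pools)
    proc do
      pools.values.each { |pool| pool.disconnect! }
    end
  end
end

pools = { 'a' => FakePool.new, 'b' => FakePool.new }
cleanup = Resource.finalize(pools)
cleanup.call # run the cleanup by hand instead of waiting for GC
puts pools.values.all?(&:disconnected) # prints "true"
```

Calling the proc directly, as done here, is also a convenient way to exercise cleanup logic without depending on garbage collection timing.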
Instance Method Details
#==(other) ⇒ true, false
Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.
# File 'lib/mongo/cluster.rb', line 81
def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses && options == other.options
end
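To illustrate the comparison rule described above (same class, same current addresses, same options), here is a small self-contained sketch. `ClusterLike` is a hypothetical value object and the address strings and option values are made up for the example.

```ruby
# Hypothetical value object mirroring the comparison in #==.
class ClusterLike
  attr_reader :addresses, :options

  def initialize(addresses, options = {})
    @addresses = addresses
    @options = options
  end

  def ==(other)
    # Objects of any other type are never equal.
    return false unless other.is_a?(ClusterLike)
    addresses == other.addresses && options == other.options
  end
end

a = ClusterLike.new(['127.0.0.1:27017'], { connect: :direct })
b = ClusterLike.new(['127.0.0.1:27017'], { connect: :direct })
c = ClusterLike.new(['127.0.0.1:27018'], { connect: :direct })
puts a == b        # prints "true"
puts a == c        # prints "false"
puts a == 'string' # prints "false"
```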
#add(host) ⇒ Server
Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.
# File 'lib/mongo/cluster.rb', line 98
def add(host)
  address = Address.new(host)
  if !addresses.include?(address)
    if addition_allowed?(address)
      @update_lock.synchronize { @addresses.push(address) }
      server = Server.new(address, self, @monitoring, event_listeners, options)
      @update_lock.synchronize { @servers.push(server) }
      server
    end
  end
end
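Note that `add` returns `nil` when the address is already known or not allowed, because an `if` without a matching branch evaluates to `nil`. The sketch below shows the same "add only if absent" idea in isolation. It is a variation, not the driver's implementation: the hypothetical `HostList` class performs the membership check and the push under a single lock acquisition.

```ruby
# Hypothetical host registry; a simplified variation of the add-if-absent idea.
class HostList
  def initialize
    @update_lock = Mutex.new
    @addresses = []
  end

  # Returns a copy so callers cannot mutate the internal list.
  def addresses
    @update_lock.synchronize { @addresses.dup }
  end

  # Returns the host when it was added, or nil when it was already known.
  def add(host)
    @update_lock.synchronize do
      next nil if @addresses.include?(host)
      @addresses.push(host)
      host
    end
  end
end

hosts = HostList.new
first = hosts.add('127.0.0.1:27017')
second = hosts.add('127.0.0.1:27017')
puts first.inspect        # prints "127.0.0.1:27017" (with quotes)
puts second.inspect       # prints "nil"
puts hosts.addresses.size # prints 1
```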
#add_hosts(description) ⇒ Object
Add hosts in a description to the cluster.
# File 'lib/mongo/cluster.rb', line 389
def add_hosts(description)
  if topology.add_hosts?(description, servers_list)
    description.servers.each { |s| add(s) }
  end
end
#addresses ⇒ Array<Mongo::Address>
The addresses in the cluster.
# File 'lib/mongo/cluster.rb', line 441
def addresses
  addresses_list
end
#disconnect! ⇒ true
Disconnect all servers.
# File 'lib/mongo/cluster.rb', line 361
def disconnect!
  begin; @cursor_reaper.kill_cursors; rescue; end
  @cursor_reaper.stop!
  @servers.each { |server| server.disconnect! } and true
end
#elect_primary!(description) ⇒ Topology
Elect a primary server from the description that has just changed to a primary.
# File 'lib/mongo/cluster.rb', line 245
def elect_primary!(description)
  @topology = topology.elect_primary(description, servers_list)
end
#has_readable_server?(server_selector = nil) ⇒ true, false
Determine if the cluster would select a readable server for the provided read preference.
# File 'lib/mongo/cluster.rb', line 122
def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end
#has_writable_server? ⇒ true, false
Determine if the cluster would select a writable server.
# File 'lib/mongo/cluster.rb', line 134
def has_writable_server?
  topology.has_writable_server?(self)
end
#inspect ⇒ String
Get a nicely formatted string for use in inspection.
# File 'lib/mongo/cluster.rb', line 215
def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
end
#max_read_retries ⇒ Integer
Get the maximum number of times the cluster can retry a read operation on a mongos.
# File 'lib/mongo/cluster.rb', line 258
def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end
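The `option || CONSTANT` fallback used here can be tried out in isolation. The sketch below uses a hypothetical `RetrySettings` class; the two constant values are copied from the constant summary on this page, and everything else is an assumption made for illustration.

```ruby
# Hypothetical settings holder showing the option-or-default fallback.
class RetrySettings
  MAX_READ_RETRIES = 1
  READ_RETRY_INTERVAL = 5

  attr_reader :options

  def initialize(options = {})
    @options = options.freeze
  end

  def max_read_retries
    # A missing (nil) option falls back to the constant.
    options[:max_read_retries] || MAX_READ_RETRIES
  end

  def read_retry_interval
    options[:read_retry_interval] || READ_RETRY_INTERVAL
  end
end

defaults = RetrySettings.new
custom = RetrySettings.new({ max_read_retries: 3, read_retry_interval: 0.5 })
zero = RetrySettings.new({ max_read_retries: 0 })
puts defaults.max_read_retries    # prints 1
puts defaults.read_retry_interval # prints 5
puts custom.max_read_retries      # prints 3
puts custom.read_retry_interval   # prints 0.5
puts zero.max_read_retries        # prints 0
```

Because `0` is truthy in Ruby, an explicit `0` is honored rather than replaced by the default; only `nil` or `false` triggers the fallback.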
#next_primary(ping = true) ⇒ Mongo::Server
Get the next primary server we can send an operation to.
# File 'lib/mongo/cluster.rb', line 229
def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self, ping)
end
#pool(server) ⇒ Server::ConnectionPool
Get the scoped connection pool for the server.
# File 'lib/mongo/cluster.rb', line 272
def pool(server)
  @pool_lock.synchronize do
    pools[server.address] ||= Server::ConnectionPool.get(server)
  end
end
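The method combines a mutex with `||=` so that each address gets at most one pool even when several threads ask at once. A minimal sketch of that lock-guarded memoization follows; `PoolRegistry` and its string "pools" are hypothetical stand-ins, not the driver's classes.

```ruby
# Hypothetical registry mirroring the lock-guarded memoization in #pool.
class PoolRegistry
  attr_reader :created

  def initialize
    @pool_lock = Mutex.new
    @pools = {}
    @created = 0
  end

  # Returns the cached object for the address, building it at most once.
  def pool(address)
    @pool_lock.synchronize do
      @pools[address] ||= build(address)
    end
  end

  private

  # Counts how many times a pool is actually constructed.
  def build(address)
    @created += 1
    "pool-for-#{address}"
  end
end

registry = PoolRegistry.new
threads = 8.times.map { Thread.new { registry.pool('localhost:27017') } }
results = threads.map(&:value)
puts results.uniq.size # prints 1
puts registry.created  # prints 1
```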
#read_retry_interval ⇒ Float
Get the interval, in seconds, in which a mongos read operation is retried.
# File 'lib/mongo/cluster.rb', line 287
def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end
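This page does not show how the driver consumes the retry count and interval. As a rough illustration of how such a pair of settings is commonly used, the sketch below defines a hypothetical `with_read_retries` helper that retries a block a bounded number of times and sleeps between attempts. It is an assumption about usage, not the driver's retry logic.

```ruby
# Hypothetical helper: runs the block, retrying up to max_retries extra times
# and sleeping `interval` seconds between attempts.
def with_read_retries(max_retries:, interval:)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Give up once the retry budget is spent and re-raise the last error.
    raise if attempts > max_retries
    sleep(interval)
    retry
  end
end

calls = 0
result = with_read_retries(max_retries: 1, interval: 0) do |attempt|
  calls += 1
  raise 'transient failure' if attempt == 1
  :ok
end
puts result # prints "ok"
puts calls  # prints 2
```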
#reconnect! ⇒ true
Reconnect all servers.
# File 'lib/mongo/cluster.rb', line 375
def reconnect!
  scan!
  servers.each { |server| server.reconnect! }
  @cursor_reaper.restart! and true
end
#remove(host) ⇒ Object
Remove the server from the cluster for the provided address, if it exists.
# File 'lib/mongo/cluster.rb', line 313
def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each{ |server| server.disconnect! } if removed_servers
  publish_sdam_event(
    Monitoring::SERVER_CLOSED,
    Monitoring::Event::ServerClosed.new(address, topology)
  )
  @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end
#remove_hosts(description) ⇒ Object
Remove hosts in a description from the cluster.
# File 'lib/mongo/cluster.rb', line 403
def remove_hosts(description)
  if topology.remove_hosts?(description)
    servers_list.each do |s|
      remove(s.address.to_s) if topology.remove_server?(description, s)
    end
  end
end
#scan! ⇒ true
This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.
Force a scan of all known servers in the cluster.
# File 'lib/mongo/cluster.rb', line 336
def scan!
  servers_list.each{ |server| server.scan! } and true
end
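The `each { ... } and true` idiom is what makes the documented return value `true`: `Array#each` returns the array itself, which is truthy even when empty, so `and true` yields a plain boolean. A small sketch with a hypothetical `FakeServer` struct and `scan_all` method shows the behavior.

```ruby
# Hypothetical stand-in for a server; not the driver's class.
FakeServer = Struct.new(:name, :scanned) do
  def scan!
    self.scanned = true
  end
end

def scan_all(servers)
  # Array#each returns the receiver, so `and true` makes the result `true`.
  servers.each { |server| server.scan! } and true
end

servers = [FakeServer.new('a', false), FakeServer.new('b', false)]
puts scan_all(servers)       # prints "true"
puts servers.all?(&:scanned) # prints "true"
puts scan_all([])            # prints "true"
```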
#servers ⇒ Array<Server>
Get a list of server candidates from the cluster that can have operations executed on them.
# File 'lib/mongo/cluster.rb', line 349
def servers
  topology.servers(servers_list.compact).compact
end
#standalone_discovered ⇒ Topology
Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.
# File 'lib/mongo/cluster.rb', line 300
def standalone_discovered
  @topology = topology.standalone_discovered
end