Class: Mongo::Cluster

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Event::Subscriber, Loggable, Monitoring::Publishable
Defined in:
lib/mongo/cluster.rb,
lib/mongo/cluster/topology.rb,
lib/mongo/cluster/app_metadata.rb,
lib/mongo/cluster/cursor_reaper.rb,
lib/mongo/cluster/topology/single.rb,
lib/mongo/cluster/topology/sharded.rb,
lib/mongo/cluster/topology/unknown.rb,
lib/mongo/cluster/topology/replica_set.rb

Overview

Represents a group of servers on the server side, either as a single server, a replica set, or a single or multiple mongos.

Since:

  • 2.0.0

Defined Under Namespace

Modules: Topology Classes: AppMetadata, CursorReaper

Constant Summary collapse

MAX_READ_RETRIES =

The default number of mongos read retries.

Since:

  • 2.1.1

1
READ_RETRY_INTERVAL =

The default mongos read retry interval, in seconds.

Since:

  • 2.1.1

5
IDLE_WRITE_PERIOD_SECONDS =

How often an idle primary writes a no-op to the oplog.

Since:

  • 2.4.0

10

Constants included from Loggable

Loggable::PREFIX

Instance Attribute Summary collapse

Attributes included from Event::Subscriber

#event_listeners

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn, #logger

Methods included from Event::Subscriber

#subscribe_to

Methods included from Monitoring::Publishable

#publish_command, #publish_event, #publish_sdam_event

Constructor Details

#initialize(seeds, monitoring, options = Options::Redacted.new) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Cluster should never be directly instantiated outside of a Client.

Instantiate the new cluster.

Examples:

Instantiate the cluster.

Mongo::Cluster.new(["127.0.0.1:27017"], monitoring)

Parameters:

  • seeds (Array<String>)

    The addresses of the configured servers.

  • monitoring (Monitoring)

    The monitoring.

  • options (Hash) (defaults to: Options::Redacted.new)

    The options.

Since:

  • 2.0.0



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/mongo/cluster.rb', line 147

def initialize(seeds, monitoring, options = Options::Redacted.new)
  @addresses = []
  @servers = []
  @monitoring = monitoring
  @event_listeners = Event::Listeners.new
  @options = options.freeze
  @app_metadata ||= AppMetadata.new(self)
  @update_lock = Mutex.new
  @pool_lock = Mutex.new
  @topology = Topology.initial(seeds, monitoring, options)

  publish_sdam_event(
    Monitoring::TOPOLOGY_OPENING,
    Monitoring::Event::TopologyOpening.new(@topology)
  )

  subscribe_to(Event::STANDALONE_DISCOVERED, Event::StandaloneDiscovered.new(self))
  subscribe_to(Event::DESCRIPTION_CHANGED, Event::DescriptionChanged.new(self))
  subscribe_to(Event::MEMBER_DISCOVERED, Event::MemberDiscovered.new(self))

  seeds.each{ |seed| add(seed) }

  publish_sdam_event(
    Monitoring::TOPOLOGY_CHANGED,
    Monitoring::Event::TopologyChanged.new(@topology, @topology)
  ) if @servers.size > 1

  @cursor_reaper = CursorReaper.new
  @cursor_reaper.run!

  ObjectSpace.define_finalizer(self, self.class.finalize(pools))
end

Instance Attribute Details

#app_metadataMongo::Cluster::AppMetadata (readonly)

Returns The application metadata, used for connection handshakes.

Returns:

Since:

  • 2.4.0



59
60
61
# File 'lib/mongo/cluster.rb', line 59

def 
  @app_metadata
end

#monitoringMonitoring (readonly)

Returns monitoring The monitoring.

Returns:

Since:

  • 2.0.0



50
51
52
# File 'lib/mongo/cluster.rb', line 50

def monitoring
  @monitoring
end

#optionsHash (readonly)

Returns The options hash.

Returns:

  • (Hash)

    The options hash.

Since:

  • 2.0.0



47
48
49
# File 'lib/mongo/cluster.rb', line 47

def options
  @options
end

#topologyObject (readonly)

Returns The cluster topology.

Returns:

  • (Object)

    The cluster topology.

Since:

  • 2.0.0



53
54
55
# File 'lib/mongo/cluster.rb', line 53

def topology
  @topology
end

Class Method Details

.create(client) ⇒ Cluster

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a cluster for the provided client, for use when we don’t want the client’s original cluster instance to be the same.

Examples:

Create a cluster for the client.

Cluster.create(client)

Parameters:

  • client (Client)

    The client to create on.

Returns:

Since:

  • 2.0.0



419
420
421
422
423
424
425
426
# File 'lib/mongo/cluster.rb', line 419

def self.create(client)
  cluster = Cluster.new(
    client.cluster.addresses.map(&:to_s),
    client.instance_variable_get(:@monitoring).dup,
    client.options
  )
  client.instance_variable_set(:@cluster, cluster)
end

.finalize(pools) ⇒ Proc

Finalize the cluster for garbage collection. Disconnects all the scoped connection pools.

Examples:

Finalize the cluster.

Cluster.finalize(pools)

Parameters:

Returns:

  • (Proc)

    The Finalizer.

Since:

  • 2.2.0



192
193
194
195
196
197
198
199
200
# File 'lib/mongo/cluster.rb', line 192

def self.finalize(pools)
  proc do
    begin; @cursor_reaper.kill_cursors; rescue; end
    @cursor_reaper.stop!
    pools.values.each do |pool|
      pool.disconnect!
    end
  end
end

Instance Method Details

#==(other) ⇒ true, false

Determine if this cluster of servers is equal to another object. Checks the servers currently in the cluster, not what was configured.

Examples:

Is the cluster equal to the object?

cluster == other

Parameters:

  • other (Object)

    The object to compare to.

Returns:

  • (true, false)

    If the objects are equal.

Since:

  • 2.0.0



76
77
78
79
# File 'lib/mongo/cluster.rb', line 76

def ==(other)
  return false unless other.is_a?(Cluster)
  addresses == other.addresses && options == other.options
end

#add(host) ⇒ Server

Add a server to the cluster with the provided address. Useful in auto-discovery of new servers when an existing server executes an ismaster and potentially non-configured servers were included.

Examples:

Add the server for the address to the cluster.

cluster.add('127.0.0.1:27018')

Parameters:

  • host (String)

    The address of the server to add.

Returns:

  • (Server)

    The newly added server, if not present already.

Since:

  • 2.0.0



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/mongo/cluster.rb', line 93

def add(host)
  address = Address.new(host)
  if !addresses.include?(address)
    if addition_allowed?(address)
      @update_lock.synchronize { @addresses.push(address) }
      server = Server.new(address, self, @monitoring, event_listeners, options)
      @update_lock.synchronize { @servers.push(server) }
      server
    end
  end
end

#add_hosts(description) ⇒ Object

Add hosts in a description to the cluster.

Examples:

Add hosts in a description to the cluster.

cluster.add_hosts(description)

Parameters:

Since:

  • 2.0.6



384
385
386
387
388
# File 'lib/mongo/cluster.rb', line 384

def add_hosts(description)
  if topology.add_hosts?(description, servers_list)
    description.servers.each { |s| add(s) }
  end
end

#addressesArray<Mongo::Address>

The addresses in the cluster.

Examples:

Get the addresses in the cluster.

cluster.addresses

Returns:

Since:

  • 2.0.6



436
437
438
# File 'lib/mongo/cluster.rb', line 436

def addresses
  addresses_list
end

#disconnect!true

Disconnect all servers.

Examples:

Disconnect the cluster’s servers.

cluster.disconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



356
357
358
359
360
# File 'lib/mongo/cluster.rb', line 356

def disconnect!
  begin; @cursor_reaper.kill_cursors; rescue; end
  @cursor_reaper.stop!
  @servers.each { |server| server.disconnect! } and true
end

#elect_primary!(description) ⇒ Topology

Elect a primary server from the description that has just changed to a primary.

Examples:

Elect a primary server.

cluster.elect_primary!(description)

Parameters:

Returns:

Since:

  • 2.0.0



240
241
242
# File 'lib/mongo/cluster.rb', line 240

def elect_primary!(description)
  @topology = topology.elect_primary(description, servers_list)
end

#has_readable_server?(server_selector = nil) ⇒ true, false

Determine if the cluster would select a readable server for the provided read preference.

Examples:

Is a readable server present?

topology.has_readable_server?(server_selector)

Parameters:

  • server_selector (ServerSelector) (defaults to: nil)

    The server selector.

Returns:

  • (true, false)

    If a readable server is present.

Since:

  • 2.4.0



117
118
119
# File 'lib/mongo/cluster.rb', line 117

def has_readable_server?(server_selector = nil)
  topology.has_readable_server?(self, server_selector)
end

#has_writable_server?true, false

Determine if the cluster would select a writable server.

Examples:

Is a writable server present?

topology.has_writable_server?

Returns:

  • (true, false)

    If a writable server is present.

Since:

  • 2.4.0



129
130
131
# File 'lib/mongo/cluster.rb', line 129

def has_writable_server?
  topology.has_writable_server?(self)
end

#inspectString

Get the nicer formatted string for use in inspection.

Examples:

Inspect the cluster.

cluster.inspect

Returns:

  • (String)

    The cluster inspection.

Since:

  • 2.0.0



210
211
212
# File 'lib/mongo/cluster.rb', line 210

def inspect
  "#<Mongo::Cluster:0x#{object_id} servers=#{servers} topology=#{topology.display_name}>"
end

#max_read_retriesInteger

Get the maximum number of times the cluster can retry a read operation on a mongos.

Examples:

Get the max read retries.

cluster.max_read_retries

Returns:

  • (Integer)

    The maximum retries.

Since:

  • 2.1.1



253
254
255
# File 'lib/mongo/cluster.rb', line 253

def max_read_retries
  options[:max_read_retries] || MAX_READ_RETRIES
end

#next_primary(ping = true) ⇒ Mongo::Server

Get the next primary server we can send an operation to.

Examples:

Get the next primary server.

cluster.next_primary

Parameters:

  • ping (true, false) (defaults to: true)

    Whether to ping the server before selection.

Returns:

Since:

  • 2.0.0



224
225
226
227
# File 'lib/mongo/cluster.rb', line 224

def next_primary(ping = true)
  @primary_selector ||= ServerSelector.get(ServerSelector::PRIMARY)
  @primary_selector.select_server(self, ping)
end

#pool(server) ⇒ Server::ConnectionPool

Get the scoped connection pool for the server.

Examples:

Get the connection pool.

cluster.pool(server)

Parameters:

  • server (Server)

    The server.

Returns:

Since:

  • 2.2.0



267
268
269
270
271
# File 'lib/mongo/cluster.rb', line 267

def pool(server)
  @pool_lock.synchronize do
    pools[server.address] ||= Server::ConnectionPool.get(server)
  end
end

#read_retry_intervalFloat

Get the interval, in seconds, in which a mongos read operation is retried.

Examples:

Get the read retry interval.

cluster.read_retry_interval

Returns:

  • (Float)

    The interval.

Since:

  • 2.1.1



282
283
284
# File 'lib/mongo/cluster.rb', line 282

def read_retry_interval
  options[:read_retry_interval] || READ_RETRY_INTERVAL
end

#reconnect!true

Reconnect all servers.

Examples:

Reconnect the cluster’s servers.

cluster.reconnect!

Returns:

  • (true)

    Always true.

Since:

  • 2.1.0



370
371
372
373
374
# File 'lib/mongo/cluster.rb', line 370

def reconnect!
  scan!
  servers.each { |server| server.reconnect! }
  @cursor_reaper.restart! and true
end

#remove(host) ⇒ Object

Remove the server from the cluster for the provided address, if it exists.

Examples:

Remove the server from the cluster.

server.remove('127.0.0.1:27017')

Parameters:

  • host (String)

    The host/port or socket address.

Since:

  • 2.0.0



308
309
310
311
312
313
314
315
316
317
318
# File 'lib/mongo/cluster.rb', line 308

def remove(host)
  address = Address.new(host)
  removed_servers = @servers.select { |s| s.address == address }
  @update_lock.synchronize { @servers = @servers - removed_servers }
  removed_servers.each{ |server| server.disconnect! } if removed_servers
  publish_sdam_event(
    Monitoring::SERVER_CLOSED,
    Monitoring::Event::ServerClosed.new(address, topology)
  )
  @update_lock.synchronize { @addresses.reject! { |addr| addr == address } }
end

#remove_hosts(description) ⇒ Object

Remove hosts in a description from the cluster.

Examples:

Remove hosts in a description from the cluster.

cluster.remove_hosts(description)

Parameters:

Since:

  • 2.0.6



398
399
400
401
402
403
404
# File 'lib/mongo/cluster.rb', line 398

def remove_hosts(description)
  if topology.remove_hosts?(description)
    servers_list.each do |s|
      remove(s.address.to_s) if topology.remove_server?(description, s)
    end
  end
end

#scan!true

Note:

This operation is done synchronously. If servers in the cluster are down or slow to respond this can potentially be a slow operation.

Force a scan of all known servers in the cluster.

Examples:

Force a full cluster scan.

cluster.scan!

Returns:

  • (true)

    Always true.

Since:

  • 2.0.0



331
332
333
# File 'lib/mongo/cluster.rb', line 331

def scan!
  servers_list.each{ |server| server.scan! } and true
end

#serversArray<Server>

Get a list of server candidates from the cluster that can have operations executed on them.

Examples:

Get the server candidates for an operation.

cluster.servers

Returns:

  • (Array<Server>)

    The candidate servers.

Since:

  • 2.0.0



344
345
346
# File 'lib/mongo/cluster.rb', line 344

def servers
  topology.servers(servers_list.compact).compact
end

#standalone_discoveredTopology

Notify the cluster that a standalone server was discovered so that the topology can be updated accordingly.

Examples:

Notify the cluster that a standalone server was discovered.

cluster.standalone_discovered

Returns:

Since:

  • 2.0.6



295
296
297
# File 'lib/mongo/cluster.rb', line 295

def standalone_discovered
  @topology = topology.standalone_discovered
end