Class: Mongo::MongoReplicaSetClient
- Inherits:
- MongoClient (Object → MongoClient → Mongo::MongoReplicaSetClient)
- Includes:
- ReadPreference, ThreadLocalVariableManager
- Defined in:
- lib/mongo/mongo_replica_set_client.rb
Overview
Instantiates and manages connections to a MongoDB replica set.
Direct Known Subclasses
Constant Summary
- REPL_SET_OPTS =
[ :refresh_mode, :refresh_interval, :read_secondary, :rs_name, :name ]
Constants included from ReadPreference
ReadPreference::MONGOS_MODES, ReadPreference::READ_PREFERENCES
Constants inherited from MongoClient
Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::SSL_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS
Constants included from Networking
Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE
Instance Attribute Summary
-
#manager ⇒ Object
readonly
Returns the value of attribute manager.
-
#refresh_interval ⇒ Object
readonly
Returns the value of attribute refresh_interval.
-
#refresh_mode ⇒ Object
readonly
Returns the value of attribute refresh_mode.
-
#refresh_version ⇒ Object
readonly
Returns the value of attribute refresh_version.
-
#replica_set_name ⇒ Object
readonly
Returns the value of attribute replica_set_name.
-
#seeds ⇒ Object
readonly
Returns the value of attribute seeds.
Attributes inherited from MongoClient
#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #read, #size, #socket_class, #socket_opts, #tag_sets, #write_concern
Attributes included from WriteConcern
Instance Method Summary
- #arbiters ⇒ Object
- #authenticate_pools ⇒ Object
-
#checkin(socket) ⇒ Object
Check in a socket used for reading.
-
#checkout ⇒ Object
Generic socket checkout. Takes a block that returns a socket from a pool.
- #checkout_reader(read_pref = {}) ⇒ Object
-
#checkout_writer ⇒ Object
Checkout a socket for writing (i.e., a primary node).
-
#close(opts = {}) ⇒ Object
Close the connection to the database.
-
#connect(force = !connected?) ⇒ Object
Initiate a connection to the replica set.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean (deprecated)
- #ensure_manager ⇒ Object
- #get_socket_from_pool(pool) ⇒ Object
-
#hard_refresh! ⇒ Boolean
Force a hard refresh of this connection’s view of the replica set.
-
#host ⇒ String
The replica set primary’s host name.
- #hosts ⇒ Object
-
#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient
constructor
Create a connection to a MongoDB replica set.
- #inspect ⇒ Object
- #local_manager ⇒ Object
- #logout_pools(db) ⇒ Object
- #max_bson_size ⇒ Object
- #max_message_size ⇒ Object
- #nodes ⇒ Object
- #pin_pool(pool, read_preference) ⇒ Object
- #pinned_pool ⇒ Object
- #pools ⇒ Object
-
#port ⇒ Integer
The replica set primary’s port.
- #primary ⇒ Object
- #primary_pool ⇒ Object
-
#read_primary? ⇒ Boolean
(also: #primary?)
Determine whether we’re reading from a primary node.
-
#refresh(opts = {}) ⇒ Boolean
Determine whether a replica set refresh is required.
- #reset_connection ⇒ Object (deprecated)
-
#secondaries ⇒ Object
Note: might want to freeze these after connecting.
- #secondary_pool ⇒ Object
- #secondary_pools ⇒ Object
-
#slave_ok? ⇒ Boolean
Returns true if it’s okay to read from a secondary node.
- #tag_map ⇒ Object
- #unpin_pool ⇒ Object
- #valid_opts ⇒ Object
Methods included from ThreadLocalVariableManager
Methods included from ReadPreference
mongos, #read_pool, #read_preference, #select_near_pool, #select_pool, #select_secondary_pool, validate
Methods inherited from MongoClient
#[], #active?, #add_auth, #apply_saved_authentication, #check_is_master, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host_port, #lock!, #locked?, #mongos?, multi, #parse_init, #ping, #read_pool, #remove_auth, #server_info, #server_version, #unlock!
Methods included from WriteConcern
#get_write_concern, gle?, #write_concern_from_legacy
Methods included from Networking
#receive_message, #send_message, #send_message_with_gle
Methods included from Logging
#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message
Constructor Details
#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient
Create a connection to a MongoDB replica set.
If no arguments are provided, the constructor falls back to ENV["MONGODB_URI"].
Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 116
    def initialize(*args)
      opts = args.last.is_a?(Hash) ? args.pop : {}
      nodes = args.shift || []

      raise MongoArgumentError, "Too many arguments" unless args.empty?

      # This is temporary until support for the old format is dropped
      @seeds = nodes.collect do |node|
        if node.is_a?(Array)
          warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
          warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
          node
        elsif node.is_a?(String)
          Support.normalize_seeds(node)
        else
          raise MongoArgumentError, "Bad seed format!"
        end
      end

      if @seeds.empty? && ENV.has_key?('MONGODB_URI')
        parser = URIParser.new ENV['MONGODB_URI']
        if parser.direct?
          raise MongoArgumentError, "ENV['MONGODB_URI'] implies a direct connection."
        end
        opts = parser..merge! opts
        @seeds = parser.nodes
      end

      if @seeds.length.zero?
        raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
      end

      @seeds.freeze

      # Refresh
      @last_refresh = Time.now
      @refresh_version = 0

      # No connection manager by default.
      @manager = nil

      # Lock for request ids.
      @id_lock = Mutex.new

      @connected = false

      @connect_mutex = Mutex.new

      @mongos = false

      check_opts(opts)
      setup(opts.dup)
    end
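As a rough illustration of the seed handling in the constructor, the following self-contained sketch turns 'host:port' strings into [host, port] pairs and rejects an empty seed list. The helpers parse_seed and build_seeds are hypothetical names written for this example; they are not part of the driver and are much simpler than Support.normalize_seeds (for instance, they ignore IPv6 addresses). The fallback port 27017 is MongoDB's standard default.

```ruby
# Hypothetical helper: split a 'host:port' string into [host, port].
# A simplified stand-in for the driver's seed normalization (no IPv6 handling).
def parse_seed(seed)
  raise ArgumentError, "Bad seed format!" unless seed.is_a?(String)
  host, port = seed.split(':', 2)
  raise ArgumentError, "Bad seed format!" if host.nil? || host.empty?
  # Fall back to MongoDB's default port when none is given.
  [host, port ? Integer(port, 10) : 27017]
end

# Hypothetical helper: normalize every seed and insist on at least one.
def build_seeds(nodes)
  seeds = nodes.map { |node| parse_seed(node) }
  raise ArgumentError, "At least one seed node is required." if seeds.empty?
  seeds.freeze
end

seeds = build_seeds(['db1.example.com:27017', 'db2.example.com:27018'])
```

With the driver itself, the same array of 'host:port' strings would be passed as the first argument to Mongo::MongoReplicaSetClient.new, optionally followed by an options hash.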
Instance Attribute Details
#manager ⇒ Object (readonly)
Returns the value of attribute manager.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def manager
      @manager
    end
#refresh_interval ⇒ Object (readonly)
Returns the value of attribute refresh_interval.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def refresh_interval
      @refresh_interval
    end
#refresh_mode ⇒ Object (readonly)
Returns the value of attribute refresh_mode.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def refresh_mode
      @refresh_mode
    end
#refresh_version ⇒ Object (readonly)
Returns the value of attribute refresh_version.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def refresh_version
      @refresh_version
    end
#replica_set_name ⇒ Object (readonly)
Returns the value of attribute replica_set_name.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def replica_set_name
      @replica_set_name
    end
#seeds ⇒ Object (readonly)
Returns the value of attribute seeds.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 30
    def seeds
      @seeds
    end
Instance Method Details
#arbiters ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 413
    def arbiters
      local_manager.arbiters.nil? ? [] : local_manager.arbiters
    end
#authenticate_pools ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 328
    def authenticate_pools
      @manager.pools.each { |pool| pool.authenticate_existing }
    end
#checkin(socket) ⇒ Object
Check in a socket used for reading.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 373
    def checkin(socket)
      if socket && socket.pool
        socket.checkin
      end
      sync_refresh
    end
#checkout ⇒ Object
Generic socket checkout. Takes a block that returns a socket from a pool.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 338
    def checkout
      ensure_manager

      connected? ? sync_refresh : connect

      begin
        socket = yield
      rescue => ex
        checkin(socket) if socket
        raise ex
      end

      if socket
        return socket
      else
        @connected = false
        raise ConnectionFailure.new("Could not checkout a socket.")
      end
    end
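The block-based checkout above can be tried out in isolation with toy objects. The sketch below is only an approximation under stated assumptions: FakeSocket, FakeClient, and CheckoutError are hypothetical stand-ins invented for this example, and the refresh and connect steps of the real method are left out.

```ruby
# Toy stand-ins; none of these are driver classes.
class CheckoutError < StandardError; end

FakeSocket = Struct.new(:checked_in)

class FakeClient
  attr_reader :connected

  def initialize
    @connected = true
  end

  # Mark the socket as returned (the real method hands it back to its pool).
  def checkin(socket)
    socket.checked_in = true if socket
  end

  # Yield to a block that is expected to return a socket (or nil).
  def checkout
    begin
      socket = yield
    rescue => ex
      checkin(socket) if socket
      raise ex
    end

    return socket if socket

    # No socket available: mark the client disconnected and fail.
    @connected = false
    raise CheckoutError, "Could not checkout a socket."
  end
end

client = FakeClient.new
socket = client.checkout { FakeSocket.new(false) }
```

Passing the socket source as a block keeps the error handling in one place, so reader and writer checkouts can share it and differ only in which pool they consult.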
#checkout_reader(read_pref = {}) ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 358
    def checkout_reader(read_pref={})
      checkout do
        pool = read_pool(read_pref)
        get_socket_from_pool(pool)
      end
    end
#checkout_writer ⇒ Object
Checkout a socket for writing (i.e., a primary node).
    # File 'lib/mongo/mongo_replica_set_client.rb', line 366
    def checkout_writer
      checkout do
        get_socket_from_pool(primary_pool)
      end
    end
#close(opts = {}) ⇒ Object
Close the connection to the database.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 295
    def close(opts={})
      if opts[:soft]
        @manager.close(:soft => true) if @manager
      else
        @manager.close if @manager
      end

      # Clear the reference to this object.
      thread_local[:managers].delete(self)
      unpin_pool

      @connected = false
    end
#connect(force = !connected?) ⇒ Object
Initiate a connection to the replica set.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 181
    def connect(force = !connected?)
      return unless force
      log(:info, "Connecting...")

      # Prevent recursive connection attempts from the same thread.
      # This is done rather than using a Monitor to prevent potentially recursing
      # infinitely while attempting to connect and continually failing. Instead, fail fast.
      raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting] == true

      current_version = @refresh_version
      @connect_mutex.synchronize do
        # don't try to connect if another thread has done so while we were waiting for the lock
        return unless current_version == @refresh_version
        begin
          thread_local[:locks][:connecting] = true
          if @manager
            ensure_manager
            @manager.refresh!(@seeds)
          else
            @manager = PoolManager.new(self, @seeds)
            ensure_manager
            @manager.connect
          end
        ensure
          thread_local[:locks][:connecting] = false
        end
        @refresh_version += 1

        if @manager.pools.empty?
          close
          raise ConnectionFailure, "Failed to connect to any node."
        else
          @connected = true
        end
      end
    end
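The version check inside the mutex is the part of connect that is easiest to misread, so here is a minimal sketch of just that idea. VersionedConnector is a hypothetical class invented for this example; the sleep call stands in for the real connection work, and nothing here talks to a server.

```ruby
# Hypothetical class illustrating the version check inside the mutex.
class VersionedConnector
  attr_reader :refresh_version, :connect_calls

  def initialize
    @refresh_version = 0
    @connect_calls = 0
    @mutex = Mutex.new
  end

  def connect
    # Remember the version seen before waiting for the lock.
    seen_version = @refresh_version
    @mutex.synchronize do
      # Another thread finished connecting while this one waited for the lock.
      return unless seen_version == @refresh_version
      sleep 0.01              # stand-in for the expensive connection work
      @connect_calls += 1
      @refresh_version += 1
    end
  end
end

connector = VersionedConnector.new
threads = 4.times.map { Thread.new { connector.connect } }
threads.each(&:join)
```

A thread that read the version before blocking on the lock finds it changed once it gets in, and returns without repeating the work. How many threads actually perform the work depends on scheduling, but each one that does increments the version exactly once.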
#connected? ⇒ Boolean
    # File 'lib/mongo/mongo_replica_set_client.rb', line 255
    def connected?
      @connected && !@manager.pools.empty?
    end
#connecting? ⇒ Boolean
    # File 'lib/mongo/mongo_replica_set_client.rb', line 260
    def connecting?
      warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
      false
    end
#ensure_manager ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 380
    def ensure_manager
      thread_local[:managers][self] = @manager
    end
#get_socket_from_pool(pool) ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 401
    def get_socket_from_pool(pool)
      begin
        pool.checkout if pool
      rescue ConnectionFailure
        nil
      end
    end
#hard_refresh! ⇒ Boolean
Force a hard refresh of this connection’s view of the replica set.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 249
    def hard_refresh!
      log(:info, "Initiating hard refresh...")
      connect(true)
      return true
    end
#host ⇒ String
The replica set primary’s host name.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 268
    def host
      @manager.primary_pool.host
    end
#hosts ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 426
    def hosts
      local_manager ? local_manager.hosts : []
    end
#inspect ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 175
    def inspect
      "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
        "@connected=#{@connected}>"
    end
#local_manager ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 409
    def local_manager
      thread_local[:managers][self]
    end
#logout_pools(db) ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 332
    def logout_pools(db)
      @manager.pools.each { |pool| pool.logout_existing(db) }
    end
#max_bson_size ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 450
    def max_bson_size
      return local_manager.max_bson_size if local_manager
      DEFAULT_MAX_BSON_SIZE
    end
#max_message_size ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 455
    def max_message_size
      return local_manager.max_message_size if local_manager
      max_bson_size * MESSAGE_SIZE_FACTOR
    end
#nodes ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 279
    def nodes
      warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
        "Please use MongoReplicaSetClient#seeds instead."
      @seeds
    end
#pin_pool(pool, read_preference) ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 388
    def pin_pool(pool, read_preference)
      if @manager
        thread_local[:pinned_pools][@manager.object_id] = {
          :pool => pool,
          :read_preference => read_preference
        }
      end
    end
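Pinning stores a pool per thread, keyed by the manager's object id. The sketch below shows that idea with plain Thread.current storage. PinnedPools and its key :pinned_pools_sketch are hypothetical names for this example; the driver's ThreadLocalVariableManager may organise its storage differently.

```ruby
# Hypothetical module; a simplified stand-in for the driver's thread-local storage.
module PinnedPools
  # Per-thread hash, created lazily the first time a thread touches it.
  def self.store
    Thread.current[:pinned_pools_sketch] ||= {}
  end

  def self.pin(manager_id, pool, read_preference)
    store[manager_id] = { :pool => pool, :read_preference => read_preference }
  end

  def self.pinned(manager_id)
    store[manager_id]
  end

  def self.unpin(manager_id)
    store.delete(manager_id)
  end
end

PinnedPools.pin(1, :pool_a, :secondary)
# A different thread has its own store, so it sees no pinned pool for this manager.
other_thread_view = Thread.new { PinnedPools.pinned(1) }.value
```

Because each thread has its own hash, a pool pinned in one thread does not affect which pool another thread reads from.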
#pinned_pool ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 384
    def pinned_pool
      thread_local[:pinned_pools][@manager.object_id] if @manager
    end
#pools ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 442
    def pools
      local_manager ? local_manager.pools : []
    end
#port ⇒ Integer
The replica set primary’s port.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 275
    def port
      @manager.primary_pool.port
    end
#primary ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 417
    def primary
      local_manager ? local_manager.primary : nil
    end
#primary_pool ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 430
    def primary_pool
      local_manager ? local_manager.primary_pool : nil
    end
#read_primary? ⇒ Boolean Also known as: primary?
Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 289
    def read_primary?
      read_pool == primary_pool
    end
#refresh(opts = {}) ⇒ Boolean
Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by calling MongoReplicaSetClient#hard_refresh!.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 225
    def refresh(opts={})
      if !connected?
        log(:info, "Trying to check replica set health but not " +
          "connected...")
        return hard_refresh!
      end

      log(:debug, "Checking replica set connection health...")
      ensure_manager
      @manager.check_connection_health

      if @manager.refresh_required?
        return hard_refresh!
      end

      return true
    end
#reset_connection ⇒ Object
If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 312
    def reset_connection
      close
      warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
        "Use MongoReplicaSetClient#close instead."
    end
#secondaries ⇒ Object
Note: might want to freeze these after connecting.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 422
    def secondaries
      local_manager ? local_manager.secondaries : []
    end
#secondary_pool ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 434
    def secondary_pool
      local_manager ? local_manager.secondary_pool : nil
    end
#secondary_pools ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 438
    def secondary_pools
      local_manager ? local_manager.secondary_pools : []
    end
#slave_ok? ⇒ Boolean
Returns true if it’s okay to read from a secondary node.
This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.
    # File 'lib/mongo/mongo_replica_set_client.rb', line 324
    def slave_ok?
      @read != :primary
    end
#tag_map ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 446
    def tag_map
      local_manager ? local_manager.tag_map : {}
    end
#unpin_pool ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 397
    def unpin_pool
      thread_local[:pinned_pools].delete @manager.object_id if @manager
    end
#valid_opts ⇒ Object
    # File 'lib/mongo/mongo_replica_set_client.rb', line 171
    def valid_opts
      super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
    end
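The expression in valid_opts is plain array arithmetic, evaluated left to right: the parent's list is extended with the replica set options, then the client-only options are removed. A small sketch, assuming placeholder values: REPL_SET_OPTS is copied from the constant summary on this page, while BASE_OPTS and the contents of CLIENT_ONLY_OPTS are hypothetical and not taken from the driver.

```ruby
# REPL_SET_OPTS is copied from the constant summary; the other two lists are
# hypothetical placeholders, not the driver's real values.
REPL_SET_OPTS    = [:refresh_mode, :refresh_interval, :read_secondary, :rs_name, :name]
BASE_OPTS        = [:read, :pool_size, :slave_ok]   # assumed parent options
CLIENT_ONLY_OPTS = [:slave_ok]                      # assumed client-only options

# Same left-to-right evaluation as `super + REPL_SET_OPTS - CLIENT_ONLY_OPTS`.
valid = BASE_OPTS + REPL_SET_OPTS - CLIENT_ONLY_OPTS
```

Array#- removes every occurrence of the listed elements, so an option that only makes sense for a single-server client is dropped even though the parent class accepts it.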