Class: Mongo::MongoReplicaSetClient
- Inherits:
-
MongoClient
- Object
- MongoClient
- Mongo::MongoReplicaSetClient
- Includes:
- ThreadLocalVariableManager
- Defined in:
- lib/mongo/mongo_replica_set_client.rb
Overview
Instantiates and manages connections to a MongoDB replica set.
Direct Known Subclasses
Constant Summary collapse
- REPL_SET_OPTS =
[ :refresh_mode, :refresh_interval, :read_secondary, :rs_name, :name ]
Constants inherited from MongoClient
Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS
Constants included from Networking
Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE
Instance Attribute Summary collapse
-
#manager ⇒ Object
readonly
Returns the value of attribute manager.
-
#refresh_interval ⇒ Object
readonly
Returns the value of attribute refresh_interval.
-
#refresh_mode ⇒ Object
readonly
Returns the value of attribute refresh_mode.
-
#refresh_version ⇒ Object
readonly
Returns the value of attribute refresh_version.
-
#replica_set_name ⇒ Object
readonly
Returns the value of attribute replica_set_name.
-
#seeds ⇒ Object
readonly
Returns the value of attribute seeds.
Attributes inherited from MongoClient
#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #read, #size, #socket_class, #tag_sets, #write_concern
Attributes included from WriteConcern
Instance Method Summary collapse
- #arbiters ⇒ Object
- #authenticate_pools ⇒ Object
-
#checkin(socket) ⇒ Object
Checkin a socket used for reading.
-
#checkout ⇒ Object
Generic socket checkout Takes a block that returns a socket from pool.
- #checkout_reader(mode = @read, tag_sets = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object
-
#checkout_writer ⇒ Object
Checkout a socket for writing (i.e., a primary node).
-
#close(opts = {}) ⇒ Object
Close the connection to the database.
-
#connect ⇒ Object
Initiate a connection to the replica set.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean deprecated Deprecated.
- #ensure_manager ⇒ Object
- #get_socket_from_pool(pool) ⇒ Object
-
#hard_refresh! ⇒ Boolean
Force a hard refresh of this connection’s view of the replica set.
-
#host ⇒ String
The replica set primary’s host name.
- #hosts ⇒ Object
-
#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient
constructor
Create a connection to a MongoDB replica set.
- #inspect ⇒ Object
- #local_manager ⇒ Object
- #logout_pools(db) ⇒ Object
- #max_bson_size ⇒ Object
- #nodes ⇒ Object
- #pin_pool(pool) ⇒ Object
-
#port ⇒ Integer
The replica set primary’s port.
- #primary ⇒ Object
- #primary_pool ⇒ Object
- #read_pool(mode = @read, tags = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object
-
#read_primary? ⇒ Boolean
Determine whether we’re reading from a primary node.
-
#refresh(opts = {}) ⇒ Boolean
Determine whether a replica set refresh is required.
- #reset_connection ⇒ Object deprecated Deprecated.
-
#secondaries ⇒ Object
Note: might want to freeze these after connecting.
- #secondary_pool ⇒ Object
- #secondary_pools ⇒ Object
-
#slave_ok? ⇒ Boolean
Returns
trueif it’s okay to read from a secondary node. - #tag_map ⇒ Object
- #unpin_pool(pool) ⇒ Object
- #valid_opts ⇒ Object
Methods included from ThreadLocalVariableManager
Methods inherited from MongoClient
#[], #active?, #add_auth, #apply_saved_authentication, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host_port, #lock!, #locked?, #mongos?, multi, #parse_init, #ping, #primary?, #reconnect, #remove_auth, #server_info, #server_version, #unlock!
Methods included from WriteConcern
#get_write_concern, gle?, #write_concern_from_legacy
Methods included from Networking
#receive_message, #send_message, #send_message_with_gle
Methods included from Logging
#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message
Constructor Details
#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient
Create a connection to a MongoDB replica set.
If no args are provided, it will check ENV["MONGODB_URI"].
Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoClient#primary, MongoClient#secondaries, and MongoClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 112 def initialize(*args) opts = args.last.is_a?(Hash) ? args.pop : {} nodes = args.shift || [] raise MongoArgumentError, "Too many arguments" unless args.empty? # This is temporary until support for the old format is dropped @seeds = nodes.collect do |node| if node.is_a?(Array) warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated." warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0" node elsif node.is_a?(String) host, port = node.split(":") [ host, port.to_i ] else raise MongoArgumentError "Bad seed format!" end end if @seeds.empty? && ENV.has_key?('MONGODB_URI') parser = URIParser.new ENV['MONGODB_URI'] if parser.direct? raise MongoArgumentError, "ENV['MONGODB_URI'] implies a direct connection." end opts = parser..merge! opts @seeds = parser.nodes end if @seeds.length.zero? raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node." end @seeds.freeze # Refresh @last_refresh = Time.now @refresh_version = 0 # No connection manager by default. @manager = nil @old_managers = [] # Lock for request ids. @id_lock = Mutex.new @pool_mutex = Mutex.new @connected = false @safe_mutex_lock = Mutex.new @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new} @connect_mutex = Mutex.new @refresh_mutex = Mutex.new @mongos = false check_opts(opts) setup(opts.dup) end |
Instance Attribute Details
#manager ⇒ Object (readonly)
Returns the value of attribute manager.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def manager @manager end |
#refresh_interval ⇒ Object (readonly)
Returns the value of attribute refresh_interval.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def refresh_interval @refresh_interval end |
#refresh_mode ⇒ Object (readonly)
Returns the value of attribute refresh_mode.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def refresh_mode @refresh_mode end |
#refresh_version ⇒ Object (readonly)
Returns the value of attribute refresh_version.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def refresh_version @refresh_version end |
#replica_set_name ⇒ Object (readonly)
Returns the value of attribute replica_set_name.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def replica_set_name @replica_set_name end |
#seeds ⇒ Object (readonly)
Returns the value of attribute seeds.
33 34 35 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 33 def seeds @seeds end |
Instance Method Details
#arbiters ⇒ Object
400 401 402 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 400 def arbiters local_manager.arbiters.nil? ? [] : local_manager.arbiters end |
#authenticate_pools ⇒ Object
323 324 325 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 323 def authenticate_pools @manager.pools.each { |pool| pool.authenticate_existing } end |
#checkin(socket) ⇒ Object
Checkin a socket used for reading.
369 370 371 372 373 374 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 369 def checkin(socket) if socket && socket.pool socket.pool.checkin(socket) end sync_refresh end |
#checkout ⇒ Object
Generic socket checkout Takes a block that returns a socket from pool
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 333 def checkout ensure_manager connected? ? sync_refresh : connect begin socket = yield rescue => ex checkin(socket) if socket raise ex end if socket socket else @connected = false raise ConnectionFailure.new("Could not checkout a socket.") end socket end |
#checkout_reader(mode = @read, tag_sets = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object
354 355 356 357 358 359 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 354 def checkout_reader(mode=@read, tag_sets=@tag_sets, acceptable_latency=@acceptable_latency) checkout do pool = read_pool(mode, tag_sets, acceptable_latency) get_socket_from_pool(pool) end end |
#checkout_writer ⇒ Object
Checkout a socket for writing (i.e., a primary node).
362 363 364 365 366 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 362 def checkout_writer checkout do get_socket_from_pool(primary_pool) end end |
#close(opts = {}) ⇒ Object
Close the connection to the database.
291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 291 def close(opts={}) if opts[:soft] @manager.close(:soft => true) if @manager else @manager.close if @manager end # Clear the reference to this object. thread_local[:managers].delete(self) @connected = false end |
#connect ⇒ Object
Initiate a connection to the replica set.
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 184 def connect log(:info, "Connecting...") @connect_mutex.synchronize do return if @connected seeds = @manager.nil? ? @seeds : @manager.seeds @manager = PoolManager.new(self, seeds) thread_local[:managers][self] = @manager @manager.connect @refresh_version += 1 if @manager.pools.empty? close raise ConnectionFailure, "Failed to connect to any node." else @connected = true end end end |
#connected? ⇒ Boolean
252 253 254 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 252 def connected? @connected && !@manager.pools.empty? end |
#connecting? ⇒ Boolean
257 258 259 260 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 257 def connecting? warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0." false end |
#ensure_manager ⇒ Object
376 377 378 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 376 def ensure_manager thread_local[:managers][self] = @manager end |
#get_socket_from_pool(pool) ⇒ Object
388 389 390 391 392 393 394 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 388 def get_socket_from_pool(pool) begin pool.checkout if pool rescue ConnectionFailure nil end end |
#hard_refresh! ⇒ Boolean
Force a hard refresh of this connection’s view of the replica set.
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 236 def hard_refresh! log(:info, "Initiating hard refresh...") discovered_seeds = @manager.seeds new_manager = PoolManager.new(self, discovered_seeds | @seeds) new_manager.connect thread_local[:managers][self] = new_manager # TODO: make sure that connect has succeeded @old_managers << @manager @manager = new_manager @refresh_version += 1 return true end |
#host ⇒ String
The replica set primary’s host name.
265 266 267 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 265 def host @manager.primary_pool.host end |
#hosts ⇒ Object
413 414 415 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 413 def hosts local_manager ? local_manager.hosts : [] end |
#inspect ⇒ Object
178 179 180 181 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 178 def inspect "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " + "@connected=#{@connected}>" end |
#local_manager ⇒ Object
396 397 398 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 396 def local_manager thread_local[:managers][self] end |
#logout_pools(db) ⇒ Object
327 328 329 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 327 def logout_pools(db) @manager.pools.each { |pool| pool.logout_existing(db) } end |
#max_bson_size ⇒ Object
437 438 439 440 441 442 443 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 437 def max_bson_size if local_manager && local_manager.max_bson_size local_manager.max_bson_size else Mongo::DEFAULT_MAX_BSON_SIZE end end |
#nodes ⇒ Object
276 277 278 279 280 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 276 def nodes warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " + "Please use MongoReplicaSetClient#seeds instead." @seeds end |
#pin_pool(pool) ⇒ Object
380 381 382 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 380 def pin_pool(pool) thread_local[:pinned_pools][@manager.object_id] = pool if @manager end |
#port ⇒ Integer
The replica set primary’s port.
272 273 274 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 272 def port @manager.primary_pool.port end |
#primary ⇒ Object
404 405 406 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 404 def primary local_manager ? local_manager.primary : nil end |
#primary_pool ⇒ Object
417 418 419 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 417 def primary_pool local_manager ? local_manager.primary_pool : nil end |
#read_pool(mode = @read, tags = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object
421 422 423 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 421 def read_pool(mode=@read, =@tag_sets, acceptable_latency=@acceptable_latency) local_manager ? local_manager.read_pool(mode, , acceptable_latency) : nil end |
#read_primary? ⇒ Boolean
Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.
286 287 288 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 286 def read_primary? @manager.read_pool == @manager.primary_pool end |
#refresh(opts = {}) ⇒ Boolean
Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running MongoReplicaSetClient#hard_refresh!
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 213 def refresh(opts={}) if !connected? log(:info, "Trying to check replica set health but not " + "connected...") return hard_refresh! end log(:debug, "Checking replica set connection health...") @manager.check_connection_health if @manager.refresh_required? return hard_refresh! end return true end |
#reset_connection ⇒ Object
If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.
307 308 309 310 311 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 307 def reset_connection close warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " + "Use MongoReplicaSetClient#close instead." end |
#secondaries ⇒ Object
Note: might want to freeze these after connecting.
409 410 411 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 409 def secondaries local_manager ? local_manager.secondaries : [] end |
#secondary_pool ⇒ Object
425 426 427 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 425 def secondary_pool local_manager ? local_manager.secondary_pool : nil end |
#secondary_pools ⇒ Object
429 430 431 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 429 def secondary_pools local_manager ? local_manager.secondary_pools : [] end |
#slave_ok? ⇒ Boolean
Returns true if it’s okay to read from a secondary node.
This method exist primarily so that Cursor objects will generate query messages with a slaveOkay value of true.
319 320 321 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 319 def slave_ok? @read != :primary end |
#tag_map ⇒ Object
433 434 435 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 433 def tag_map local_manager ? local_manager.tag_map : {} end |
#unpin_pool(pool) ⇒ Object
384 385 386 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 384 def unpin_pool(pool) thread_local[:pinned_pools].delete @manager.object_id if @manager end |
#valid_opts ⇒ Object
174 175 176 |
# File 'lib/mongo/mongo_replica_set_client.rb', line 174 def valid_opts super + REPL_SET_OPTS - CLIENT_ONLY_OPTS end |