Class: Mongo::MongoReplicaSetClient

Inherits:
MongoClient
Includes:
ThreadLocalVariableManager
Defined in:
lib/mongo/mongo_replica_set_client.rb

Overview

Instantiates and manages connections to a MongoDB replica set.

Direct Known Subclasses

MongoShardedClient, ReplSetConnection

Constant Summary

REPL_SET_OPTS =
[
  :refresh_mode,
  :refresh_interval,
  :read_secondary,
  :rs_name,
  :name
]

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #read, #size, #socket_class, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary

Methods included from ThreadLocalVariableManager

#thread_local

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host_port, #lock!, #locked?, #mongos?, multi, #parse_init, #ping, #primary?, #reconnect, #remove_auth, #server_info, #server_version, #unlock!

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient

Create a connection to a MongoDB replica set.

If no args are provided, it will check ENV["MONGODB_URI"].
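A minimal sketch of how that fallback interacts with explicitly passed options, based on the merge! call in the constructor source: options parsed from the URI act as defaults, and any option passed to the constructor wins. The helper name effective_options and the sample values are illustrative and not part of the driver.

```ruby
# Hypothetical helper; mirrors `parser.connection_options.merge! opts`.
def effective_options(uri_options, explicit_options)
  # Hash#merge keeps the receiver's entries unless the argument defines
  # the same key, in which case the argument's value wins.
  uri_options.merge(explicit_options)
end

from_uri = { :read => :secondary, :w => 1 }  # as if parsed from MONGODB_URI
explicit = { :w => 2 }                       # as if passed to the constructor

effective_options(from_uri, explicit)
# => { :read => :secondary, :w => 2 }
```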

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

Examples:

Connect to a replica set and provide two seed nodes.

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])

Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)

Note:

The number of seed nodes does not have to equal the number of replica set members. Seed nodes exist so that the driver can find at least one replica set member even if a member is down.

Parameters:

  • seeds (Array<String>, Array<Array(String, Integer)>) (defaults to: ENV["MONGODB_URI"])

Options Hash (opts):

  • :w (String, Integer, Symbol) — default: 1

    Set the default number of nodes that must acknowledge a write.

  • :j (Boolean) — default: false

    Set journal acknowledgement

  • :wtimeout (Integer) — default: nil

    Set acknowledgement timeout

  • :fsync (Boolean) — default: false

    Set fsync acknowledgement.

    Notes about write concern options:

    Write concern options are propagated to objects instantiated from this MongoReplicaSetClient.
    These defaults can be overridden upon instantiation of any object by explicitly setting an options hash
    on initialization.
    
  • :read (:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest) — default: :primary

    A “read preference” determines the candidate replica set members to which a query or command can be sent.

    :primary
    • Read from primary only.

    • Cannot be combined with tags.

    :primary_preferred
    • Read from primary if available, otherwise read from a secondary.

    :secondary
    • Read from secondary if available.

    :secondary_preferred
    • Read from a secondary if available, otherwise read from the primary.

    :nearest
    • Read from any member.

  • :tag_sets (Array<Hash{ String, Symbol => Tag Value }>) — default: []

    Read from replica-set members with these tags.

  • :secondary_acceptable_latency_ms (Integer) — default: 15

    The maximum latency, in milliseconds, beyond that of the nearest available member for a member to still be considered “near”.

  • :logger (Logger) — default: nil

    Logger instance that receives the driver's operation log.

  • :pool_size (Integer) — default: 1

    The maximum number of socket connections allowed per connection pool. Note: this setting is relevant only for multi-threaded applications.

  • :pool_timeout (Float) — default: 5.0

    When all of the connections in a pool are checked out, this is the number of seconds to wait for a connection to be released before throwing an exception. Note: this setting is relevant only for multi-threaded applications.

  • :op_timeout (Float) — default: nil

    The number of seconds to wait for a read operation before it times out.

  • :connect_timeout (Float) — default: 30

    The number of seconds to wait before timing out a connection attempt.

  • :ssl (Boolean) — default: false

    If true, create the connection to the server using SSL.

  • :refresh_mode (false, :sync) — default: false

    Set this to :sync to periodically update the state of the connection every :refresh_interval seconds. Replica set connection failures will always trigger a complete refresh. This option is useful when you want to add new nodes or remove replica set nodes not currently in use by the driver.

  • :refresh_interval (Integer) — default: 90

    If :refresh_mode is enabled, this is the number of seconds between calls to check the replica set’s state.
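The :read and :secondary_acceptable_latency_ms options above can be pictured as a two-step filter: pick the members allowed by the read mode, then keep only those that are "near". The following standalone sketch illustrates that idea; it is not the driver's implementation. Member and read_candidates are hypothetical names, tag sets are left out, and it assumes "near" means a ping time within the acceptable latency of the fastest eligible member.

```ruby
# Hypothetical stand-in for a replica set member; not a driver class.
Member = Struct.new(:host, :role, :ping_ms)

# Keep only members whose ping is within `window_ms` of the fastest one.
def near_members(pool, window_ms)
  return [] if pool.empty?
  fastest = pool.map(&:ping_ms).min
  pool.select { |m| m.ping_ms - fastest <= window_ms }
end

# Returns the members a read could be sent to under the given mode.
def read_candidates(members, mode, acceptable_latency_ms = 15)
  primaries   = members.select { |m| m.role == :primary }
  secondaries = members.select { |m| m.role == :secondary }
  near_secondaries = near_members(secondaries, acceptable_latency_ms)

  case mode
  when :primary             then primaries
  when :primary_preferred   then primaries.empty? ? near_secondaries : primaries
  when :secondary           then near_secondaries
  when :secondary_preferred then secondaries.empty? ? primaries : near_secondaries
  when :nearest             then near_members(primaries + secondaries, acceptable_latency_ms)
  else raise ArgumentError, "unknown read mode: #{mode.inspect}"
  end
end

members = [
  Member.new("a:27017", :primary,   40),
  Member.new("b:27017", :secondary,  5),
  Member.new("c:27017", :secondary, 12),
  Member.new("d:27017", :secondary, 60)
]

read_candidates(members, :secondary).map(&:host)  # => ["b:27017", "c:27017"]
```

With the default 15 ms window, member d is excluded because it is 55 ms slower than the fastest secondary.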

Raises:

  • (MongoArgumentError)



# File 'lib/mongo/mongo_replica_set_client.rb', line 112

def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args.shift || []

  raise MongoArgumentError, "Too many arguments" unless args.empty?

  # This is temporary until support for the old format is dropped
  @seeds = nodes.collect do |node|
    if node.is_a?(Array)
      warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
      warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
      node
    elsif node.is_a?(String)
      host, port = node.split(":")
      [ host, port.to_i ]
    else
      raise MongoArgumentError, "Bad seed format!"
    end
  end

  if @seeds.empty? && ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError,
        "ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    @seeds = parser.nodes
  end

  if @seeds.length.zero?
    raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
  end

  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil
  @old_managers = []

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  @mongos = false

  check_opts(opts)
  setup(opts.dup)
end
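The seed handling at the top of the constructor can be tried in isolation. This is a standalone sketch that mirrors the logic shown above without requiring the driver; normalize_seeds is a hypothetical helper name, and ArgumentError stands in for MongoArgumentError.

```ruby
# Hypothetical helper mirroring the seed handling in #initialize.
def normalize_seeds(nodes)
  nodes.map do |node|
    case node
    when Array
      node                  # legacy [host, port] form, passed through as-is
    when String
      host, port = node.split(":")
      [host, port.to_i]     # a missing port becomes 0, because nil.to_i == 0
    else
      raise ArgumentError, "Bad seed format!"
    end
  end
end

normalize_seeds(["localhost:30000", ["localhost", 30001]])
# => [["localhost", 30000], ["localhost", 30001]]
```

Note that a seed string without a ":port" suffix ends up with port 0 under this logic, so it is safest to always spell out the port.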

Instance Attribute Details

#manager ⇒ Object (readonly)

Returns the value of attribute manager.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def manager
  @manager
end

#refresh_interval ⇒ Object (readonly)

Returns the value of attribute refresh_interval.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def refresh_interval
  @refresh_interval
end

#refresh_mode ⇒ Object (readonly)

Returns the value of attribute refresh_mode.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def refresh_mode
  @refresh_mode
end

#refresh_version ⇒ Object (readonly)

Returns the value of attribute refresh_version.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def refresh_version
  @refresh_version
end

#replica_set_name ⇒ Object (readonly)

Returns the value of attribute replica_set_name.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def replica_set_name
  @replica_set_name
end

#seeds ⇒ Object (readonly)

Returns the value of attribute seeds.



# File 'lib/mongo/mongo_replica_set_client.rb', line 33

def seeds
  @seeds
end

Instance Method Details

#arbiters ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 400

def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end

#authenticate_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 323

def authenticate_pools
  @manager.pools.each { |pool| pool.authenticate_existing }
end

#checkin(socket) ⇒ Object

Check in a socket used for reading.



# File 'lib/mongo/mongo_replica_set_client.rb', line 369

def checkin(socket)
  if socket && socket.pool
    socket.pool.checkin(socket)
  end
  sync_refresh
end

#checkout ⇒ Object

Generic socket checkout. Takes a block that returns a socket from a pool.



# File 'lib/mongo/mongo_replica_set_client.rb', line 333

def checkout
  ensure_manager

  connected? ? sync_refresh : connect

  begin
    socket = yield
  rescue => ex
    checkin(socket) if socket
    raise ex
  end

  if socket
    socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
  socket
end
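The contract of #checkout, reduced to its core, is that the block is responsible for producing a socket and a nil result counts as a connection failure. The sketch below shows only that contract, with the refresh and connect steps left out. checkout_socket is a hypothetical name, and the ConnectionFailure class is defined locally as a stand-in so the example runs without the driver.

```ruby
# Local stand-in for the driver's ConnectionFailure exception.
class ConnectionFailure < StandardError; end

# Hypothetical, simplified version of the checkout contract.
def checkout_socket
  socket = yield  # the block picks a pool and checks a socket out of it
  raise ConnectionFailure, "Could not checkout a socket." unless socket
  socket
end

checkout_socket { :fake_socket }  # returns :fake_socket

begin
  checkout_socket { nil }         # a block yielding nil triggers the failure
rescue ConnectionFailure => e
  e.message                       # "Could not checkout a socket."
end
```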

#checkout_reader(mode = @read, tag_sets = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 354

def checkout_reader(mode=@read, tag_sets=@tag_sets, acceptable_latency=@acceptable_latency)
  checkout do
    pool = read_pool(mode, tag_sets, acceptable_latency)
    get_socket_from_pool(pool)
  end
end

#checkout_writer ⇒ Object

Checkout a socket for writing (i.e., a primary node).



# File 'lib/mongo/mongo_replica_set_client.rb', line 362

def checkout_writer
  checkout do
    get_socket_from_pool(primary_pool)
  end
end

#close(opts = {}) ⇒ Object

Close the connection to the database.



# File 'lib/mongo/mongo_replica_set_client.rb', line 291

def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end

  # Clear the reference to this object.
  thread_local[:managers].delete(self)

  @connected = false
end

#connect ⇒ Object

Initiate a connection to the replica set.



# File 'lib/mongo/mongo_replica_set_client.rb', line 184

def connect
  log(:info, "Connecting...")
  @connect_mutex.synchronize do
    return if @connected

    seeds = @manager.nil? ? @seeds : @manager.seeds
    @manager = PoolManager.new(self, seeds)

    thread_local[:managers][self] = @manager

    @manager.connect
    @refresh_version += 1

    if @manager.pools.empty?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    else
      @connected = true
    end
  end
end
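The guard in #connect checks the connected flag inside the mutex, so concurrent callers cannot both build a manager. A small standalone sketch of that pattern follows; OnceConnector and its attempts counter are hypothetical and only stand in for the PoolManager work.

```ruby
# Hypothetical class illustrating the check-inside-the-lock guard.
class OnceConnector
  attr_reader :attempts

  def initialize
    @mutex = Mutex.new
    @connected = false
    @attempts = 0
  end

  def connect
    @mutex.synchronize do
      return if @connected  # another thread already finished connecting
      @attempts += 1        # stands in for PoolManager.new plus connect
      @connected = true
    end
  end
end

connector = OnceConnector.new
10.times.map { Thread.new { connector.connect } }.each(&:join)
connector.attempts  # => 1
```

Returning from inside synchronize is safe here: the mutex is released when the block exits, whether by return or by exception.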

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 252

def connected?
  @connected && !@manager.pools.empty?
end

#connecting? ⇒ Boolean

Deprecated.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 257

def connecting?
  warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
  false
end

#ensure_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 376

def ensure_manager
  thread_local[:managers][self] = @manager
end

#get_socket_from_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 388

def get_socket_from_pool(pool)
  begin
    pool.checkout if pool
  rescue ConnectionFailure
    nil
  end
end

#hard_refresh! ⇒ Boolean

Force a hard refresh of this connection’s view of the replica set.

Returns:

  • (Boolean)

    true if hard refresh occurred. false is returned when unable to get the refresh lock.



# File 'lib/mongo/mongo_replica_set_client.rb', line 236

def hard_refresh!
  log(:info, "Initiating hard refresh...")
  discovered_seeds = @manager.seeds
  new_manager = PoolManager.new(self, discovered_seeds | @seeds)
  new_manager.connect

  thread_local[:managers][self] = new_manager

  # TODO: make sure that connect has succeeded
  @old_managers << @manager
  @manager = new_manager

  @refresh_version += 1
  return true
end
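The expression discovered_seeds | @seeds in the method above is an array union: the new manager starts from every seed the old manager discovered plus the seeds originally supplied, without duplicates. A short illustration with made-up host names (merge_seeds is a hypothetical wrapper):

```ruby
# Hypothetical wrapper around the union used in #hard_refresh!.
def merge_seeds(discovered, original)
  discovered | original  # Array#| keeps first-seen order and drops duplicates
end

discovered = [["db1.example", 27017], ["db2.example", 27017]]
original   = [["db2.example", 27017], ["db3.example", 27017]]

merge_seeds(discovered, original)
# => [["db1.example", 27017], ["db2.example", 27017], ["db3.example", 27017]]
```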

#host ⇒ String

The replica set primary’s host name.

Returns:

  • (String)



# File 'lib/mongo/mongo_replica_set_client.rb', line 265

def host
  @manager.primary_pool.host
end

#hosts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 413

def hosts
  local_manager ? local_manager.hosts : []
end

#inspect ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 178

def inspect
  "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end

#local_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 396

def local_manager
  thread_local[:managers][self]
end
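Because #local_manager reads from a per-thread table, a thread sees a manager only after something like #ensure_manager has stored one for it. The sketch below illustrates that behavior. It assumes thread_local returns a per-thread Hash whose missing keys default to empty Hashes; the real ThreadLocalVariableManager may be implemented differently, and the key :sketch_thread_locals is made up.

```ruby
# Assumed shape of a thread-local table; not the driver's actual module.
def thread_local
  Thread.current[:sketch_thread_locals] ||= Hash.new { |h, k| h[k] = {} }
end

client = Object.new  # stands in for a MongoReplicaSetClient instance

# Analogous to ensure_manager: store a manager for the current thread.
thread_local[:managers][client] = :manager_a

# A different thread has its own table, so it finds nothing for this client.
seen_elsewhere = Thread.new { thread_local[:managers][client] }.value

seen_elsewhere                   # nil
thread_local[:managers][client]  # :manager_a
```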

#logout_pools(db) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 327

def logout_pools(db)
  @manager.pools.each { |pool| pool.logout_existing(db) }
end

#max_bson_size ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 437

def max_bson_size
  if local_manager && local_manager.max_bson_size
    local_manager.max_bson_size
  else
    Mongo::DEFAULT_MAX_BSON_SIZE
  end
end

#nodes ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 276

def nodes
  warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use MongoReplicaSetClient#seeds instead."
  @seeds
end

#pin_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 380

def pin_pool(pool)
  thread_local[:pinned_pools][@manager.object_id] = pool if @manager
end

#port ⇒ Integer

The replica set primary’s port.

Returns:

  • (Integer)


# File 'lib/mongo/mongo_replica_set_client.rb', line 272

def port
  @manager.primary_pool.port
end

#primary ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 404

def primary
  local_manager ? local_manager.primary : nil
end

#primary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 417

def primary_pool
  local_manager ? local_manager.primary_pool : nil
end

#read_pool(mode = @read, tags = @tag_sets, acceptable_latency = @acceptable_latency) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 421

def read_pool(mode=@read, tags=@tag_sets, acceptable_latency=@acceptable_latency)
  local_manager ? local_manager.read_pool(mode, tags, acceptable_latency) : nil
end

#read_primary? ⇒ Boolean

Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 286

def read_primary?
  @manager.read_pool == @manager.primary_pool
end

#refresh(opts = {}) ⇒ Boolean

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by calling MongoReplicaSetClient#hard_refresh!.

Returns:

  • (Boolean)

    true unless a hard refresh is run and the refresh lock can’t be acquired.



# File 'lib/mongo/mongo_replica_set_client.rb', line 213

def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end

#reset_connection ⇒ Object

Deprecated.

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.



# File 'lib/mongo/mongo_replica_set_client.rb', line 307

def reset_connection
  close
  warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use MongoReplicaSetClient#close instead."
end

#secondaries ⇒ Object

Note: might want to freeze these after connecting.



# File 'lib/mongo/mongo_replica_set_client.rb', line 409

def secondaries
  local_manager ? local_manager.secondaries : []
end

#secondary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 425

def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end

#secondary_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 429

def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end

#slave_ok? ⇒ Boolean

Returns true if it’s okay to read from a secondary node.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    true if the read preference is anything other than :primary.



# File 'lib/mongo/mongo_replica_set_client.rb', line 319

def slave_ok?
  @read != :primary
end
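In other words, every read preference except :primary allows reads from a secondary. A standalone sketch that tabulates the rule for the five documented modes (slave_ok_for is a hypothetical free-function version of the method above):

```ruby
# Hypothetical free-function version of #slave_ok?.
def slave_ok_for(read_mode)
  read_mode != :primary
end

modes = [:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest]
modes.map { |mode| [mode, slave_ok_for(mode)] }
# => [[:primary, false], [:primary_preferred, true], [:secondary, true],
#     [:secondary_preferred, true], [:nearest, true]]
```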

#tag_map ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 433

def tag_map
  local_manager ? local_manager.tag_map : {}
end

#unpin_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 384

def unpin_pool(pool)
  thread_local[:pinned_pools].delete @manager.object_id if @manager
end

#valid_opts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 174

def valid_opts
  super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
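The expression in #valid_opts relies on + and - having equal precedence and associating left to right for arrays, so it reads as (super + REPL_SET_OPTS) - CLIENT_ONLY_OPTS. The sketch below uses placeholder lists to show the effect; the actual contents of the inherited option lists are not reproduced here, and allowed_opts and :client_only_example are hypothetical names.

```ruby
# Hypothetical helper showing the same array arithmetic as #valid_opts.
def allowed_opts(inherited, replica_set_opts, client_only_opts)
  # Evaluated as (inherited + replica_set_opts) - client_only_opts.
  inherited + replica_set_opts - client_only_opts
end

inherited   = [:logger, :ssl, :client_only_example]  # placeholder values
replica_set = [:refresh_mode, :refresh_interval]
client_only = [:client_only_example]

allowed_opts(inherited, replica_set, client_only)
# => [:logger, :ssl, :refresh_mode, :refresh_interval]
```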