Class: Mongo::MongoReplicaSetClient

Inherits:
MongoClient
Includes:
ReadPreference, ThreadLocalVariableManager
Defined in:
lib/mongo/mongo_replica_set_client.rb

Overview

Instantiates and manages connections to a MongoDB replica set.

Direct Known Subclasses

MongoShardedClient, ReplSetConnection

Constant Summary

REPL_SET_OPTS =
[
  :refresh_mode,
  :refresh_interval,
  :read_secondary,
  :rs_name,
  :name
]

Constants included from ReadPreference

ReadPreference::MONGOS_MODES, ReadPreference::READ_PREFERENCES

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::SSL_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #read, #size, #socket_class, #socket_opts, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary

Methods included from ThreadLocalVariableManager

#thread_local

Methods included from ReadPreference

mongos, #read_pool, #read_preference, #select_near_pool, #select_pool, #select_secondary_pool, validate

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #check_is_master, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host_port, #lock!, #locked?, #mongos?, multi, #parse_init, #ping, #read_pool, #remove_auth, #server_info, #server_version, #unlock!

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient

Create a connection to a MongoDB replica set.

If no args are provided, it will check ENV["MONGODB_URI"].

Once connected to a replica set, you can find out which nodes are the primary, secondaries, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

Examples:

Connect to a replica set and provide two seed nodes.

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])

Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)
Note:

the number of seed nodes does not have to be equal to the number of replica set members. The purpose of seed nodes is to permit the driver to find at least one replica set member even if a member is down.
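
The seed format can be illustrated with a small, self-contained sketch. The helper name parse_seed and the fallback port of 27017 are assumptions made for this example; this is not the driver's own normalization code.

```ruby
# Hypothetical helper, not part of the driver: split a 'host:port' seed
# string into a [host, port] pair, assuming port 27017 when none is given.
ASSUMED_DEFAULT_PORT = 27017

def parse_seed(seed)
  host, port = seed.split(':', 2)
  raise ArgumentError, "Bad seed format: #{seed.inspect}" if host.nil? || host.empty?
  [host, port ? Integer(port) : ASSUMED_DEFAULT_PORT]
end

# Two explicit seeds plus one that relies on the assumed default port.
seeds = ['localhost:30000', 'localhost:30001', 'db.example.com']
pairs = seeds.map { |seed| parse_seed(seed) }
```

Listing more than one seed only helps the driver find a live member; it does not need to match the full membership of the set.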

Parameters:

  • seeds (Array<String>, Array<Array(String, Integer)>) (defaults to: ENV["MONGODB_URI"])

Options Hash (opts):

  • :w (String, Integer, Symbol) — default: 1

    Set default number of nodes to which a write should be acknowledged

  • :j (Boolean) — default: false

    Set journal acknowledgement

  • :wtimeout (Integer) — default: nil

    Set acknowledgement timeout

  • :fsync (Boolean) — default: false

    Set fsync acknowledgement.

    Notes about write concern options:

    Write concern options are propagated to objects instantiated from this MongoReplicaSetClient.
    These defaults can be overridden upon instantiation of any object by explicitly setting an options hash
    on initialization.
    
  • :read (:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest) — default: :primary

    A “read preference” determines the candidate replica set members to which a query or command can be sent.

    :primary
    • Read from primary only.

    • Cannot be combined with tags.

    :primary_preferred
    • Read from primary if available, otherwise read from a secondary.

    :secondary
    • Read from secondary if available.

    :secondary_preferred
    • Read from a secondary if available, otherwise read from the primary.

    :nearest
    • Read from any member.

  • :tag_sets (Array<Hash{ String, Symbol => Tag Value }>) — default: []

    Read from replica-set members with these tags.

  • :secondary_acceptable_latency_ms (Integer) — default: 15

    The acceptable latency, in milliseconds, relative to the nearest available member for a member to be considered “near”.

  • :logger (Logger) — default: nil

    Logger instance to receive driver operation log.

  • :pool_size (Integer) — default: 1

    The maximum number of socket connections allowed per connection pool. Note: this setting is relevant only for multi-threaded applications.

  • :pool_timeout (Float) — default: 5.0

    When all of the connections in a pool are checked out, this is the number of seconds to wait for a connection to be released before throwing an exception. Note: this setting is relevant only for multi-threaded applications.

  • :op_timeout (Float) — default: nil

    The number of seconds to wait for a read operation to time out.

  • :connect_timeout (Float) — default: 30

    The number of seconds to wait before timing out a connection attempt.

  • :ssl (Boolean) — default: false

    If true, create the connection to the server using SSL.

  • :ssl_cert (String) — default: nil

    The certificate file used to identify the local connection against MongoDB.

  • :ssl_key (String) — default: nil

    The private keyfile used to identify the local connection against MongoDB. If the key is included in the :ssl_cert file, then only :ssl_cert is needed.

  • :ssl_verify (Boolean) — default: nil

    Specifies whether or not peer certificate validation should occur.

  • :ssl_ca_cert (String) — default: nil

    The ca_certs file contains a set of concatenated “certification authority” certificates, which are used to validate certificates passed from the other end of the connection. Required for :ssl_verify.

  • :refresh_mode (false, :sync) — default: false

    Set this to :sync to periodically update the state of the connection every :refresh_interval seconds. Replica set connection failures will always trigger a complete refresh. This option is useful when you want to add new nodes or remove replica set nodes not currently in use by the driver.

  • :refresh_interval (Integer) — default: 90

    If :refresh_mode is enabled, this is the number of seconds between calls to check the replica set’s state.
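
To make the read preference modes and the latency window more concrete, here is a simplified model of candidate selection. Member, candidates_for, and the ping_ms field are hypothetical names; tag sets are ignored, and the driver's actual selection logic may differ in detail.

```ruby
# Illustrative model only; these names are not part of the driver.
Member = Struct.new(:host, :role, :ping_ms)

def candidates_for(members, mode, acceptable_latency_ms = 15)
  primary     = members.select { |m| m.role == :primary }
  secondaries = members.select { |m| m.role == :secondary }

  pool = case mode
         when :primary             then primary
         when :primary_preferred   then primary.empty? ? secondaries : primary
         when :secondary           then secondaries
         when :secondary_preferred then secondaries.empty? ? primary : secondaries
         when :nearest             then primary + secondaries
         else raise ArgumentError, "Unknown read mode: #{mode.inspect}"
         end
  return pool if pool.empty?

  # Keep only members within the latency window of the fastest candidate.
  fastest = pool.map(&:ping_ms).min
  pool.select { |m| m.ping_ms - fastest <= acceptable_latency_ms }
end

members = [
  Member.new('a:27017', :primary, 40),
  Member.new('b:27017', :secondary, 5),
  Member.new('c:27017', :secondary, 30)
]

near_secondaries = candidates_for(members, :secondary).map(&:host)
wider_window     = candidates_for(members, :secondary, 30).map(&:host)
```

In this model, widening the latency window from 15 ms to 30 ms admits the slower secondary as a read candidate.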

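Raises:

  • (MongoArgumentError)

    if too many arguments are given, a seed is malformed, ENV["MONGODB_URI"] implies a direct connection, or no seed node is provided.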



# File 'lib/mongo/mongo_replica_set_client.rb', line 116

def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args.shift || []

  raise MongoArgumentError, "Too many arguments" unless args.empty?

  # This is temporary until support for the old format is dropped
  @seeds = nodes.collect do |node|
    if node.is_a?(Array)
      warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
      warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
      node
    elsif node.is_a?(String)
      Support.normalize_seeds(node)
    else
      raise MongoArgumentError, "Bad seed format!"
    end
  end

  if @seeds.empty? && ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError,
        "ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    @seeds = parser.nodes
  end

  if @seeds.length.zero?
    raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
  end

  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil

  # Lock for request ids.
  @id_lock = Mutex.new

  @connected = false

  @connect_mutex = Mutex.new

  @mongos = false

  check_opts(opts)
  setup(opts.dup)
end
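
When the seeds come from ENV["MONGODB_URI"], the constructor merges the explicit opts into the options parsed from the URI, so explicit values take precedence. A minimal sketch of that precedence using plain hashes (the option values are made up for illustration):

```ruby
# Options as they might be parsed from a URI (hypothetical values).
uri_options      = { :name => 'prod', :read => :secondary, :pool_size => 5 }
# Options passed explicitly to the constructor (hypothetical values).
explicit_options = { :read => :primary }

# Hash#merge! updates the receiver in place; on duplicate keys the
# argument's value wins, so the explicit :read replaces the URI's :read.
merged = uri_options.merge!(explicit_options)
```

Options that appear only in the URI, such as :name here, are kept unchanged.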

Instance Attribute Details

#manager ⇒ Object (readonly)

Returns the value of attribute manager.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def manager
  @manager
end

#refresh_interval ⇒ Object (readonly)

Returns the value of attribute refresh_interval.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def refresh_interval
  @refresh_interval
end

#refresh_mode ⇒ Object (readonly)

Returns the value of attribute refresh_mode.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def refresh_mode
  @refresh_mode
end

#refresh_version ⇒ Object (readonly)

Returns the value of attribute refresh_version.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def refresh_version
  @refresh_version
end

#replica_set_name ⇒ Object (readonly)

Returns the value of attribute replica_set_name.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def replica_set_name
  @replica_set_name
end

#seeds ⇒ Object (readonly)

Returns the value of attribute seeds.

# File 'lib/mongo/mongo_replica_set_client.rb', line 30

def seeds
  @seeds
end

Instance Method Details

#arbiters ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 413

def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end

#authenticate_pools ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 328

def authenticate_pools
  @manager.pools.each { |pool| pool.authenticate_existing }
end

#checkin(socket) ⇒ Object

Check in a socket used for reading.

# File 'lib/mongo/mongo_replica_set_client.rb', line 373

def checkin(socket)
  if socket && socket.pool
    socket.checkin
  end
  sync_refresh
end

#checkout ⇒ Object

Generic socket checkout. Takes a block that returns a socket from a pool.

# File 'lib/mongo/mongo_replica_set_client.rb', line 338

def checkout
  ensure_manager

  connected? ? sync_refresh : connect

  begin
    socket = yield
  rescue => ex
    checkin(socket) if socket
    raise ex
  end

  if socket
    return socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
end
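
The block-based checkout pattern can be sketched without the driver. CheckoutFailure, FakePool, and checkout_socket below are hypothetical stand-ins, and the sketch leaves out the connection refresh that the real method performs first.

```ruby
# Stand-in for the driver's ConnectionFailure (assumption for this sketch).
class CheckoutFailure < StandardError; end

# Simplified block-based checkout: the block picks a pool and returns a
# socket, or nil when none is available.
def checkout_socket
  socket = yield
  raise CheckoutFailure, "Could not checkout a socket." unless socket
  socket
end

# A toy pool that hands out its sockets one at a time.
FakePool = Struct.new(:sockets) do
  def checkout
    sockets.shift
  end
end

pool  = FakePool.new([:socket_a])
first = checkout_socket { pool.checkout }

# The pool is now empty, so the block returns nil and the checkout fails.
second_error = begin
  checkout_socket { pool.checkout }
rescue CheckoutFailure => e
  e.message
end
```

Passing the pool choice as a block is what lets checkout_reader and checkout_writer share the same surrounding logic.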

#checkout_reader(read_pref = {}) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 358

def checkout_reader(read_pref={})
  checkout do
    pool = read_pool(read_pref)
    get_socket_from_pool(pool)
  end
end

#checkout_writer ⇒ Object

Checkout a socket for writing (i.e., a primary node).

# File 'lib/mongo/mongo_replica_set_client.rb', line 366

def checkout_writer
  checkout do
    get_socket_from_pool(primary_pool)
  end
end

#close(opts = {}) ⇒ Object

Close the connection to the database.



# File 'lib/mongo/mongo_replica_set_client.rb', line 295

def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end

  # Clear the reference to this object.
  thread_local[:managers].delete(self)
  unpin_pool

  @connected = false
end

#connect(force = !connected?) ⇒ Object

Initiate a connection to the replica set.

Raises:

  • (ConnectionFailure)

    if a connection attempt is already in progress on the current thread, or if no node can be reached.

# File 'lib/mongo/mongo_replica_set_client.rb', line 181

def connect(force = !connected?)
  return unless force
  log(:info, "Connecting...")

  # Prevent recursive connection attempts from the same thread.
  # This is done rather than using a Monitor to prevent potentially recursing
  # infinitely while attempting to connect and continually failing. Instead, fail fast.
  raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting] == true

  current_version = @refresh_version
  @connect_mutex.synchronize do
    # don't try to connect if another thread has done so while we were waiting for the lock
    return unless current_version == @refresh_version
    begin
      thread_local[:locks][:connecting] = true
      if @manager
        ensure_manager
        @manager.refresh!(@seeds)
      else
        @manager = PoolManager.new(self, @seeds)
        ensure_manager
        @manager.connect
      end
    ensure
      thread_local[:locks][:connecting] = false
    end
    @refresh_version += 1

    if @manager.pools.empty?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    else
      @connected = true
    end
  end
end
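
The two guards in connect, a per-thread flag that fails fast on recursive attempts and a version check inside the mutex, can be sketched in isolation. The Connector class is hypothetical, and Thread.current stands in for the driver's thread_local store.

```ruby
# Hypothetical class modelling only the guards, not real connections.
class Connector
  attr_reader :refresh_version

  def initialize
    @refresh_version = 0
    @connect_mutex = Mutex.new
  end

  def connect
    # Fail fast instead of recursing when this thread is already connecting.
    raise "Failed to get node data." if Thread.current[:connecting]

    seen_version = @refresh_version
    @connect_mutex.synchronize do
      # Another thread finished connecting while we waited for the lock.
      return if seen_version != @refresh_version
      begin
        Thread.current[:connecting] = true
        yield self if block_given? # placeholder for the real connection work
      ensure
        Thread.current[:connecting] = false
      end
      @refresh_version += 1
    end
  end
end

client = Connector.new
client.connect

# A re-entrant attempt from the same thread is rejected immediately.
nested_error = begin
  client.connect { |c| c.connect }
rescue RuntimeError => e
  e.message
end
```

Because the nested call raises before touching the mutex, the failed attempt leaves refresh_version unchanged.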

#connected? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/mongo/mongo_replica_set_client.rb', line 255

def connected?
  @connected && !@manager.pools.empty?
end

#connecting? ⇒ Boolean

Deprecated.

Returns:

  • (Boolean)

# File 'lib/mongo/mongo_replica_set_client.rb', line 260

def connecting?
  warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
  false
end

#ensure_manager ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 380

def ensure_manager
  thread_local[:managers][self] = @manager
end

#get_socket_from_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 401

def get_socket_from_pool(pool)
  begin
    pool.checkout if pool
  rescue ConnectionFailure
    nil
  end
end

#hard_refresh! ⇒ Boolean

Force a hard refresh of this connection’s view of the replica set.

Returns:

  • (Boolean)

    true if a hard refresh occurred; false if the refresh lock could not be acquired.

# File 'lib/mongo/mongo_replica_set_client.rb', line 249

def hard_refresh!
  log(:info, "Initiating hard refresh...")
  connect(true)
  return true
end

#host ⇒ String

The replica set primary’s host name.

Returns:

  • (String)

# File 'lib/mongo/mongo_replica_set_client.rb', line 268

def host
  @manager.primary_pool.host
end

#hosts ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 426

def hosts
  local_manager ? local_manager.hosts : []
end

#inspect ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 175

def inspect
  "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end

#local_manager ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 409

def local_manager
  thread_local[:managers][self]
end

#logout_pools(db) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 332

def logout_pools(db)
  @manager.pools.each { |pool| pool.logout_existing(db) }
end

#max_bson_size ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 450

def max_bson_size
  return local_manager.max_bson_size if local_manager
  DEFAULT_MAX_BSON_SIZE
end

#max_message_size ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 455

def max_message_size
  return local_manager.max_message_size if local_manager
  max_bson_size * MESSAGE_SIZE_FACTOR
end

#nodes ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 279

def nodes
  warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use MongoReplicaSetClient#seeds instead."
  @seeds
end

#pin_pool(pool, read_preference) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 388

def pin_pool(pool, read_preference)
  if @manager
    thread_local[:pinned_pools][@manager.object_id] = {
      :pool => pool,
      :read_preference => read_preference
    }
  end
end
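
As a rough sketch of how pinning keeps state per thread, the example below stores pinned pools in a per-thread hash keyed by the manager's object_id. The helper names are hypothetical, and Thread.current is used here in place of the driver's thread_local helper.

```ruby
# Per-thread store of pinned pools (Thread.current is an assumption
# standing in for the driver's thread_local helper).
def pinned_pools
  Thread.current[:pinned_pools] ||= {}
end

# Remember which pool this thread should keep using for a given manager.
def pin(manager, pool, read_preference)
  pinned_pools[manager.object_id] = {
    :pool => pool,
    :read_preference => read_preference
  }
end

def pinned(manager)
  pinned_pools[manager.object_id]
end

manager = Object.new # placeholder for a pool manager
pin(manager, :secondary_pool_1, :secondary)

here            = pinned(manager)
in_other_thread = Thread.new { pinned(manager) }.value
```

The pin is visible only on the thread that created it, so a different thread sees no pinned pool for the same manager.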

#pinned_pool ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 384

def pinned_pool
  thread_local[:pinned_pools][@manager.object_id] if @manager
end

#pools ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 442

def pools
  local_manager ? local_manager.pools : []
end

#port ⇒ Integer

The replica set primary’s port.

Returns:

  • (Integer)

# File 'lib/mongo/mongo_replica_set_client.rb', line 275

def port
  @manager.primary_pool.port
end

#primary ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 417

def primary
  local_manager ? local_manager.primary : nil
end

#primary_pool ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 430

def primary_pool
  local_manager ? local_manager.primary_pool : nil
end

#read_primary? ⇒ Boolean Also known as: primary?

Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

Returns:

  • (Boolean)

# File 'lib/mongo/mongo_replica_set_client.rb', line 289

def read_primary?
  read_pool == primary_pool
end

#refresh(opts = {}) ⇒ Boolean

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running MongoReplicaSetClient#hard_refresh!

Returns:

  • (Boolean)

    true unless a hard refresh is run and the refresh lock can’t be acquired.



# File 'lib/mongo/mongo_replica_set_client.rb', line 225

def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  ensure_manager
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end

#reset_connection ⇒ Object

Deprecated.

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.

# File 'lib/mongo/mongo_replica_set_client.rb', line 312

def reset_connection
  close
  warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use MongoReplicaSetClient#close instead."
end

#secondaries ⇒ Object

Note: might want to freeze these after connecting.

# File 'lib/mongo/mongo_replica_set_client.rb', line 422

def secondaries
  local_manager ? local_manager.secondaries : []
end

#secondary_pool ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 434

def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end

#secondary_pools ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 438

def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end

#slave_ok? ⇒ Boolean

Returns true if it’s okay to read from a secondary node.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    true if the read preference is anything other than :primary.

# File 'lib/mongo/mongo_replica_set_client.rb', line 324

def slave_ok?
  @read != :primary
end

#tag_map ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 446

def tag_map
  local_manager ? local_manager.tag_map : {}
end

#unpin_pool ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 397

def unpin_pool
  thread_local[:pinned_pools].delete @manager.object_id if @manager
end

#valid_opts ⇒ Object

# File 'lib/mongo/mongo_replica_set_client.rb', line 171

def valid_opts
  super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
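
The expression in valid_opts relies on Array#+ and Array#-, which are evaluated left to right. A small sketch with made-up option lists (the actual contents of the inherited list and of CLIENT_ONLY_OPTS are not shown on this page):

```ruby
# Hypothetical option lists, chosen only to illustrate the arithmetic.
inherited_opts   = [:auths, :logger, :connect, :slave_ok]
repl_set_opts    = [:refresh_mode, :refresh_interval, :read_secondary, :rs_name, :name]
client_only_opts = [:slave_ok]

# Array#+ concatenates the two lists, then Array#- removes every element
# that also appears in client_only_opts.
valid = inherited_opts + repl_set_opts - client_only_opts
```

In this sketch the replica set options are added to the inherited ones, and any option reserved for the plain client is dropped from the result.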