Class: Mongo::MongoReplicaSetClient

Inherits:
MongoClient
Includes:
ReadPreference, ThreadLocalVariableManager
Defined in:
lib/mongo/mongo_replica_set_client.rb

Overview

Instantiates and manages connections to a MongoDB replica set.

Direct Known Subclasses

MongoShardedClient, ReplSetConnection

Constant Summary

REPL_SET_OPTS =
[
  :refresh_mode,
  :refresh_interval,
  :read_secondary,
  :rs_name,
  :name
]

Constants included from ReadPreference

ReadPreference::MONGOS_MODES, ReadPreference::READ_PREFERENCES

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #read, #size, #socket_class, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary

Methods included from ThreadLocalVariableManager

#thread_local

Methods included from ReadPreference

mongos, #read_pool, #read_preference, #select_near_pool, #select_pool, #select_secondary_pool, validate

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #clear_auths, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host_port, #lock!, #locked?, #mongos?, multi, #parse_init, #ping, #read_pool, #remove_auth, #server_info, #server_version, #unlock!

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(seeds = ENV["MONGODB_URI"], opts = {}) ⇒ MongoReplicaSetClient

Create a connection to a MongoDB replica set.

If no args are provided, it will check ENV["MONGODB_URI"].

Once connected to a replica set, you can find out which nodes are primary, secondary, and arbiters with the corresponding accessors: MongoReplicaSetClient#primary, MongoReplicaSetClient#secondaries, and MongoReplicaSetClient#arbiters. This is useful if your application needs to connect manually to nodes other than the primary.

Examples:

Connect to a replica set and provide two seed nodes.

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'])

Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named ‘prod’:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :name => 'prod')

Connect to a replica set providing two seed nodes and allowing reads from a secondary node:

MongoReplicaSetClient.new(['localhost:30000', 'localhost:30001'], :read => :secondary)

Note:

The number of seed nodes does not have to equal the number of replica set members. Seed nodes exist so that the driver can find at least one replica set member even if a member is down.

Parameters:

  • seeds (Array<String>, Array<Array(String, Integer)>) (defaults to: ENV["MONGODB_URI"])

Options Hash (opts):

  • :w (String, Integer, Symbol) — default: 1

    Set the default number of nodes that must acknowledge a write.

  • :j (Boolean) — default: false

    Set journal acknowledgement

  • :wtimeout (Integer) — default: nil

    Set acknowledgement timeout

  • :fsync (Boolean) — default: false

    Set fsync acknowledgement.

    Notes about write concern options:

    Write concern options are propagated to objects instantiated from this MongoReplicaSetClient.
    These defaults can be overridden upon instantiation of any object by explicitly setting an options hash
    on initialization.
    
  • :read (:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest) — default: :primary

    A “read preference” determines the candidate replica set members to which a query or command can be sent.

    :primary
    • Read from primary only.

    • Cannot be combined with tags.

    :primary_preferred
    • Read from primary if available, otherwise read from a secondary.

    :secondary
    • Read from secondary if available.

    :secondary_preferred
    • Read from a secondary if available, otherwise read from the primary.

    :nearest
    • Read from any member.

  • :tag_sets (Array<Hash{ String, Symbol => Tag Value }>) — default: []

    Read from replica-set members with these tags.

  • :secondary_acceptable_latency_ms (Integer) — default: 15

    The maximum difference in latency, in milliseconds, between a member and the nearest available member for that member to be considered “near”.

  • :logger (Logger) — default: nil

    Logger instance to receive driver operation log.

  • :pool_size (Integer) — default: 1

    The maximum number of socket connections allowed per connection pool. Note: this setting is relevant only for multi-threaded applications.

  • :pool_timeout (Float) — default: 5.0

    When all of the connections in a pool are checked out, this is the number of seconds to wait for a connection to be released before raising an exception. Note: this setting is relevant only for multi-threaded applications.

  • :op_timeout (Float) — default: nil

    The number of seconds to wait for a read operation to time out.

  • :connect_timeout (Float) — default: 30

    The number of seconds to wait before timing out a connection attempt.

  • :ssl (Boolean) — default: false

    If true, create the connection to the server using SSL.

  • :refresh_mode (false, :sync) — default: false

    Set this to :sync to periodically update the state of the connection every :refresh_interval seconds. Replica set connection failures will always trigger a complete refresh. This option is useful when you want to add new nodes or remove replica set nodes not currently in use by the driver.

  • :refresh_interval (Integer) — default: 90

    If :refresh_mode is enabled, this is the number of seconds between calls to check the replica set’s state.
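
The read preference modes and the latency window described above can be modeled in a few lines. This is a simplified, standalone sketch and not the driver's own selection code: the `Member` struct, `candidates_for`, and `near_members` are hypothetical names, and ping times are supplied by hand instead of being measured.

```ruby
# Hypothetical stand-in for a replica set member; not a driver class.
Member = Struct.new(:host, :role, :ping_ms)

# Return the members a read may be sent to for a given :read mode,
# following the option descriptions above.
def candidates_for(mode, members)
  primary     = members.select { |m| m.role == :primary }
  secondaries = members.select { |m| m.role == :secondary }

  case mode
  when :primary             then primary
  when :primary_preferred   then primary.empty? ? secondaries : primary
  when :secondary           then secondaries
  when :secondary_preferred then secondaries.empty? ? primary : secondaries
  when :nearest             then primary + secondaries
  else raise ArgumentError, "unknown read mode: #{mode.inspect}"
  end
end

# Keep only members whose ping time is within latency_ms of the fastest one.
def near_members(members, latency_ms = 15)
  return [] if members.empty?
  fastest = members.map(&:ping_ms).min
  members.select { |m| m.ping_ms - fastest <= latency_ms }
end

members = [
  Member.new('localhost:30000', :primary,   12),
  Member.new('localhost:30001', :secondary,  4),
  Member.new('localhost:30002', :secondary, 40)
]

readable = candidates_for(:secondary_preferred, members)
near     = near_members(readable, 15)
puts near.map(&:host).inspect
```

With these sample ping times, only the secondary at 4 ms survives the 15 ms window; the one at 40 ms is 36 ms slower than the fastest candidate and is filtered out.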

Raises:

  • (MongoArgumentError)

See Also:



# File 'lib/mongo/mongo_replica_set_client.rb', line 95

def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  nodes = args.shift || []

  raise MongoArgumentError, "Too many arguments" unless args.empty?

  # This is temporary until support for the old format is dropped
  @seeds = nodes.collect do |node|
    if node.is_a?(Array)
      warn "Initiating a MongoReplicaSetClient with seeds passed as individual [host, port] array arguments is deprecated."
      warn "Please specify hosts as an array of 'host:port' strings; the old format will be removed in v2.0"
      node
    elsif node.is_a?(String)
      host, port = node.split(":")
      [ host, port.to_i ]
    else
      raise MongoArgumentError, "Bad seed format!"
    end
  end

  if @seeds.empty? && ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError,
        "ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    @seeds = parser.nodes
  end

  if @seeds.length.zero?
    raise MongoArgumentError, "A MongoReplicaSetClient requires at least one seed node."
  end

  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  @mongos = false

  check_opts(opts)
  setup(opts.dup)
end
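
The seed handling in the constructor can be tried in isolation. The following is a minimal sketch that assumes nothing beyond plain Ruby: `normalize_seeds` is a hypothetical helper, and the `MongoArgumentError` class defined here is only a stand-in so the snippet runs without the driver.

```ruby
# Stand-in for the driver's MongoArgumentError so the sketch runs alone.
class MongoArgumentError < ArgumentError; end

# Convert each seed into a [host, port] pair, as the constructor does.
def normalize_seeds(nodes)
  nodes.map do |node|
    case node
    when Array
      node                       # deprecated [host, port] form, passed through
    when String
      host, port = node.split(':')
      [host, port.to_i]
    else
      raise MongoArgumentError, 'Bad seed format!'
    end
  end
end

seeds = normalize_seeds(['localhost:30000', ['localhost', 30001]])
puts seeds.inspect
```

Note that a string without a `:port` suffix produces port 0 in this sketch, because `nil.to_i` is 0 in Ruby; supplying the port explicitly avoids that.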

Instance Attribute Details

#manager ⇒ Object (readonly)

Returns the value of attribute manager.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def manager
  @manager
end

#refresh_interval ⇒ Object (readonly)

Returns the value of attribute refresh_interval.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def refresh_interval
  @refresh_interval
end

#refresh_mode ⇒ Object (readonly)

Returns the value of attribute refresh_mode.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def refresh_mode
  @refresh_mode
end

#refresh_version ⇒ Object (readonly)

Returns the value of attribute refresh_version.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def refresh_version
  @refresh_version
end

#replica_set_name ⇒ Object (readonly)

Returns the value of attribute replica_set_name.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def replica_set_name
  @replica_set_name
end

#seeds ⇒ Object (readonly)

Returns the value of attribute seeds.



# File 'lib/mongo/mongo_replica_set_client.rb', line 16

def seeds
  @seeds
end

Instance Method Details

#arbiters ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 395

def arbiters
  local_manager.arbiters.nil? ? [] : local_manager.arbiters
end

#authenticate_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 310

def authenticate_pools
  @manager.pools.each { |pool| pool.authenticate_existing }
end

#checkin(socket) ⇒ Object

Check in a socket used for reading.



# File 'lib/mongo/mongo_replica_set_client.rb', line 355

def checkin(socket)
  if socket && socket.pool
    socket.checkin
  end
  sync_refresh
end
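
`checkin` ends by calling `sync_refresh`, whose source is not shown in this section. Going by the `:refresh_mode` and `:refresh_interval` option descriptions, a time-gated refresh might look like the sketch below. The `RefreshGate` class and its methods are hypothetical and only model the timing check; they are an assumption, not the driver's implementation.

```ruby
# Hypothetical model of an interval-gated refresh; not a driver class.
class RefreshGate
  attr_reader :refresh_count

  def initialize(refresh_mode, refresh_interval, now = Time.now)
    @refresh_mode     = refresh_mode      # :sync or false
    @refresh_interval = refresh_interval  # seconds
    @last_refresh     = now
    @refresh_count    = 0
  end

  # Refresh only in :sync mode and only once the interval has elapsed.
  # Returns true when a refresh was performed.
  def sync_refresh(now = Time.now)
    return false unless @refresh_mode == :sync
    return false unless (now - @last_refresh) > @refresh_interval

    @last_refresh = now
    @refresh_count += 1
    true
  end
end

start = Time.at(0)
gate  = RefreshGate.new(:sync, 90, start)
gate.sync_refresh(start + 30)   # too early, nothing happens
gate.sync_refresh(start + 91)   # interval elapsed, refresh runs
puts gate.refresh_count
```

Passing the clock in as an argument keeps the sketch deterministic; a real client would read the current time itself.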

#checkout ⇒ Object

Generic socket checkout. Takes a block that returns a socket from a pool.



# File 'lib/mongo/mongo_replica_set_client.rb', line 320

def checkout
  ensure_manager

  connected? ? sync_refresh : connect

  begin
    socket = yield
  rescue => ex
    checkin(socket) if socket
    raise ex
  end

  if socket
    return socket
  else
    @connected = false
    raise ConnectionFailure.new("Could not checkout a socket.")
  end
end
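
The contract between `checkout` and its block can be illustrated without the driver: the block supplies a socket, and a `nil` result is turned into a `ConnectionFailure`. This is a minimal sketch under stated assumptions: `StubPool`, `socket_from`, and `checkout_with` are hypothetical names, and the `ConnectionFailure` class defined here is a local stand-in.

```ruby
# Stand-in for Mongo::ConnectionFailure so the sketch runs without the driver.
class ConnectionFailure < StandardError; end

# Hypothetical pool that may have nothing left to hand out.
class StubPool
  def initialize(sockets)
    @sockets = sockets
  end

  def checkout
    socket = @sockets.shift
    raise ConnectionFailure, 'pool exhausted' if socket.nil?
    socket
  end
end

# Mirrors get_socket_from_pool: a failed pool checkout becomes nil.
def socket_from(pool)
  pool.checkout if pool
rescue ConnectionFailure
  nil
end

# Mirrors the tail of #checkout: the block supplies the socket, nil is an error.
def checkout_with
  socket = yield
  raise ConnectionFailure, 'Could not checkout a socket.' unless socket
  socket
end

pool  = StubPool.new([:socket_a])
first = checkout_with { socket_from(pool) }
puts first.inspect

begin
  checkout_with { socket_from(pool) }   # the pool is now empty
rescue ConnectionFailure => e
  puts e.message
end
```

The sketch leaves out the connection and refresh steps that the real method performs before yielding.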

#checkout_reader(read_pref = {}) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 340

def checkout_reader(read_pref={})
  checkout do
    pool = read_pool(read_pref)
    get_socket_from_pool(pool)
  end
end

#checkout_writer ⇒ Object

Checkout a socket for writing (i.e., a primary node).



# File 'lib/mongo/mongo_replica_set_client.rb', line 348

def checkout_writer
  checkout do
    get_socket_from_pool(primary_pool)
  end
end

#close(opts = {}) ⇒ Object

Close the connection to the database.



# File 'lib/mongo/mongo_replica_set_client.rb', line 277

def close(opts={})
  if opts[:soft]
    @manager.close(:soft => true) if @manager
  else
    @manager.close if @manager
  end

  # Clear the reference to this object.
  thread_local[:managers].delete(self)
  unpin_pool

  @connected = false
end

#connect ⇒ Object

Initiate a connection to the replica set.

Raises:

  • (ConnectionFailure)



# File 'lib/mongo/mongo_replica_set_client.rb', line 166

def connect
  log(:info, "Connecting...")

  # Prevent recursive connection attempts from the same thread.
  # This is done rather than using a Monitor to prevent potentially recursing
  # infinitely while attempting to connect and continually failing. Instead, fail fast.
  raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting] == true

  @connect_mutex.synchronize do
    return if @connected
    begin
      thread_local[:locks][:connecting] = true
      if @manager
        @manager.refresh! @seeds
      else
        @manager = PoolManager.new(self, @seeds)
        thread_local[:managers][self] = @manager
        @manager.connect
      end
    ensure
      thread_local[:locks][:connecting] = false
    end
    @refresh_version += 1

    if @manager.pools.empty?
      close
      raise ConnectionFailure, "Failed to connect to any node."
    else
      @connected = true
    end
  end
end
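
The fail-fast guard described in the comments of `connect` can be reproduced in isolation. The sketch below is an illustration only: `GuardedConnector` is a hypothetical class, the flag is kept directly in `Thread.current` where the driver uses its `thread_local` helper, and the block stands in for the work of contacting the replica set.

```ruby
# Stand-in for Mongo::ConnectionFailure so the sketch runs without the driver.
class ConnectionFailure < StandardError; end

# Hypothetical connector that models only the re-entrancy guard.
class GuardedConnector
  def initialize
    @connect_mutex = Mutex.new
    @connected     = false
  end

  def connected?
    @connected
  end

  def connect
    # Fail fast if this thread is already inside #connect.
    raise ConnectionFailure, 'Failed to get node data.' if Thread.current[:connecting]

    @connect_mutex.synchronize do
      return if @connected
      begin
        Thread.current[:connecting] = true
        yield self
      ensure
        # Always clear the flag, even when the block raises.
        Thread.current[:connecting] = false
      end
      @connected = true
    end
  end
end

connector = GuardedConnector.new
begin
  # The nested call happens on the same thread, so the guard rejects it.
  connector.connect { |c| c.connect { } }
rescue ConnectionFailure => e
  puts "rejected: #{e.message}"
end
puts connector.connected?
```

Because the check happens before the mutex is taken, the nested call raises instead of attempting to lock a mutex the thread already holds.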

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 237

def connected?
  @connected && !@manager.pools.empty?
end

#connecting? ⇒ Boolean

Deprecated.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 242

def connecting?
  warn "MongoReplicaSetClient#connecting? is deprecated and will be removed in v2.0."
  false
end

#ensure_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 362

def ensure_manager
  thread_local[:managers][self] = @manager
end

#get_socket_from_pool(pool) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 383

def get_socket_from_pool(pool)
  begin
    pool.checkout if pool
  rescue ConnectionFailure
    nil
  end
end

#hard_refresh! ⇒ Boolean

Force a hard refresh of this connection’s view of the replica set.

Returns:

  • (Boolean)

    true if hard refresh occurred. false is returned when unable to get the refresh lock.



# File 'lib/mongo/mongo_replica_set_client.rb', line 229

def hard_refresh!
  log(:info, "Initiating hard refresh...")
  @manager.refresh! @seeds

  @refresh_version += 1
  return true
end

#host ⇒ String

The replica set primary’s host name.

Returns:

  • (String)



# File 'lib/mongo/mongo_replica_set_client.rb', line 250

def host
  @manager.primary_pool.host
end

#hosts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 408

def hosts
  local_manager ? local_manager.hosts : []
end

#inspect ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 160

def inspect
  "<Mongo::MongoReplicaSetClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
    "@connected=#{@connected}>"
end

#local_manager ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 391

def local_manager
  thread_local[:managers][self]
end

#logout_pools(db) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 314

def logout_pools(db)
  @manager.pools.each { |pool| pool.logout_existing(db) }
end

#max_bson_size ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 428

def max_bson_size
  return local_manager.max_bson_size if local_manager
  DEFAULT_MAX_BSON_SIZE
end

#max_message_size ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 433

def max_message_size
  return local_manager.max_message_size if local_manager
  DEFAULT_MAX_MESSAGE_SIZE
end

#nodes ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 261

def nodes
  warn "MongoReplicaSetClient#nodes is DEPRECATED and will be removed in v2.0. " +
    "Please use MongoReplicaSetClient#seeds instead."
  @seeds
end

#pin_pool(pool, read_preference) ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 370

def pin_pool(pool, read_preference)
  if @manager
    thread_local[:pinned_pools][@manager.object_id] = {
      :pool => pool,
      :read_preference => read_preference
    }
  end
end

#pinned_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 366

def pinned_pool
  thread_local[:pinned_pools][@manager.object_id] if @manager
end

#port ⇒ Integer

The replica set primary’s port.

Returns:

  • (Integer)


# File 'lib/mongo/mongo_replica_set_client.rb', line 257

def port
  @manager.primary_pool.port
end

#primary ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 399

def primary
  local_manager ? local_manager.primary : nil
end

#primary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 412

def primary_pool
  local_manager ? local_manager.primary_pool : nil
end

#read_primary? ⇒ Boolean Also known as: primary?

Determine whether we’re reading from a primary node. If false, this connection connects to a secondary node and @read_secondaries is true.

Returns:

  • (Boolean)


# File 'lib/mongo/mongo_replica_set_client.rb', line 271

def read_primary?
  read_pool == primary_pool
end

#refresh(opts = {}) ⇒ Boolean

Determine whether a replica set refresh is required. If so, run a hard refresh. You can force a hard refresh by running MongoReplicaSetClient#hard_refresh!

Returns:

  • (Boolean)

    true unless a hard refresh is run and the refresh lock can’t be acquired.



# File 'lib/mongo/mongo_replica_set_client.rb', line 206

def refresh(opts={})
  if !connected?
    log(:info, "Trying to check replica set health but not " +
      "connected...")
    return hard_refresh!
  end

  log(:debug, "Checking replica set connection health...")
  @manager.check_connection_health

  if @manager.refresh_required?
    return hard_refresh!
  end

  return true
end

#reset_connection ⇒ Object

Deprecated.

If a ConnectionFailure is raised, this method will be called to close the connection and reset connection values.



# File 'lib/mongo/mongo_replica_set_client.rb', line 294

def reset_connection
  close
  warn "MongoReplicaSetClient#reset_connection is now deprecated and will be removed in v2.0. " +
    "Use MongoReplicaSetClient#close instead."
end

#secondaries ⇒ Object

Note: might want to freeze these after connecting.



# File 'lib/mongo/mongo_replica_set_client.rb', line 404

def secondaries
  local_manager ? local_manager.secondaries : []
end

#secondary_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 416

def secondary_pool
  local_manager ? local_manager.secondary_pool : nil
end

#secondary_pools ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 420

def secondary_pools
  local_manager ? local_manager.secondary_pools : []
end

#slave_ok? ⇒ Boolean

Returns true if it’s okay to read from a secondary node.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    true if the read preference is anything other than :primary.



# File 'lib/mongo/mongo_replica_set_client.rb', line 306

def slave_ok?
  @read != :primary
end
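
To see how the predicate behaves for each read preference, the same comparison can be run over the modes listed for the `:read` option. This is a standalone sketch: `READ_MODES` and the top-level `slave_ok?` taking an argument are hypothetical, introduced only for illustration.

```ruby
# The read preference modes listed for the :read option.
READ_MODES = [:primary, :primary_preferred, :secondary, :secondary_preferred, :nearest]

# Same comparison as #slave_ok?, taking the mode as an argument.
def slave_ok?(read)
  read != :primary
end

READ_MODES.each do |mode|
  puts "#{mode}: #{slave_ok?(mode)}"
end
```

Only `:primary` yields false; every other mode permits reads from a secondary.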

#tag_map ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 424

def tag_map
  local_manager ? local_manager.tag_map : {}
end

#unpin_pool ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 379

def unpin_pool
  thread_local[:pinned_pools].delete @manager.object_id if @manager
end

#valid_opts ⇒ Object



# File 'lib/mongo/mongo_replica_set_client.rb', line 156

def valid_opts
  super + REPL_SET_OPTS - CLIENT_ONLY_OPTS
end
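
`valid_opts` relies on Ruby's `Array#+` and `Array#-`, which are evaluated left to right: the replica set options are appended to the inherited list first, and the client-only options are removed afterwards. The sketch below pairs the `REPL_SET_OPTS` values from the constant summary with made-up stand-in arrays; `INHERITED_OPTS` and `CLIENT_ONLY` are hypothetical, and their contents are illustrative only.

```ruby
# Illustrative stand-ins; the real inherited and client-only lists are not shown here.
INHERITED_OPTS = [:logger, :pool_size, :example_client_only]
CLIENT_ONLY    = [:example_client_only]

# Values from the REPL_SET_OPTS constant summary.
REPL_SET_OPTS = [:refresh_mode, :refresh_interval, :read_secondary, :rs_name, :name]

# Evaluated as (INHERITED_OPTS + REPL_SET_OPTS) - CLIENT_ONLY.
valid = INHERITED_OPTS + REPL_SET_OPTS - CLIENT_ONLY
puts valid.inspect
```

Any option that appears in the client-only list is therefore rejected for a replica set client, even though the parent class accepts it.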