Class: Mongo::Connection

Inherits:
Object show all
Defined in:
lib/mongo/connection.rb

Overview

Instantiates and manages connections to MongoDB.

Constant Summary collapse

DEFAULT_PORT =
27017
STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20
@@current_request_id =

Counter for generating unique request ids.

0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pair_or_host = nil, port = nil, options = {}) ⇒ Connection

Create a connection to MongoDB. Specify either one or a pair of servers, along with a maximum connection pool size and timeout.

If connecting to just one server, you may specify whether connection to slave is permitted. In all cases, the default host is “localhost” and the default port is 27017.

When specifying a pair, pair_or_host, is a hash with two keys: :left and :right. Each key maps to either

  • a server name, in which case port is 27017,

  • a port number, in which case the server is “localhost”, or

  • an array containing [server_name, port_number]

Note that there are a few issues when using connection pooling with Ruby 1.9 on Windows. These should be resolved in the next release.

Examples:

localhost, 27017

Connection.new

localhost, 27017

Connection.new("localhost")

localhost, 3000, max 5 connections, with max 5 seconds of wait time.

Connection.new("localhost", 3000, :pool_size => 5, :timeout => 5)

localhost, 3000, where this node may be a slave

Connection.new("localhost", 3000, :slave_ok => true)

A pair of servers. The driver will always talk to master.

# On connection errors, Mongo::ConnectionFailure will be raised.
Connection.new({:left  => ["db1.example.com", 27017],
               :right => ["db2.example.com", 27017]})

A pair of servers with connection pooling enabled. Note the nil param placeholder for port.

Connection.new({:left  => ["db1.example.com", 27017],
                :right => ["db2.example.com", 27017]}, nil,
                :pool_size => 20, :timeout => 5)

Parameters:

  • pair_or_host (String, Hash) (defaults to: nil)

    See explanation above.

  • port (Integer) (defaults to: nil)

    specify a port number here if only one host is being specified. Leave nil if specifying a pair of servers in pair_or_host.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :slave_ok (Boolean) — default: false

    Must be set to true when connecting to a single, slave node.

  • :logger (Logger, #debug) — default: nil

    Logger instance to receive driver operation log.

  • :auto_reconnect (Boolean)
  • :pool_size (Integer) — default: 1

    The maximum number of socket connections that can be opened to the database.

  • :timeout (Float) — default: 5.0

    When all of the connections to the pool are checked out, this is the number of seconds to wait for a new connection to be released before throwing an exception.

See Also:



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/mongo/connection.rb', line 88

def initialize(pair_or_host=nil, port=nil, options={})
  @nodes = format_pair(pair_or_host, port)

  # Host and port of current master.
  @host = @port = nil

  # Lock for request ids.
  @id_lock = Mutex.new

  # Pool size and timeout.
  @size      = options[:pool_size] || 1
  @timeout   = options[:timeout]   || 5.0

  # Mutex for synchronizing pool access
  @connection_mutex = Mutex.new
  @safe_mutex = Mutex.new

  # Condition variable for signal and wait
  @queue = ConditionVariable.new

  @sockets      = []
  @checked_out  = []

  if options[:auto_reconnect]
    warn(":auto_reconnect is deprecated. see http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby")
  end

  # slave_ok can be true only if one node is specified
  @slave_ok = options[:slave_ok] && @nodes.length == 1
  @logger   = options[:logger] || nil
  @options  = options

  should_connect = options[:connect].nil? ? true : options[:connect]
  connect_to_master if should_connect
end

Instance Attribute Details

#checked_outObject (readonly)

Returns the value of attribute checked_out.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def checked_out
  @checked_out
end

#hostObject (readonly)

Returns the value of attribute host.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def host
  @host
end

#loggerObject (readonly)

Returns the value of attribute logger.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def logger
  @logger
end

#nodesObject (readonly)

Returns the value of attribute nodes.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def nodes
  @nodes
end

#portObject (readonly)

Returns the value of attribute port.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def port
  @port
end

#sizeObject (readonly)

Returns the value of attribute size.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def size
  @size
end

#socketsObject (readonly)

Returns the value of attribute sockets.



33
34
35
# File 'lib/mongo/connection.rb', line 33

def sockets
  @sockets
end

Instance Method Details

#[](db_name) ⇒ Mongo::DB

Shortcut for returning a database. Use DB#db to accept options.

Parameters:

  • db_name (String)

    a valid database name.

Returns:



157
158
159
# File 'lib/mongo/connection.rb', line 157

def [](db_name)
  DB.new(db_name, self, :logger => @logger)
end

#closeObject

Close the connection to the database.



337
338
339
340
341
342
343
344
# File 'lib/mongo/connection.rb', line 337

def close
  @sockets.each do |sock|
    sock.close
  end
  @host = @port = nil
  @sockets.clear
  @checked_out.clear
end

#connect_to_masterObject

Create a new socket and attempt to connect to master. If successful, sets host and port to master and returns the socket.

Raises:



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# File 'lib/mongo/connection.rb', line 300

def connect_to_master
  close
  @host = @port = nil
  for node_pair in @nodes
    host, port = *node_pair
    begin
      socket = TCPSocket.new(host, port)
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

      # If we're connected to master, set the @host and @port
      result = self['admin'].command({:ismaster => 1}, false, false, socket)
      if result['ok'] == 1 && ((is_master = result['ismaster'] == 1) || @slave_ok)
        @host, @port = host, port
      end

      # Note: slave_ok can be true only when connecting to a single node.
      if @nodes.length == 1 && !is_master && !@slave_ok
        raise ConfigurationError, "Trying to connect directly to slave; " +
          "if this is what you want, specify :slave_ok => true."
      end

      break if is_master || @slave_ok
    rescue SocketError, SystemCallError, IOError => ex
      socket.close if socket
      false
    end
  end
  raise ConnectionFailure, "failed to connect to any given host:port" unless socket
end

#connected?Boolean

Are we connected to MongoDB? This is determined by checking whether host and port have values, since they’re set to nil on calls to #close.

Returns:

  • (Boolean)


332
333
334
# File 'lib/mongo/connection.rb', line 332

def connected?
  @host && @port
end

#copy_database(from, to, from_host = "localhost") ⇒ Object

Copy the database from on the local server to to on the specified host. host defaults to ‘localhost’ if no value is provided.

Parameters:

  • from (String)

    name of the database to copy from.

  • to (String)

    name of the database to copy to.

  • from_host (String) (defaults to: "localhost")

    host of the ‘from’ database.



174
175
176
177
178
179
180
181
# File 'lib/mongo/connection.rb', line 174

def copy_database(from, to, from_host="localhost")
  oh = OrderedHash.new
  oh[:copydb]   = 1
  oh[:fromhost] = from_host
  oh[:fromdb]   = from
  oh[:todb]     = to
  self["admin"].command(oh)
end

#database_infoHash

Return a hash with all database names and their respective sizes on disk.

Returns:

  • (Hash)


128
129
130
131
132
133
# File 'lib/mongo/connection.rb', line 128

def database_info
  doc = self['admin'].command(:listDatabases => 1)
  returning({}) do |info|
    doc['databases'].each { |db| info[db['name']] = db['sizeOnDisk'].to_i }
  end
end

#database_namesArray

Return an array of database names.

Returns:

  • (Array)


138
139
140
# File 'lib/mongo/connection.rb', line 138

def database_names
  database_info.keys
end

#db(db_name, options = {}) ⇒ Mongo::DB

Return a database with the given name. See DB#new for valid options hash parameters.

Parameters:

  • db_name (String)

    a valid database name.

Returns:



148
149
150
# File 'lib/mongo/connection.rb', line 148

def db(db_name, options={})
  DB.new(db_name, self, options.merge(:logger => @logger))
end

#drop_database(name) ⇒ Object

Drop a database.

Parameters:

  • name (String)

    name of an existing database.



164
165
166
# File 'lib/mongo/connection.rb', line 164

def drop_database(name)
  self[name].command(:dropDatabase => 1)
end

#get_request_idObject

Increment and return the next available request id.

return [Integer]



186
187
188
189
190
191
192
# File 'lib/mongo/connection.rb', line 186

def get_request_id
  request_id = ''
  @id_lock.synchronize do
    request_id = @@current_request_id += 1
  end
  request_id
end

#receive_message(operation, message, log_message = nil, socket = nil) ⇒ Array

Sends a message to the database and waits for the response.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (ByteBuffer)

    a message to send to the database.

  • log_message (String) (defaults to: nil)

    text version of message for logging.

  • socket (Socket) (defaults to: nil)

    a socket to use in lieu of checking out a new one.

Returns:

  • (Array)

    An array whose indexes include [0] documents returned, [1] number of document received, and [3] a cursor_id.



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/mongo/connection.rb', line 279

def receive_message(operation, message, log_message=nil, socket=nil)
  packed_message = add_message_headers(operation, message).to_s
  @logger.debug("  MONGODB #{log_message || message}") if @logger
  begin
    sock = socket || checkout

    result = ''
    @safe_mutex.synchronize do
      send_message_on_socket(packed_message, sock)
      result = receive(sock)
    end
  ensure
    checkin(sock)
  end
  result
end

#send_message(operation, message, log_message = nil) ⇒ True

Send a message to MongoDB, adding the necessary headers.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (ByteBuffer)

    a message to send to the database.

  • log_message (String) (defaults to: nil)

    text version of message for logging.

Returns:

  • (True)


226
227
228
229
230
231
232
233
234
235
# File 'lib/mongo/connection.rb', line 226

def send_message(operation, message, log_message=nil)
  @logger.debug("  MONGODB #{log_message || message}") if @logger
  begin
    packed_message = add_message_headers(operation, message).to_s
    socket = checkout
    send_message_on_socket(packed_message, socket)
  ensure
    checkin(socket)
  end
end

#send_message_with_safe_check(operation, message, db_name, log_message = nil) ⇒ Array

Sends a message to the database, waits for a response, and raises an exception if the operation has failed.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (ByteBuffer)

    a message to send to the database.

  • db_name (String)

    the name of the database. used on call to get_last_error.

  • log_message (String) (defaults to: nil)

    text version of message for logging.

Returns:

  • (Array)

    An array whose indexes include [0] documents returned, [1] number of document received, and [3] a cursor_id.



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/mongo/connection.rb', line 248

def send_message_with_safe_check(operation, message, db_name, log_message=nil)
  message_with_headers = add_message_headers(operation, message)
  message_with_check   = last_error_message(db_name)
  @logger.debug("  MONGODB #{log_message || message}") if @logger
  begin
    sock = checkout
    packed_message = message_with_headers.append!(message_with_check).to_s
    docs = num_received = cursor_id = ''
    @safe_mutex.synchronize do
      send_message_on_socket(packed_message, sock)
      docs, num_received, cursor_id = receive(sock)
    end
  ensure
    checkin(sock)
  end
  if num_received == 1 && error = docs[0]['err']
    raise Mongo::OperationFailure, error
  end
  [docs, num_received, cursor_id]
end

#server_infoHash

Get the build information for the current connection.

Returns:

  • (Hash)


197
198
199
# File 'lib/mongo/connection.rb', line 197

def server_info
  db("admin").command({:buildinfo => 1}, {:admin => true, :check_response => true})
end

#server_versionMongo::ServerVersion

Get the build version of the current server.

Returns:



205
206
207
# File 'lib/mongo/connection.rb', line 205

def server_version
  ServerVersion.new(server_info["version"])
end

#slave_ok?Boolean

Is it okay to connect to a slave?

Returns:

  • (Boolean)


212
213
214
# File 'lib/mongo/connection.rb', line 212

def slave_ok?
  @slave_ok
end