Module: Mongo::Networking

Included in:
MongoClient
Defined in:
lib/mongo/networking.rb

Constant Summary collapse

STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20
@@current_request_id =

Counter for generating unique request ids.

0

Instance Method Summary collapse

Instance Method Details

#receive_message(operation, message, log_message = nil, socket = nil, command = false, read = :primary, exhaust = false) ⇒ Array

Sends a message to the database and waits for the response.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • log_message (String) (defaults to: nil)

    this is currently a no-op and will be removed.

  • socket (Socket) (defaults to: nil)

    a socket to use in lieu of checking out a new one.

  • command (Boolean) (defaults to: false)

    (false) indicate whether this is a command. If this is a command, the message will be sent to the primary node.

  • command (Boolean) (defaults to: false)

    (false) indicate whether the cursor should be exhausted. Set this to true only when the OP_QUERY_EXHAUST flag is set.

Returns:

  • (Array)

    An array whose indexes include [0] documents returned, [1] number of document received, and [3] a cursor_id.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/mongo/networking.rb', line 108

def receive_message(operation, message, log_message=nil, socket=nil, command=false,
                    read=:primary, exhaust=false)
  request_id = add_message_headers(message, operation)
  packed_message = message.to_s

  result = ''

  begin
    send_message_on_socket(packed_message, socket)
    result = receive(socket, request_id, exhaust)
  rescue ConnectionFailure => ex
    checkin(socket)
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  rescue Exception => ex
    if defined?(IRB)
      close if ex.class == IRB::Abort
    end
    raise ex
  end
  result
end

#send_message(operation, message, opts = {}) ⇒ Integer

Send a message to MongoDB, adding the necessary headers.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :connection (Symbol) — default: :writer

    The connection to which this message should be sent. Valid options are :writer and :reader.

Returns:

  • (Integer)

    number of bytes sent



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/mongo/networking.rb', line 19

def send_message(operation, message, opts={})
  if opts.is_a?(String)
    warn "MongoClient#send_message no longer takes a string log message. " +
      "Logging is now handled within the Collection and Cursor classes."
    opts = {}
  end

  add_message_headers(message, operation)
  packed_message = message.to_s

  sock = opts.fetch(:socket, nil)
  begin
    if operation == Mongo::Constants::OP_KILL_CURSORS && @read != :primary
      sock ||= checkout_reader
    else
      sock ||= checkout_writer
    end
    send_message_on_socket(packed_message, sock)
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  ensure
    if sock
      sock.pool.checkin(sock)
    end
  end
end

#send_message_with_gle(operation, message, db_name, log_message = nil, write_concern = false) ⇒ Hash

Sends a message to the database, waits for a response, and raises an exception if the operation has failed.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • db_name (String)

    the name of the database. used on call to get_last_error.

  • last_error_params (Hash)

    parameters to be sent to getLastError. See DB#error for available options.

Returns:

  • (Hash)

    The document returned by the call to getlasterror.

See Also:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/mongo/networking.rb', line 59

def send_message_with_gle(operation, message, db_name, log_message=nil, write_concern=false)
  docs = num_received = cursor_id = ''
  add_message_headers(message, operation)

  last_error_message = build_get_last_error_message(db_name, write_concern)
  last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

  packed_message = message.append!(last_error_message).to_s
  sock = nil
  begin
    sock = checkout_writer
    send_message_on_socket(packed_message, sock)
    docs, num_received, cursor_id = receive(sock, last_error_id)
    checkin(sock)
  rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
    checkin(sock)
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  end

  if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
    if error.include?("not master")
      close
      raise ConnectionFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
    else
      error = "wtimeout" if error == "timeout"
      raise OperationFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
    end
  end

  docs[0]
end