Module: Mongo::Networking

Included in:
MongoClient
Defined in:
lib/mongo/networking.rb

Constant Summary collapse

STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20
@@current_request_id =

Counter for generating unique request ids.

0

Instance Method Summary collapse

Instance Method Details

#receive_message(operation, message, log_message = nil, socket = nil, command = false, read = :primary, exhaust = false) ⇒ Array

Sends a message to the database and waits for the response.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • log_message (String) (defaults to: nil)

    this is currently a no-op and will be removed.

  • socket (Socket) (defaults to: nil)

    a socket to use in lieu of checking out a new one.

  • command (Boolean) (defaults to: false)

    (false) indicate whether this is a command. If this is a command, the message will be sent to the primary node.

  • exhaust (Boolean) (defaults to: false)

    (false) indicate whether the cursor should be exhausted. Set this to true only when the OP_QUERY_EXHAUST flag is set.

Returns:

  • (Array)

    An array whose elements are [0] the documents returned, [1] the number of documents received, and [2] a cursor_id.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/mongo/networking.rb', line 124

# Sends a message on the given socket and waits for the matching response.
#
# Headers are added to +message+ via +add_message_headers+, the packed bytes
# are written with +send_message_on_socket+, and the reply is read with
# +receive+ using the request id returned when the headers were added.
#
# @param operation [Integer] a MongoDB opcode.
# @param message [BSON::ByteBuffer] the message to send to the database.
# @param log_message [String] not referenced by this method.
# @param socket [Socket] the socket to send on and read the reply from.
# @param command [Boolean] not referenced by this method.
# @param read [Symbol] not referenced by this method.
# @param exhaust [Boolean] forwarded unchanged to +receive+.
#
# @return [Array] whatever +receive+ returns for this request.
#
# @raise [ConnectionFailure] re-raised after the socket (when one was
#   supplied) has been closed and checked in.
# @raise [SystemStackError, NoMemoryError, SystemCallError] re-raised after
#   +close+ has been called on the client.
def receive_message(operation, message, log_message=nil, socket=nil, command=false,
                    read=:primary, exhaust=false)
  request_id = add_message_headers(message, operation)
  packed_message = message.to_s

  begin
    send_message_on_socket(packed_message, socket)
    receive(socket, request_id, exhaust)
  rescue ConnectionFailure => ex
    # +socket+ defaults to nil; only clean up a socket that actually exists so
    # the original ConnectionFailure is not replaced by a NoMethodError.
    if socket
      socket.close
      checkin(socket)
    end
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  rescue Exception => ex
    # Everything is re-raised; the only extra work is closing the client when
    # the exception is an IRB abort (and IRB is loaded at all).
    close if defined?(IRB) && ex.class == IRB::Abort
    raise ex
  end
end

#send_message(operation, message, opts = {}) ⇒ Integer

Send a message to MongoDB, adding the necessary headers.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :connection (Symbol) — default: :writer

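    The connection to which this message should be sent. Valid options are :writer and :reader. (Note: the implementation shown below does not read :connection; it checks out a socket from opts[:pool] when that key is given and from the writer otherwise.)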

Returns:

  • (Integer)

    number of bytes sent



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/mongo/networking.rb', line 33

# Adds the necessary headers to +message+ and writes it to a socket without
# waiting for a reply.
#
# The socket is checked out from +opts[:pool]+ when that key holds a pool,
# and from +checkout_writer+ otherwise. Whatever socket was obtained is
# checked back in before the method returns or raises.
#
# @param operation [Integer] a MongoDB opcode.
# @param message [BSON::ByteBuffer] the message to send to the database.
# @param opts [Hash] options; a String is still accepted for backward
#   compatibility but only triggers a warning and is then ignored.
# @option opts [Object] :pool a pool responding to +checkout+.
#
# @return the value returned by +send_message_on_socket+.
#
# @raise [SystemStackError, NoMemoryError, SystemCallError] re-raised after
#   +close+ has been called on the client.
def send_message(operation, message, opts={})
  # Legacy call style: the third argument used to be a log string.
  if opts.is_a?(String)
    warn "MongoClient#send_message no longer takes a string log message. " \
      "Logging is now handled within the Collection and Cursor classes."
    opts = {}
  end

  add_message_headers(message, operation)
  payload = message.to_s

  pool = opts.fetch(:pool, nil)
  sock = nil
  begin
    sock = pool ? pool.checkout : checkout_writer
    send_message_on_socket(payload, sock)
  rescue SystemStackError, NoMemoryError, SystemCallError => error
    close
    raise error
  ensure
    # sock stays nil when the checkout itself raised.
    sock.checkin if sock
  end
end

#send_message_with_gle(operation, message, db_name, log_message = nil, write_concern = false) ⇒ Hash

Sends a message to the database, waits for a response, and raises an exception if the operation has failed.

Parameters:

  • operation (Integer)

    a MongoDB opcode.

  • message (BSON::ByteBuffer)

    a message to send to the database.

  • db_name (String)

    the name of the database. used on call to get_last_error.

  • write_concern (Hash) (defaults to: false)

    parameters to be sent to getLastError. See DB#error for available options.

Returns:

  • (Hash)

    The document returned by the call to getlasterror.

See Also:



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/mongo/networking.rb', line 75

# Sends +message+ followed by a getlasterror query on the writer socket,
# reads the getlasterror reply, and raises if that reply reports an error.
#
# @param operation [Integer] a MongoDB opcode.
# @param message [BSON::ByteBuffer] the message to send; the getlasterror
#   message is appended to it in place via +append!+.
# @param db_name [String] passed to +build_get_last_error_message+.
# @param log_message [String] not referenced by this method.
# @param write_concern passed to +build_get_last_error_message+.
#
# @return [Hash] the first document of the getlasterror reply.
#
# @raise [ConnectionFailure] when the reported error text contains
#   "not master" (the client is closed first), or re-raised from the
#   send/receive step after the socket is checked in.
# @raise [OperationFailure] for any other reported error; "timeout" is
#   reported as "wtimeout".
# @raise [OperationTimeout] re-raised after the socket is checked in.
# @raise [SystemStackError, NoMemoryError, SystemCallError] re-raised after
#   +close+ has been called on the client.
def send_message_with_gle(operation, message, db_name, log_message=nil, write_concern=false)
  add_message_headers(message, operation)

  gle_message = build_get_last_error_message(db_name, write_concern)
  gle_request_id = add_message_headers(gle_message, Mongo::Constants::OP_QUERY)
  payload = message.append!(gle_message).to_s

  sock = nil
  begin
    sock = checkout_writer
    send_message_on_socket(payload, sock)
    # Only the reply to the getlasterror request is read.
    docs, num_received, _cursor_id = receive(sock, gle_request_id)
    checkin(sock)
  rescue ConnectionFailure, OperationFailure, OperationTimeout => failure
    checkin(sock)
    raise failure
  rescue SystemStackError, NoMemoryError, SystemCallError => failure
    close
    raise failure
  end

  reply = docs[0]
  # An error is only considered when exactly one document came back.
  error = num_received == 1 && (reply['err'] || reply['errmsg'])
  return reply unless error

  code = reply['code']
  if error.include?("not master")
    close
    raise ConnectionFailure.new(code.to_s + ': ' + error, code, reply)
  end

  error = "wtimeout" if error == "timeout"
  raise OperationFailure.new(code.to_s + ': ' + error, code, reply)
end