Module: Mongo::Networking

Included in:
MongoClient
Defined in:
lib/mongo/networking.rb

Constant Summary collapse

STANDARD_HEADER_SIZE =
16
RESPONSE_HEADER_SIZE =
20
@@current_request_id =

Counter for generating unique request ids.

0

Instance Method Summary collapse

Instance Method Details

#receive_message(operation, message, log_message = nil, socket = nil, command = false, read = :primary, exhaust = false) ⇒ Array

Sends a message to the database and waits for the response.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/mongo/networking.rb', line 110

def receive_message(operation, message, log_message=nil, socket=nil, command=false,
                    read=:primary, exhaust=false)
  request_id = add_message_headers(message, operation)
  packed_message = message.to_s

  result = ''

  begin
    send_message_on_socket(packed_message, socket)
    result = receive(socket, request_id, exhaust)
  rescue ConnectionFailure => ex
    socket.close
    checkin(socket)
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  rescue Exception => ex
    if defined?(IRB)
      close if ex.class == IRB::Abort
    end
    raise ex
  end
  result
end

#send_message(operation, message, opts = {}) ⇒ Integer

Send a message to MongoDB, adding the necessary headers.

Options Hash (opts):

  • :connection (Symbol) — default: :writer

    The connection to which this message should be sent. Valid options are :writer and :reader.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/mongo/networking.rb', line 19

def send_message(operation, message, opts={})
  if opts.is_a?(String)
    warn "MongoClient#send_message no longer takes a string log message. " +
      "Logging is now handled within the Collection and Cursor classes."
    opts = {}
  end

  add_message_headers(message, operation)
  packed_message = message.to_s

  sock = nil
  pool = opts.fetch(:pool, nil)
  begin
    if pool
      #puts "send_message pool.port:#{pool.port}"
      sock = pool.checkout
    else
      sock ||= checkout_writer
    end
    send_message_on_socket(packed_message, sock)
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  ensure
    if sock
      sock.checkin
    end
  end
end

#send_message_with_gle(operation, message, db_name, log_message = nil, write_concern = false) ⇒ Hash

Sends a message to the database, waits for a response, and raises an exception if the operation has failed.



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/mongo/networking.rb', line 61

def send_message_with_gle(operation, message, db_name, log_message=nil, write_concern=false)
  docs = num_received = cursor_id = ''
  add_message_headers(message, operation)

  last_error_message = build_get_last_error_message(db_name, write_concern)
  last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

  packed_message = message.append!(last_error_message).to_s
  sock = nil
  begin
    sock = checkout_writer
    send_message_on_socket(packed_message, sock)
    docs, num_received, cursor_id = receive(sock, last_error_id)
    checkin(sock)
  rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
    checkin(sock)
    raise ex
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    close
    raise ex
  end

  if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
    if error.include?("not master")
      close
      raise ConnectionFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
    else
      error = "wtimeout" if error == "timeout"
      raise OperationFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
    end
  end

  docs[0]
end