Module: WireMongo

Defined in:
lib/mongo-proxy/wire.rb

Overview

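This is a set of functions for working with the MongoDB wire protocol. It has methods for converting between Ruby hashes and binary wire-protocol messages, including their embedded BSON documents. The MongoDB wire protocol is documented at https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/ (the opcodes handled here are described in its "Legacy Opcodes" section).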

Constant Summary collapse

# Size in bytes of the standard message header: four little-endian 32-bit
# integers (messageLength, requestID, responseTo, opCode).
HEADER_SIZE =
16
# Symbolic opcode names. Parsed and built message hashes carry these symbols
# in header[:opCode].
OP_REPLY =
:reply
# NOTE(review): OP_MSG is mapped in OPS, but receive/write have no branch for
# it (receive prints "could not parse message type", write returns nil).
OP_MSG =
:msg
OP_UPDATE =
:update
OP_INSERT =
:insert
OP_QUERY =
:query
OP_GET_MORE =
:get_more
OP_DELETE =
:delete
OP_KILL_CURSORS =
:kill_cursors
# Numeric wire opcode => symbolic name; used when decoding headers.
OPS =
{
  1    => OP_REPLY,
  1000 => OP_MSG,
  2001 => OP_UPDATE,
  2002 => OP_INSERT,
  2004 => OP_QUERY,
  2005 => OP_GET_MORE,
  2006 => OP_DELETE,
  2007 => OP_KILL_CURSORS
}
# Symbolic name => numeric wire opcode; used when encoding headers.
OPS_INVERTED =
OPS.invert
# OP_UPDATE flag bits, combined by build_update from its :upsert / :multi options.
FLAG_UPDATE_UPSERT =
1
FLAG_UPDATE_MULTIUPDATE =
(1 << 1)
# OP_DELETE flag bit 0.
# NOTE(review): the legacy wire protocol defines OP_DELETE bit 0 as
# SingleRemove (a zero flag word removes every match), so this name looks
# inverted - confirm before relying on it.
FLAG_DELETE_MULTI =
1
# Counter per "<db>.<collection>" + cursorID string. Used by hash to give each
# successive OP_GET_MORE on the same cursor a distinct, deterministic requestID.
@@hash_getmore_history =
{}

Class Method Summary collapse

Class Method Details

.build_delete(request_id, database_name, collection_name, selector, opt = []) ⇒ Object



437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/mongo-proxy/wire.rb', line 437

# Build an OP_DELETE message hash suitable for +write+.
#
# @param request_id [Integer] requestID placed in the message header.
# @param database_name [String]
# @param collection_name [String]
# @param selector [Hash] query selecting the documents to remove.
# @param opt [Array<Symbol>] options:
#   +:multi+  - remove every matching document (the protocol default);
#   +:single+ - remove only the first matching document.
# @return [Hash] message hash with :header, :database, :collection,
#   :selector and :flags.
def self.build_delete(request_id, database_name, collection_name, selector, opt = [])
  # In the legacy wire protocol, bit 0 of OP_DELETE's flags is SingleRemove,
  # and a zero flag word already removes all matching documents. :multi
  # therefore needs no bit. (It previously referenced an undefined
  # FLAGS_DELETE_MULTI constant and raised NameError; with the correct spelling
  # it would have requested a *single* removal.)
  single_remove_bit = 1
  flags = opt.include?(:single) ? single_remove_bit : 0

  {
    :header => {
      :opCode => OP_DELETE,
      :requestID => request_id,
      :responseTo => 0
    },
    :database => database_name,
    :collection => collection_name,
    :selector => selector,
    :flags => flags
  }
end

.build_full_collection(database, collection) ⇒ Object



180
181
182
# File 'lib/mongo-proxy/wire.rb', line 180

# Join a database and collection into the "<database>.<collection>" namespace
# string that the legacy opcodes use to address a collection.
#
# @param database [#to_s]
# @param collection [#to_s]
# @return [String]
def self.build_full_collection(database, collection)
  format('%s.%s', database, collection)
end

.build_get_more(request_id, response_to, database_name, collection_name, cursor_id, number_to_return = 0) ⇒ Object



396
397
398
399
400
401
402
403
404
405
406
407
408
# File 'lib/mongo-proxy/wire.rb', line 396

# Build an OP_GET_MORE message hash, suitable for +write+, that requests the
# next batch from an open cursor.
#
# @param request_id [Integer] requestID for the header.
# @param response_to [Integer] responseTo for the header.
# @param database_name [String]
# @param collection_name [String]
# @param cursor_id [Integer] cursor to continue reading from.
# @param number_to_return [Integer] numberToReturn value (defaults to 0).
# @return [Hash]
def self.build_get_more(request_id, response_to, database_name, collection_name, cursor_id, number_to_return = 0)
  header = {}
  header[:opCode] = OP_GET_MORE
  header[:requestID] = request_id
  header[:responseTo] = response_to

  message = { :header => header }
  message[:database] = database_name
  message[:collection] = collection_name
  message[:cursorID] = cursor_id
  message[:numberToReturn] = number_to_return
  message
end

.build_insert(request_id, database_name, collection_name, documents, flags = 0) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/mongo-proxy/wire.rb', line 299

# Build an OP_INSERT message hash suitable for +write+.
#
# @param request_id [Integer] requestID for the header.
# @param database_name [String]
# @param collection_name [String]
# @param documents [Hash, Array<Hash>] one document or a list of them; a
#   single Hash is wrapped in an Array.
# @param flags [Integer] raw OP_INSERT flag bits.
# @return [Hash]
def self.build_insert(request_id, database_name, collection_name, documents, flags = 0)
  batch = documents.is_a?(Hash) ? [documents] : documents
  header = { :requestID => request_id, :responseTo => 0, :opCode => OP_INSERT }

  {
    :flags => flags,
    :database => database_name,
    :collection => collection_name,
    :documents => batch,
    :header => header
  }
end

.build_kill_cursors(request_id, response_to, cursor_ids) ⇒ Object



480
481
482
483
484
485
486
487
488
489
# File 'lib/mongo-proxy/wire.rb', line 480

# Build an OP_KILL_CURSORS message hash suitable for +write+.
#
# @param request_id [Integer] requestID for the header.
# @param response_to [Integer] responseTo for the header.
# @param cursor_ids [Array<Integer>] cursors to close.
# @return [Hash]
def self.build_kill_cursors(request_id, response_to, cursor_ids)
  header = {
    :opCode => OP_KILL_CURSORS,
    :requestID => request_id,
    :responseTo => response_to
  }
  { :cursorIDs => cursor_ids, :header => header }
end

.build_query(request_id, database_name, collection_name, query = {}, fields = nil, num_to_return = 4294967295, number_to_skip = 0, flags = 0) ⇒ Object



348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/mongo-proxy/wire.rb', line 348

# Build an OP_QUERY message hash suitable for +write+.
#
# @param request_id [Integer] requestID for the header.
# @param database_name [String]
# @param collection_name [String]
# @param query [Hash] query document.
# @param fields [Hash, nil] optional returnFieldSelector document.
# @param num_to_return [Integer] numberToReturn (default 4294967295, i.e. all
#   32 bits set).
# @param number_to_skip [Integer] numberToSkip.
# @param flags [Integer] raw OP_QUERY flag bits.
# @return [Hash]
def self.build_query(request_id, database_name, collection_name,
    query = {}, fields = nil, num_to_return = 4294967295, number_to_skip = 0, flags = 0)
  header = { :opCode => OP_QUERY, :requestID => request_id, :responseTo => 0 }

  message = { :header => header, :database => database_name, :collection => collection_name }
  message[:query] = query
  message[:returnFieldSelector] = fields
  message[:numberToReturn] = num_to_return
  message[:flags] = flags
  message[:numberToSkip] = number_to_skip
  message
end

.build_reply(documents, request_id, response_to, response_flags = 0, cursor_id = 0, starting_from = 0) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/mongo-proxy/wire.rb', line 199

# Build an OP_REPLY message hash suitable for +write+.
#
# @param documents [Hash, Array<Hash>] reply documents; a single Hash is
#   wrapped in an Array. numberReturned is set to their count.
# @param request_id [Integer] requestID for the header.
# @param response_to [Integer] requestID of the request being answered.
# @param response_flags [Integer] raw responseFlags bits.
# @param cursor_id [Integer] open cursor id, or 0.
# @param starting_from [Integer] startingFrom offset.
# @return [Hash]
def self.build_reply(documents, request_id, response_to,
    response_flags = 0, cursor_id = 0, starting_from = 0)
  docs = documents.is_a?(Hash) ? [documents] : documents
  header = { :requestID => request_id, :responseTo => response_to, :opCode => OP_REPLY }

  {
    :responseFlags => response_flags,
    :startingFrom => starting_from,
    :numberReturned => docs.size,
    :cursorID => cursor_id,
    :documents => docs,
    :header => header
  }
end

.build_update(request_id, database_name, collection_name, selector, update, flags = []) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/mongo-proxy/wire.rb', line 252

# Build an OP_UPDATE message hash suitable for +write+.
#
# @param request_id [Integer] requestID for the header.
# @param database_name [String]
# @param collection_name [String]
# @param selector [Hash] query selecting the documents to update.
# @param update [Hash] update document.
# @param flags [Array<Symbol>] options: +:upsert+ sets FLAG_UPDATE_UPSERT,
#   +:multi+ sets FLAG_UPDATE_MULTIUPDATE.
# @return [Hash] message hash whose :flags is the combined integer bit vector.
def self.build_update(request_id, database_name, collection_name, selector, update, flags = [])
  option_bits = { :upsert => FLAG_UPDATE_UPSERT, :multi => FLAG_UPDATE_MULTIUPDATE }
  flag = option_bits.inject(0) do |acc, (option, bit)|
    flags.include?(option) ? (acc | bit) : acc
  end

  header = { :opCode => OP_UPDATE, :requestID => request_id, :responseTo => 0 }
  {
    :header => header,
    :database => database_name,
    :collection => collection_name,
    :selector => selector,
    :update => update,
    :flags => flag
  }
end

.hash(doc) ⇒ Object



498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
# File 'lib/mongo-proxy/wire.rb', line 498

# Compute a stable SHA-1 fingerprint of a message hash.
#
# Fields that differ between otherwise-identical messages are replaced with
# fixed placeholders before encoding with +write+:
# - a present requestID becomes 1234;
# - a present responseTo becomes 4321;
# - for OP_GET_MORE, cursorID is zeroed and requestID is replaced by a counter
#   per "namespace + cursorID" (kept in @@hash_getmore_history). Successive
#   getMores on one cursor therefore hash differently but deterministically.
#
# The placeholders are applied to a shallow copy, so the caller's +doc+ is
# never modified. The previous version mutated +doc+ in place and restored it
# only on success. An exception from +write+ left the placeholders behind, and
# an OP_GET_MORE without a requestID kept the counter value.
#
# @param doc [Hash] message hash in the format returned by +receive+.
# @return [String] hex SHA-1 digest of the normalized encoding.
def self.hash doc
  header = doc[:header].dup
  header[:requestID] = 1234 if header[:requestID]
  header[:responseTo] = 4321 if header[:responseTo]
  normalized = doc.merge(:header => header)

  if header[:opCode] == OP_GET_MORE
    key = build_full_collection(doc[:database], doc[:collection]) + doc[:cursorID].to_s
    @@hash_getmore_history[key] ||= 0
    header[:requestID] = @@hash_getmore_history[key]
    normalized[:cursorID] = 0
    @@hash_getmore_history[key] += 1
  end

  Digest::SHA1.hexdigest(write(normalized))
end

.min(a, b) ⇒ Object



171
172
173
# File 'lib/mongo-proxy/wire.rb', line 171

# Return the smaller of two comparable values; when neither is greater than
# the other, the first argument is returned.
#
# @param a [Comparable]
# @param b [Comparable]
# @return [Comparable]
def self.min(a, b)
  if a > b
    b
  else
    a
  end
end

.parse_full_collection(full_collection) ⇒ Object



175
176
177
178
# File 'lib/mongo-proxy/wire.rb', line 175

# Split a "<database>.<collection>" namespace at its first dot.
#
# Collection names may contain dots themselves, so everything after the first
# dot is the collection. String#partition keeps that remainder verbatim. The
# previous split('.')-and-rejoin dropped trailing empty fields, so "db.coll."
# became "coll". It also raised on "": split returned [] and [][1..-1] is nil.
#
# @param full_collection [String] namespace string.
# @return [Array(String, String)] database and collection names; the
#   collection is "" when the string contains no dot.
def self.parse_full_collection(full_collection)
  database, _dot, collection = full_collection.partition('.')
  return database, collection
end

.receive(socket) ⇒ Object

Parse out an arbitrary binary mongo message, returning a hash representation for easy manipulation.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/mongo-proxy/wire.rb', line 38

# Read one complete wire-protocol message and parse it into a hash.
#
# @param socket [IO, String] a readable stream (e.g. a socket), or a String
#   holding the raw message bytes (wrapped in a StringIO).
# @return [Array(String, Hash), nil]
#   - on success, the raw message bytes and the parsed message hash (its
#     :header holds the decoded header);
#   - [nil, nil] when a complete header could not be read;
#   - nil when the body is truncated or reading/parsing raises.
def self.receive socket
  if socket.is_a?(String)
    socket = StringIO.new(socket)
    socket.set_encoding('UTF-8', 'UTF-8')
  end

  chunk1, x = receive_header(socket)
  return nil, nil unless x && chunk1

  parsed = {}

  # messageLength counts the header too, so the body is the remainder.
  body_length = x[:messageLength] - HEADER_SIZE
  chunk2 = socket.read(body_length)
  # read returns nil or a short string when the stream ends mid-message.
  # Parsing a partial body would produce garbage, so treat it as a failed read.
  unless chunk2 && chunk2.bytesize == body_length
    @@log.warn "failed to read from socket #{socket.to_s}: expected #{body_length} body bytes, got #{chunk2 ? chunk2.bytesize : 0}"
    return nil
  end

  case x[:opCode]
  when OP_REPLY
    parsed = receive_reply(chunk2)
  when OP_UPDATE
    parsed = receive_update(chunk2)
  when OP_INSERT
    parsed = receive_insert(chunk2)
  when OP_QUERY
    parsed = receive_query(chunk2)
  when OP_DELETE
    parsed = receive_delete(chunk2)
  when OP_GET_MORE
    parsed = receive_get_more(chunk2)
  when OP_KILL_CURSORS
    parsed = receive_kill_cursors(chunk2)
  else
    puts "could not parse message type :#{x[:opCode]}:"
  end

  parsed[:header] = x
  full = chunk1 + chunk2
  full = full.force_encoding('UTF-8')
  return full, parsed

# Rescue StandardError only: rescuing Exception would also swallow Interrupt,
# SignalException and SystemExit.
rescue StandardError => e
  @@log.warn "failed to read from socket #{socket.to_s}: #{e.class}: #{e.message}"
  return nil
end

.receive_bson(chunk, start, max = 10000) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/mongo-proxy/wire.rb', line 138

# Deserialize consecutive BSON documents from a binary chunk.
#
# @param chunk [String] binary message body.
# @param start [Integer] byte offset of the first document.
# @param max [Integer] maximum number of documents to read.
# @return [Array(Array<Hash>, Integer)] the documents read and the byte offset
#   just past the last one; [nil, nil] if a document is malformed.
def self.receive_bson(chunk, start, max = 10000)
  docs = []

  while start < chunk.bytesize && docs.size < max
    # Each document begins with its total size as a little-endian int32.
    # byteslice keeps offsets in bytes even if chunk carries a multibyte
    # encoding (String#[] with a range indexes characters).
    bson_length = chunk.byteslice(start, 4).unpack('V')[0]

    # A BSON document is at least 5 bytes (length prefix + trailing NUL). A
    # missing or short prefix, or a length running past the chunk, means
    # truncated or corrupt data. Previously a short prefix raised TypeError
    # outside the rescue below. Rejecting lengths < 5 also guarantees that
    # `start` advances, so the loop terminates.
    if bson_length.nil? || bson_length < 5 || start + bson_length > chunk.bytesize
      puts 'could not deserialize BSON: invalid or truncated document length'
      return nil, nil
    end

    begin
      doc = BSON.deserialize(chunk.byteslice(start, bson_length))
    rescue StandardError
      puts 'could not deserialize BSON:'
      pp chunk.byteslice(start, bson_length)
      return nil, nil
    end

    docs << doc
    start += bson_length
  end

  return docs, start
end

.receive_delete(chunk) ⇒ Object

OP_DELETE: 2006

header :header - Message header. int32 - Empty (reserved, zero). string :database.:collection - Database + collection name. int32 :flags - Bit vector of delete-related flags. document :selector - Selector for deletion.



427
428
429
430
431
432
433
434
435
# File 'lib/mongo-proxy/wire.rb', line 427

# Parse the body of an OP_DELETE message (everything after the header).
#
# Layout: int32 ZERO, cstring namespace, int32 flags, selector document.
#
# @param chunk [String] binary message body.
# @return [Hash] :flags, :database, :collection and :selector.
def self.receive_delete(chunk)
  _zero, namespace, flags = chunk.unpack('VZ*V')
  # Skip ZERO (4) + namespace bytes + its NUL (1) + flags (4).
  selector_offset = 4 + namespace.bytesize + 1 + 4
  docs, _next = receive_bson(chunk, selector_offset, 1)

  result = { :flags => flags }
  result[:database], result[:collection] = parse_full_collection(namespace)
  result[:selector] = docs[0]
  result
end

.receive_get_more(chunk) ⇒ Object

OP_GET_MORE: 2005

header :header - Message header. int32 - Empty. string :database.:collection - Database + collection name. int32 :numberToReturn - Limit for next results reply. int64 :cursorID - ID of cursor to consume more from.



388
389
390
391
392
393
394
# File 'lib/mongo-proxy/wire.rb', line 388

# Parse the body of an OP_GET_MORE message (everything after the header).
#
# Layout: int32 ZERO, cstring namespace, int32 numberToReturn, int64 cursorID.
#
# @param chunk [String] binary message body.
# @return [Hash] :numberToReturn, :cursorID, :database and :collection.
def self.receive_get_more(chunk)
  _zero, namespace, number_to_return, cursor_id = chunk.unpack('VZ*VQ<')
  database, collection = parse_full_collection(namespace)

  {
    :numberToReturn => number_to_return,
    :cursorID => cursor_id,
    :database => database,
    :collection => collection
  }
end

.receive_header(stream) ⇒ Object

Receive the Mongo Wire message header from a stream.

int32 :messageLength - Total length in bytes of the message, including this 16-byte header. int32 :requestID - Identifier of this message. int32 :responseTo - RequestID from the original request. int32 :opCode - Message type.



117
118
119
120
121
122
123
124
125
# File 'lib/mongo-proxy/wire.rb', line 117

# Read and decode the 16-byte message header from a stream.
#
# @param stream [#read] IO-like source.
# @return [Array(String, Hash), nil] the raw header bytes and a hash with
#   :messageLength, :requestID, :responseTo and :opCode (the symbol from OPS,
#   or nil for an unknown opcode); nil if a full header could not be read.
def self.receive_header(stream)
  raw = stream.read(HEADER_SIZE)
  return nil if raw.nil? || raw.bytesize != HEADER_SIZE

  length, request_id, response_to, op_code = raw.unpack('VVVV')
  header = {
    :messageLength => length,
    :requestID => request_id,
    :responseTo => response_to,
    :opCode => OPS[op_code]
  }
  return raw, header
end

.receive_insert(chunk) ⇒ Object

OP_INSERT: 2002

header :header - Message header. int32 :flags - Bit vector flags. string :database.:collection - Database + collection name. document[] :documents - An array of BSON documents.



290
291
292
293
294
295
296
297
# File 'lib/mongo-proxy/wire.rb', line 290

# Parse the body of an OP_INSERT message (everything after the header).
#
# Layout: int32 flags, cstring namespace, then one or more documents.
#
# @param chunk [String] binary message body.
# @return [Hash] :flags, :database, :collection and :documents.
def self.receive_insert(chunk)
  flags, namespace = chunk.unpack('VZ*')
  database, collection = parse_full_collection(namespace)
  # Documents start after flags (4 bytes) and the NUL-terminated namespace.
  documents, _next = receive_bson(chunk, 4 + namespace.bytesize + 1)

  { :flags => flags, :database => database, :collection => collection, :documents => documents }
end

.receive_kill_cursors(chunk) ⇒ Object

OP_KILL_CURSORS: 2007 Message to explicitly close cursors. It is sent by the client only in specific circumstances; cursors can also time out on their own.

header :header - Message header. int32 - Empty. int32 :numberOfCursorIDs - Number of cursors to kill. int64[] :cursorIDs - Array of cursor ids to kill.



473
474
475
476
477
478
# File 'lib/mongo-proxy/wire.rb', line 473

# Parse the body of an OP_KILL_CURSORS message (everything after the header).
#
# Layout: int32 ZERO, int32 numberOfCursorIDs, then that many int64 cursor ids.
#
# @param chunk [String] binary message body.
# @return [Hash] :cursorIDs as an Array of Integers.
def self.receive_kill_cursors(chunk)
  cursor_count = chunk.unpack('VV')[1]
  cursor_ids = chunk[8..-1].unpack("Q<#{cursor_count}")
  { :cursorIDs => cursor_ids }
end

.receive_query(chunk) ⇒ Object

OP_QUERY: 2004

header :header - Message header. int32 :flags - A bit vector of query flags. string :database.:collection - Database + collection name. int32 :numberToSkip - Offset for results. int32 :numberToReturn - Limit for results. document :query - BSON document of query. document :returnFieldsSelector - Optional BSON document to select fields in response.



336
337
338
339
340
341
342
343
344
345
346
# File 'lib/mongo-proxy/wire.rb', line 336

# Parse the body of an OP_QUERY message (everything after the header).
#
# Layout: int32 flags, cstring namespace, int32 numberToSkip,
# int32 numberToReturn, query document, optional field-selector document.
#
# @param chunk [String] binary message body.
# @return [Hash] :flags, :numberToSkip, :numberToReturn, :database,
#   :collection, :query and :returnFieldSelector (nil when absent).
def self.receive_query(chunk)
  flags, namespace, skip, limit = chunk.unpack('VZ*VV')
  result = { :flags => flags, :numberToSkip => skip, :numberToReturn => limit }
  result[:database], result[:collection] = parse_full_collection(namespace)

  # flags (4) + namespace + NUL (1) + numberToSkip (4) + numberToReturn (4)
  query_offset = 4 + namespace.bytesize + 1 + 4 + 4
  docs, _next = receive_bson(chunk, query_offset, 2)
  result[:query] = docs[0]
  result[:returnFieldSelector] = docs[1]
  result
end

.receive_reply(chunk) ⇒ Object

OP_REPLY: 1 A reply to a client request.

header :header - Message header. int32 :responseFlags - A bit vector of response flags. int64 :cursorID - ID of the open cursor, if there is one; 0 otherwise. int32 :startingFrom - Offset in the cursor of this reply message. int32 :numberReturned - Number of documents in the reply. document[] :documents - The returned BSON documents.



192
193
194
195
196
197
# File 'lib/mongo-proxy/wire.rb', line 192

# Parse the body of an OP_REPLY message (everything after the header).
#
# Layout: int32 responseFlags, int64 cursorID, int32 startingFrom,
# int32 numberReturned, then numberReturned documents.
#
# @param chunk [String] binary message body.
# @return [Hash] :responseFlags, :cursorID, :startingFrom, :numberReturned
#   and :documents.
def self.receive_reply(chunk)
  flags, cursor, offset, count = chunk.unpack('VQ<VV')
  # The fixed prefix is 4 + 8 + 4 + 4 = 20 bytes; documents follow it.
  documents, _next = receive_bson(chunk, 20, count)

  {
    :responseFlags => flags,
    :cursorID => cursor,
    :startingFrom => offset,
    :numberReturned => count,
    :documents => documents
  }
end

.receive_update(chunk) ⇒ Object

OP_UPDATE: 2001 A MongoDB update query message.

header :header - Message header. int32 - An empty value. string :database.:collection - Database and collection name for update. int32 :flags - Bit vector of update flags. document :selector - BSON document representing update target. document :update - BSON document representing the update to perform.



240
241
242
243
244
245
246
247
248
249
250
# File 'lib/mongo-proxy/wire.rb', line 240

# Parse the body of an OP_UPDATE message (everything after the header).
#
# Layout: int32 ZERO, cstring namespace, int32 flags, selector document,
# update document.
#
# @param chunk [String] binary message body.
# @return [Hash] :flags (raw bit vector), :database, :collection, :selector
#   and :update.
def self.receive_update(chunk)
  _zero, namespace, flags = chunk.unpack('VZ*V')
  result = { :flags => flags }
  result[:database], result[:collection] = parse_full_collection(namespace)
  # TODO: break out the individual flag bits

  # ZERO (4) + namespace + NUL (1) + flags (4)
  docs = receive_bson(chunk, 4 + namespace.bytesize + 1 + 4, 2).first
  result[:selector] = docs[0]
  result[:update] = docs[1]
  result
end

.write(doc) ⇒ Object

Write a hash document representation into its corresponding binary form. This method can be used with documents in the format that receive returns, making it easy to parse a message, change it, and re-encode it.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/mongo-proxy/wire.rb', line 83

# Encode a message hash (as produced by +receive+ or the build_* helpers)
# into its binary wire form, header included.
#
# @param doc [Hash] message hash; doc[:header][:opCode] selects the encoder.
# @return [String, nil] the encoded message, or nil (after printing a notice)
#   when the opcode has no encoder.
def self.write doc
  header = doc[:header]
  op_code = header[:opCode]

  body =
    case op_code
    when OP_REPLY then write_reply(doc)
    when OP_UPDATE then write_update(doc)
    when OP_INSERT then write_insert(doc)
    when OP_QUERY then write_query(doc)
    when OP_DELETE then write_delete(doc)
    when OP_GET_MORE then write_get_more(doc)
    when OP_KILL_CURSORS then write_kill_cursors(doc)
    else
      puts "could not write message type :#{op_code}:"
      return nil
    end

  # force_encoding relabels the body in place (no byte changes) before the
  # header is prepended.
  write_header(header, body.force_encoding('UTF-8'))
end

.write_bson(docs) ⇒ Object



160
161
162
163
164
165
166
167
168
169
# File 'lib/mongo-proxy/wire.rb', line 160

# Serialize one document or a list of documents to concatenated BSON.
#
# @param docs [Hash, Array<Hash>] a single Hash is treated as a one-item list.
# @return [String] the serialized documents back to back ("" for none).
def self.write_bson(docs)
  list = docs.is_a?(Hash) ? [docs] : docs
  list.each_with_object('') { |doc, out| out << BSON.serialize(doc).to_s }
end

.write_delete(doc) ⇒ Object



454
455
456
457
458
459
460
461
462
463
# File 'lib/mongo-proxy/wire.rb', line 454

# Encode the body of an OP_DELETE message (without the header).
#
# @param doc [Hash] needs :database, :collection and :selector; :flags
#   defaults to 0.
# @return [String] binary body.
# @raise [RuntimeError] if the namespace or selector is missing.
def self.write_delete(doc)
  raise 'missing full collection name' unless doc[:database] && doc[:collection]
  raise 'missing selector' unless doc[:selector]

  namespace = build_full_collection(doc[:database], doc[:collection])
  # int32 ZERO, cstring namespace, int32 flags
  msg = [0, namespace, doc[:flags] || 0].pack('VZ*V')
  msg << write_bson(doc[:selector])
  msg
end

.write_get_more(doc) ⇒ Object



410
411
412
413
414
415
416
417
418
# File 'lib/mongo-proxy/wire.rb', line 410

# Encode the body of an OP_GET_MORE message (without the header).
#
# @param doc [Hash] needs :database, :collection and :cursorID;
#   :numberToReturn defaults to 0.
# @return [String] binary body.
# @raise [RuntimeError] if the namespace or cursorID is missing.
def self.write_get_more(doc)
  raise 'missing full collection name' unless doc[:database] && doc[:collection]
  raise 'missing cursorID' unless doc[:cursorID]

  namespace = build_full_collection(doc[:database], doc[:collection])
  batch_size = doc[:numberToReturn] || 0
  # int32 ZERO, cstring namespace, int32 numberToReturn, int64 cursorID
  [0, namespace, batch_size, doc[:cursorID]].pack('VZ*VQ<')
end

.write_header(doc, body) ⇒ Object



127
128
129
130
131
132
133
134
135
136
# File 'lib/mongo-proxy/wire.rb', line 127

# Prepend the 16-byte message header to an encoded body.
#
# @param doc [Hash] header hash; needs :requestID and :opCode (a symbol
#   mapped through OPS_INVERTED); :responseTo defaults to 0.
# @param body [String] encoded message body.
# @return [String] header + body, labelled UTF-8.
# @raise [RuntimeError] if requestID or opCode is missing.
def self.write_header doc, body
  raise 'no requestID' unless doc[:requestID]
  raise 'no opCode' unless doc[:opCode]

  fields = [
    body.bytesize + HEADER_SIZE, # messageLength counts the header itself
    doc[:requestID],
    doc[:responseTo] || 0,
    OPS_INVERTED[doc[:opCode]]
  ]
  fields.pack('VVVV').force_encoding('UTF-8') + body
end

.write_insert(doc) ⇒ Object



314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/mongo-proxy/wire.rb', line 314

# Encode the body of an OP_INSERT message (without the header).
#
# @param doc [Hash] needs :database, :collection and :documents (a Hash or
#   an Array of Hashes); :flags defaults to 0.
# @return [String] binary body.
# @raise [RuntimeError] if the namespace or documents are missing.
def self.write_insert(doc)
  raise 'missing full collection' unless doc[:database] && doc[:collection]
  raise 'missing documents' unless doc[:documents]

  documents = doc[:documents].is_a?(Hash) ? [doc[:documents]] : doc[:documents]
  namespace = build_full_collection(doc[:database], doc[:collection])
  # int32 flags, cstring namespace, then the documents
  msg = [doc[:flags] || 0, namespace].pack('VZ*')
  msg << write_bson(documents)
  msg
end

.write_kill_cursors(doc) ⇒ Object



491
492
493
494
# File 'lib/mongo-proxy/wire.rb', line 491

# Encode the body of an OP_KILL_CURSORS message (without the header).
#
# @param doc [Hash] needs :cursorIDs (Array of Integers).
# @return [String] binary body: int32 ZERO, int32 count, int64 ids.
# @raise [RuntimeError] if cursorIDs is missing.
def self.write_kill_cursors(doc)
  ids = doc[:cursorIDs]
  raise 'missing cursorIDs' unless ids

  prefix = [0, ids.size]
  (prefix + ids).pack('VVQ<*')
end

.write_query(doc) ⇒ Object



366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/mongo-proxy/wire.rb', line 366

# Encode the body of an OP_QUERY message (without the header).
#
# @param doc [Hash] needs :database and :collection. Defaults: :flags 0,
#   :numberToSkip 0, :numberToReturn 4294967295, :query {}. The optional
#   :returnFieldSelector is appended only when present.
# @return [String] binary body.
# @raise [RuntimeError] if the namespace is missing.
def self.write_query(doc)
  raise 'missing full collection name' unless doc[:database] && doc[:collection]

  namespace = build_full_collection(doc[:database], doc[:collection])
  # int32 flags, cstring namespace, int32 numberToSkip, int32 numberToReturn
  msg = [
    doc[:flags] || 0,
    namespace,
    doc[:numberToSkip] || 0,
    doc[:numberToReturn] || 4294967295
  ].pack('VZ*VV')
  query = doc[:query] || {}
  selector = doc[:returnFieldSelector]

  msg << write_bson([query])
  msg << write_bson([selector]) if selector
  msg
end

.write_reply(doc) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/mongo-proxy/wire.rb', line 217

# Encode the body of an OP_REPLY message (without the header).
#
# numberReturned is always derived from the documents actually written. The
# previous version packed doc[:numberReturned] verbatim, which raised
# TypeError when it was absent (e.g. a hand-built doc) and produced a
# malformed reply whenever :documents was edited after +receive+.
#
# @param doc [Hash] needs header[:responseTo] and :documents (a Hash or an
#   Array of Hashes). Defaults: :responseFlags 0, :cursorID 0,
#   :startingFrom 0. Any :numberReturned value is ignored.
# @return [String] binary body.
# @raise [RuntimeError] if responseTo or documents is missing.
def self.write_reply(doc)
  raise 'no responseTo' unless doc[:header][:responseTo]
  raise 'no documents' unless doc[:documents]

  documents = doc[:documents]
  documents = [documents] if documents.is_a?(Hash)
  response_flags = doc[:responseFlags] || 0
  cursor_id = doc[:cursorID] || 0
  starting_from = doc[:startingFrom] || 0

  # int32 responseFlags, int64 cursorID, int32 startingFrom, int32 numberReturned
  msg = [response_flags, cursor_id, starting_from, documents.size].pack('VQ<VV')
  msg << write_bson(documents)
  msg
end

.write_update(doc) ⇒ Object



271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/mongo-proxy/wire.rb', line 271

# Encode the body of an OP_UPDATE message (without the header).
#
# @param doc [Hash] needs :database, :collection, :selector and :update;
#   :flags defaults to 0.
# @return [String] binary body.
# @raise [RuntimeError] if the namespace, selector or update is missing.
def self.write_update doc
  raise 'missing collection info' unless doc[:database] && doc[:collection]
  raise 'missing selector' unless doc[:selector]
  raise 'missing update' unless doc[:update]

  namespace = build_full_collection(doc[:database], doc[:collection])
  # int32 ZERO, cstring namespace, int32 flags, then selector and update docs
  msg = [0, namespace, doc[:flags] || 0].pack('VZ*V')
  [doc[:selector], doc[:update]].each { |part| msg << write_bson(part) }
  msg
end