Class: Mongo::Protocol::Msg Private

Inherits:
Message
  • Object
Includes:
Monitoring::Event::Secure
Defined in:
lib/mongo/protocol/msg.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.

OP_MSG is only available in MongoDB 3.6 (maxWireVersion >= 6) and later.

Since:

  • 2.5.0

Defined Under Namespace

Classes: Section1

Constant Summary

DATABASE_IDENTIFIER =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The identifier for the database name to execute the command on.

Since:

  • 2.5.0

'$db'.freeze
INTERNAL_KEYS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Keys that the driver adds to commands. These are moved to the end of the hash so that logged commands start with the command name and its arguments.

Since:

  • 2.5.0

Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze

Constants included from Monitoring::Event::Secure

Monitoring::Event::Secure::REDACTED_COMMANDS

Constants inherited from Message

Mongo::Protocol::Message::BATCH_SIZE, Mongo::Protocol::Message::COLLECTION, Mongo::Protocol::Message::LIMIT, Mongo::Protocol::Message::MAX_MESSAGE_SIZE, Mongo::Protocol::Message::ORDERED, Mongo::Protocol::Message::Q

Instance Attribute Summary

Attributes inherited from Message

#request_id

Instance Method Summary

Methods included from Monitoring::Event::Secure

#compression_allowed?, #redacted, #sensitive?

Methods inherited from Message

#==, deserialize, #hash, #maybe_inflate, #set_request_id

Methods included from Id

included

Constructor Details

#initialize(flags, options, main_document, *sequences) ⇒ Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a new OP_MSG protocol message.

Examples:

Create an OP_MSG wire protocol message

Msg.new([:more_to_come], {}, { hello: 1 },
        Msg::Section1.new('documents', [..]))

Parameters:

  • flags (Array<Symbol>)

    The flag bits. Currently supported values are :more_to_come and :checksum_present.

  • options (Hash)

    The options.

  • main_document (BSON::Document, Hash)

    The document that will become the payload type 0 section. Can contain global args as they are defined in the OP_MSG specification.

  • sequences (Protocol::Msg::Section1)

    Zero or more payload type 1 sections.

Options Hash (options):

  • validating_keys (true, false)

Whether keys should be validated as legal document keys (i.e. they do not begin with $ and do not contain dots). This option is deprecated and is not used. It will be removed in version 3.0.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 66

def initialize(flags, options, main_document, *sequences)
  if flags
    flags.each do |flag|
      unless KNOWN_FLAGS.key?(flag)
        raise ArgumentError, "Unknown flag: #{flag.inspect}"
      end
    end
  end
  @flags = flags || []
  @options = options
  unless main_document.is_a?(Hash)
    raise ArgumentError, "Main document must be a Hash, given: #{main_document.class}"
  end
  @main_document = main_document
  sequences.each_with_index do |section, index|
    unless section.is_a?(Section1)
      raise ArgumentError, "All sequences must be Section1 instances, got: #{section} at index #{index}"
    end
  end
  @sequences = sequences
  @sections = [
    {type: 0, payload: @main_document}
  ] + @sequences.map do |section|
    {type: 1, payload: {
      identifier: section.identifier,
      sequence: section.documents.map do |doc|
        CachingHash.new(doc)
      end,
    }}
  end
  @request_id = nil
  super
end
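
The flag check at the top of the constructor can be tried in isolation. The following is a minimal sketch, not the driver's code: FLAG_BITS is a hypothetical stand-in for the driver's KNOWN_FLAGS table, limited to the two flags documented above, and the bit positions are taken from the OP_MSG specification (checksumPresent is bit 0, moreToCome is bit 1). The flag_bits helper is likewise an illustration of how such symbols could map onto the flag bits field.

```ruby
# Hypothetical stand-in for the driver's KNOWN_FLAGS table.
# Bit positions follow the OP_MSG specification.
FLAG_BITS = { checksum_present: 0, more_to_come: 1 }.freeze

# Mirrors the constructor's validation: nil is treated as "no flags",
# and any unknown symbol raises ArgumentError.
def validate_flags(flags)
  flags ||= []
  flags.each do |flag|
    raise ArgumentError, "Unknown flag: #{flag.inspect}" unless FLAG_BITS.key?(flag)
  end
  flags
end

# Illustrative only: folds the validated symbols into an integer bit field.
def flag_bits(flags)
  validate_flags(flags).reduce(0) { |bits, flag| bits | (1 << FLAG_BITS.fetch(flag)) }
end

flag_bits([:more_to_come])                    # => 2
flag_bits([:checksum_present, :more_to_come]) # => 3
```

Passing an unrecognized symbol such as :bogus raises ArgumentError with the same message format as the constructor.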

Instance Method Details

#bulk_write? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

This method was written to support client-side encryption functionality. It is not recommended that this method be used in service of any other feature or behavior.

Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.

Returns:

  • (Boolean)

    Whether this message represents a bulk write.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 272

def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = inserts && inserts.length || 0
  num_updates = updates && updates.length || 0
  num_deletes = deletes && deletes.length || 0

  num_inserts > 1  || num_updates > 1 || num_deletes > 1
end
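
To see which commands the check above classifies as bulk writes, the same logic can be restated over a plain Hash. This is a sketch for experimentation only; it takes the main document as an argument rather than reading @main_document, and the sample commands are made up.

```ruby
# Stand-alone restatement of the check: a command counts as a bulk write
# when any of its operation arrays holds more than one element.
def bulk_write?(main_document)
  %w[documents updates deletes].any? do |key|
    ops = main_document[key]
    !ops.nil? && ops.length > 1
  end
end

single = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }] }
many   = { 'insert' => 'users', 'documents' => [{ 'name' => 'a' }, { 'name' => 'b' }] }

bulk_write?(single) # => false
bulk_write?(many)   # => true
```

A command with exactly one document, update, or delete is therefore not treated as a bulk write.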

#documents ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 196

def documents
  [@main_document]
end

#fix_after_deserialization ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.

TODO fix deserialization so that this method is not needed.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 184

def fix_after_deserialization
  if @sections.nil?
    raise NotImplementedError, "After deserialization @sections should have been initialized"
  end
  if @sections.length != 1
    raise NotImplementedError, "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end
  @main_document = @sections.first
  @sequences = []
  @sections = [{type: 0, payload: @main_document}]
end

#maybe_add_server_api(server_api) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 284

def maybe_add_server_api(server_api)
  conflicts = {}
  %i(apiVersion apiStrict apiDeprecationErrors).each do |key|
    if @main_document.key?(key)
      conflicts[key] = @main_document[key]
    end
    if @main_document.key?(key.to_s)
      conflicts[key] = @main_document[key.to_s]
    end
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict, "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end
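
The conflict detection above checks each server API key in both Symbol and String form. The sketch below reproduces that behavior over plain hashes under two assumptions: Utils.transform_server_api is internal to the driver and is not reproduced, so the sketch accepts an already-transformed Hash (server_api_fields, a hypothetical name), and it raises ArgumentError in place of Error::ServerApiConflict.

```ruby
SERVER_API_KEYS = %i[apiVersion apiStrict apiDeprecationErrors].freeze

# Collects any server API parameters already present in the command,
# whether they were given with Symbol or String keys.
def server_api_conflicts(main_document)
  SERVER_API_KEYS.each_with_object({}) do |key, conflicts|
    conflicts[key] = main_document[key] if main_document.key?(key)
    conflicts[key] = main_document[key.to_s] if main_document.key?(key.to_s)
  end
end

# Returns a new document with the server API fields merged in.
# ArgumentError stands in for the driver's Error::ServerApiConflict.
def add_server_api(main_document, server_api_fields)
  conflicts = server_api_conflicts(main_document)
  unless conflicts.empty?
    raise ArgumentError, "conflicting parameters: #{conflicts.inspect}"
  end
  main_document.merge(server_api_fields)
end

add_server_api({ 'ping' => 1 }, { 'apiVersion' => '1' })
# => {"ping"=>1, "apiVersion"=>"1"}
```

As in the method above, the original document is left untouched and a merged copy is returned.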

#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compress the message, if the command being sent permits compression. Otherwise returns self.

Parameters:

  • compressor (String, Symbol)

    The compressor to use.

  • zlib_compression_level (Integer) (defaults to: nil)

    The zlib compression level to use.

Returns:

  • (Message)

    A Protocol::Compressed message or self, depending on whether this message can be compressed.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 174

def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end

#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Possibly decrypt this message with libmongocrypt. The message is decrypted only if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote JSON schema.

Parameters:

  • context

    The operation context. Decryption is attempted only when context.decrypt? returns true.

Returns:

  • (Mongo::Protocol::Msg)

    The decrypted message, or the original message if decryption was not possible or necessary.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 251

def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.encrypter.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Possibly encrypt this message with libmongocrypt. The message is encrypted only if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote JSON schema.

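Parameters:

  • connection

    The connection the message will be sent on. Its description.max_wire_version must be at least 8 (MongoDB 4.2) for encryption to proceed.

  • context

    The operation context. Encryption is attempted only when context.encrypt? returns true.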

Returns:

  • (Mongo::Protocol::Msg)

    The encrypted message, or the original message if encryption was not possible or necessary.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 214

def maybe_encrypt(connection, context)
  # TODO verify compression happens later, i.e. when this method runs
  # the message is not compressed.
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        "Cannot perform encryption against a MongoDB server older than " +
        "4.2 (wire version less than 8). Currently connected to server " +
        "with max wire version #{connection.description.max_wire_version} " +
        "(Auto-encryption requires a minimum MongoDB version of 4.2)"
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypter.encrypt(db_name, cmd)
    if cmd.key?('$db') && !enc_cmd.key?('$db')
      enc_cmd['$db'] = cmd['$db']
    end

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#number_returned ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of documents returned from the server.

The Msg instance must be for a server reply and the reply must return an active cursor (either a newly created one or one whose iteration is continuing via getMore).

Returns:

  • (Integer)

    Number of returned documents.

Raises:

  • (NotImplementedError)

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 311

def number_returned
  if doc = documents.first
    if cursor = doc['cursor']
      if batch = cursor['firstBatch'] || cursor['nextBatch']
        return batch.length
      end
    end
  end
  raise NotImplementedError, "number_returned is only defined for cursor replies"
end
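
The lookup above succeeds only for replies that carry a cursor document with a firstBatch or nextBatch array. The following sketch restates it over a plain Hash so the accepted reply shapes are easy to test; the sample replies are invented for illustration.

```ruby
# Stand-alone restatement: counts the documents in a cursor reply's batch.
# Raises NotImplementedError for any reply without a cursor batch.
def number_returned(reply)
  cursor = reply && reply['cursor']
  batch = cursor && (cursor['firstBatch'] || cursor['nextBatch'])
  return batch.length if batch

  raise NotImplementedError, 'number_returned is only defined for cursor replies'
end

find_reply     = { 'cursor' => { 'id' => 0, 'firstBatch' => [{ 'a' => 1 }, { 'a' => 2 }] }, 'ok' => 1 }
get_more_reply = { 'cursor' => { 'id' => 0, 'nextBatch' => [{ 'a' => 3 }] }, 'ok' => 1 }

number_returned(find_reply)     # => 2
number_returned(get_more_reply) # => 1
```

Note that NotImplementedError descends from ScriptError rather than StandardError, so a bare rescue clause does not catch it; callers need to name it explicitly.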

#payload ⇒ BSON::Document

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the event payload for monitoring.

Examples:

Return the event payload.

message.payload

Returns:

  • (BSON::Document)

    The event payload.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 120

def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document,
  )
end
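
The key reordering performed for logging can be illustrated without the driver. This sketch copies the INTERNAL_KEYS definition from the constant summary and uses a hypothetical helper, reorder_for_logging, that relies on partition instead of the two intermediate hashes; the resulting key order is the same.

```ruby
require 'set'

# Same definition as the INTERNAL_KEYS constant documented on this page.
INTERNAL_KEYS = Set.new(%w($clusterTime $db lsid signature txnNumber)).freeze

# Hypothetical helper: moves driver-added keys to the end of the command,
# preserving the relative order within each group.
def reorder_for_logging(command)
  internal, regular = command.partition { |key, _| INTERNAL_KEYS.member?(key.to_s) }
  (regular + internal).to_h
end

command = { '$db' => 'test', 'lsid' => { 'id' => 1 }, 'find' => 'users', 'filter' => {} }
reorder_for_logging(command).keys
# => ["find", "filter", "$db", "lsid"]
```

After reordering, the first key is the command name, which is what the command_name field of the payload relies on.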

#replyable? ⇒ true, false

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the message expects a reply from the database.

Examples:

Does the message require a reply?

message.replyable?

Returns:

  • (true, false)

    If the message expects a reply.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 108

def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end
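
One property of the `||=` idiom used above is worth knowing when reading this method: it caches only truthy results, so a false value is recomputed on every call. The result is still correct, since the flags do not change. The sketch below demonstrates this with a hypothetical FlagHolder class that counts evaluations; it is not part of the driver.

```ruby
# Hypothetical class used only to observe how often the expression runs.
class FlagHolder
  attr_reader :evaluations

  def initialize(flags)
    @flags = flags
    @evaluations = 0
  end

  # Same shape as Msg#replyable?, with a counter added.
  def replyable?
    @replyable ||= begin
      @evaluations += 1
      !@flags.include?(:more_to_come)
    end
  end
end

two_way = FlagHolder.new([])
2.times { two_way.replyable? }
two_way.evaluations # => 1 (true is cached)

one_way = FlagHolder.new([:more_to_come])
2.times { one_way.replyable? }
one_way.evaluations # => 2 (false is never cached)
```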

#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ BSON::ByteBuffer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Serializes message into bytes that can be sent on the wire.

Parameters:

  • buffer (BSON::ByteBuffer) (defaults to: BSON::ByteBuffer.new)

    The buffer into which the message is serialized.

  • max_bson_size (Integer) (defaults to: nil)

    The maximum bson object size.

Returns:

  • (BSON::ByteBuffer)

    The buffer containing the serialized message.

Since:

  • 2.5.0



# File 'lib/mongo/protocol/msg.rb', line 155

def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end