Class: Mongo::Session
- Inherits:
-
Object
- Object
- Mongo::Session
- Extended by:
- Forwardable
- Includes:
- Retryable
- Defined in:
- lib/mongo/session.rb,
lib/mongo/session/session_pool.rb,
lib/mongo/session/server_session.rb
Overview
A logical session representing a set of sequential operations executed
by an application that are in some way.
Defined Under Namespace
Classes: ServerSession, SessionPool
Constant Summary collapse
- MISMATCHED_CLUSTER_ERROR_MSG =
Error message indicating that the session was retrieved from a client with a different cluster than that of the client through which it is currently being used.
'The configuration of the client used to create this session does not match that ' + 'of the client owning this operation. Please only use this session for operations through its parent ' + 'client.'.freeze
- SESSION_ENDED_ERROR_MSG =
Error message describing that the session cannot be used because it has already been ended.
'This session has ended and cannot be used. Please create a new one.'.freeze
- SESSIONS_NOT_SUPPORTED =
Error message describing that sessions are not supported by the server version.
'Sessions are not supported by the connected servers.'.freeze
- NO_TRANSACTION_STATE =
The state of a session in which the last operation was not related to any transaction or no operations have yet occurred.
:no_transaction- STARTING_TRANSACTION_STATE =
The state of a session in which a user has initiated a transaction but no operations within the transactions have occurred yet.
:starting_transaction- TRANSACTION_IN_PROGRESS_STATE =
The state of a session in which a transaction has been started and at least one operation has occurred, but the transaction has not yet been committed or aborted.
:transaction_in_progress- TRANSACTION_COMMITTED_STATE =
The state of a session in which the last operation executed was a transaction commit.
:transaction_committed- TRANSACTION_ABORTED_STATE =
The state of a session in which the last operation executed was a transaction abort.
:transaction_aborted- UNLABELED_WRITE_CONCERN_CODES =
[ 79, # UnknownReplWriteConcern 100, # CannotSatisfyWriteConcern, ].freeze
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Get the cluster through which this session was created.
-
#cluster_time ⇒ Object
readonly
The cluster time for this session.
-
#operation_time ⇒ Object
readonly
The latest seen operation time for this session.
-
#options ⇒ Object
readonly
Get the options for this session.
-
#txn_options ⇒ Object
readonly
The options for the transaction currently being executed on the session.
Instance Method Summary collapse
-
#abort_transaction ⇒ Object
Abort the currently active transaction without making any changes to the database.
-
#add_autocommit!(command) ⇒ Hash, BSON::Document
Add the autocommit field to a command document if applicable.
-
#add_id!(command) ⇒ Hash, BSON::Document
Add this session’s id to a command document.
-
#add_start_transaction!(command) ⇒ Hash, BSON::Document
Add the startTransaction field to a command document if applicable.
-
#add_txn_num!(command) ⇒ Hash, BSON::Document
Add the transaction number to a command document if applicable.
-
#add_txn_opts!(command, read) ⇒ Hash, BSON::Document
Add the transactions options if applicable.
-
#advance_cluster_time(new_cluster_time) ⇒ BSON::Document, Hash
Advance the cached cluster time document for this session.
-
#advance_operation_time(new_operation_time) ⇒ BSON::Timestamp
Advance the cached operation time for this session.
- #cluster ⇒ Object
-
#commit_transaction ⇒ Object
Commit the currently active transaction on the session.
-
#end_session ⇒ nil
End this session.
-
#ended? ⇒ true, false
Whether this session has ended.
-
#explicit? ⇒ true, false
Is this session an explicit one (i.e. user-created).
-
#implicit? ⇒ true, false
Is this session an implicit one (not user-created).
-
#in_transaction? ⇒ true | false
Whether or not the session is currently in a transaction.
-
#initialize(server_session, client, options = {}) ⇒ Session
constructor
Initialize a Session.
-
#inspect ⇒ String
Get a formatted string for use in inspection.
-
#next_txn_num ⇒ Integer
Increment and return the next transaction number.
-
#process(result) ⇒ Operation::Result
Process a response from the server that used this session.
-
#retry_writes? ⇒ true, false
Will writes executed with this session be retried.
-
#session_id ⇒ BSON::Document
Get the session id.
-
#start_transaction(options = nil) ⇒ Object
Start a new transaction.
-
#suppress_read_write_concern!(command) ⇒ Hash, BSON::Document
Remove the read concern and/or write concern from the command if not applicable.
-
#txn_num ⇒ Integer
Get the current transaction number.
-
#txn_read_preference ⇒ Hash
Get the read preference the session will use in the currently active transaction.
-
#update_state! ⇒ Object
Update the state of the session due to a (non-commit and non-abort) operation being run.
-
#validate!(cluster) ⇒ nil
Validate the session.
-
#validate_read_preference!(command) ⇒ Object
Ensure that the read preference of a command primary.
Methods included from Retryable
#read_with_one_retry, #read_with_retry, #write_with_retry
Constructor Details
#initialize(server_session, client, options = {}) ⇒ Session
Initialize a Session.
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/mongo/session.rb', line 120 def initialize(server_session, client, = {}) @server_session = server_session = .dup # Because the read preference will need to be inserted into a command as a string, we convert # it from a symbol immediately upon receiving it. if [:read_preference] && [:read_preference][:mode] [:read_preference][:mode] = [:read_preference][:mode].to_s end @client = client.use(:admin) = .freeze @cluster_time = nil @state = NO_TRANSACTION_STATE end |
Instance Attribute Details
#client ⇒ Object (readonly)
Get the cluster through which this session was created.
36 37 38 |
# File 'lib/mongo/session.rb', line 36 def client @client end |
#cluster_time ⇒ Object (readonly)
The cluster time for this session.
41 42 43 |
# File 'lib/mongo/session.rb', line 41 def cluster_time @cluster_time end |
#operation_time ⇒ Object (readonly)
The latest seen operation time for this session.
46 47 48 |
# File 'lib/mongo/session.rb', line 46 def operation_time @operation_time end |
#options ⇒ Object (readonly)
Get the options for this session.
31 32 33 |
# File 'lib/mongo/session.rb', line 31 def end |
#txn_options ⇒ Object (readonly)
The options for the transaction currently being executed on the session.
51 52 53 |
# File 'lib/mongo/session.rb', line 51 def end |
Instance Method Details
#abort_transaction ⇒ Object
Abort the currently active transaction without making any changes to the database.
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 |
# File 'lib/mongo/session.rb', line 578 def abort_transaction check_if_ended! check_if_no_transaction! if within_states?(TRANSACTION_COMMITTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg( :commitTransaction, :abortTransaction)) end if within_states?(TRANSACTION_ABORTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_twice_msg(:abortTransaction)) end begin unless starting_transaction? write_with_retry(self, [:write_concern], true) do |server, txn_num| Operation::Command.new( selector: { abortTransaction: 1 }, db_name: 'admin', session: self, txn_num: txn_num ).execute(server) end end @state = TRANSACTION_ABORTED_STATE rescue Mongo::Error::InvalidTransactionOperation raise rescue Mongo::Error @state = TRANSACTION_ABORTED_STATE end end |
#add_autocommit!(command) ⇒ Hash, BSON::Document
Add the autocommit field to a command document if applicable.
185 186 187 188 189 |
# File 'lib/mongo/session.rb', line 185 def add_autocommit!(command) command.tap do |c| c[:autocommit] = false if in_transaction? end end |
#add_id!(command) ⇒ Hash, BSON::Document
Add this session’s id to a command document.
199 200 201 |
# File 'lib/mongo/session.rb', line 199 def add_id!(command) command.merge!(lsid: session_id) end |
#add_start_transaction!(command) ⇒ Hash, BSON::Document
Add the startTransaction field to a command document if applicable.
211 212 213 214 215 |
# File 'lib/mongo/session.rb', line 211 def add_start_transaction!(command) command.tap do |c| c[:startTransaction] = true if starting_transaction? end end |
#add_txn_num!(command) ⇒ Hash, BSON::Document
Add the transaction number to a command document if applicable.
225 226 227 228 229 |
# File 'lib/mongo/session.rb', line 225 def add_txn_num!(command) command.tap do |c| c[:txnNumber] = BSON::Int64.new(@server_session.txn_num) if in_transaction? end end |
#add_txn_opts!(command, read) ⇒ Hash, BSON::Document
Add the transactions options if applicable.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/mongo/session.rb', line 239 def add_txn_opts!(command, read) command.tap do |c| # The read preference should be added for all read operations. if read && txn_read_pref = txn_read_preference Mongo::Lint.validate_underscore_read_preference(txn_read_pref) txn_read_pref = txn_read_pref.dup txn_read_pref[:mode] = txn_read_pref[:mode].to_s.gsub(/(_\w)/) { |match| match[1].upcase } Mongo::Lint.validate_camel_case_read_preference(txn_read_pref) c['$readPreference'] = txn_read_pref end # The read concern should be added to any command that starts a transaction. if starting_transaction? && txn_read_concern c[:readConcern] ||= {} c[:readConcern].merge!(txn_read_concern) end # We need to send the read concern level as a string rather than a symbol. if c[:readConcern] && c[:readConcern][:level] c[:readConcern][:level] = c[:readConcern][:level].to_s end # The write concern should be added to any abortTransaction or commitTransaction command. if (c[:abortTransaction] || c[:commitTransaction]) && txn_write_concern c[:writeConcern] = txn_write_concern end # A non-numeric write concern w value needs to be sent as a string rather than a symbol. if c[:writeConcern] && c[:writeConcern][:w] && c[:writeConcern][:w].is_a?(Symbol) c[:writeConcern][:w] = c[:writeConcern][:w].to_s end end end |
#advance_cluster_time(new_cluster_time) ⇒ BSON::Document, Hash
Advance the cached cluster time document for this session.
365 366 367 368 369 370 371 |
# File 'lib/mongo/session.rb', line 365 def advance_cluster_time(new_cluster_time) if @cluster_time @cluster_time = [ @cluster_time, new_cluster_time ].max_by { |doc| doc[Cluster::CLUSTER_TIME] } else @cluster_time = new_cluster_time end end |
#advance_operation_time(new_operation_time) ⇒ BSON::Timestamp
Advance the cached operation time for this session.
383 384 385 386 387 388 389 |
# File 'lib/mongo/session.rb', line 383 def advance_operation_time(new_operation_time) if @operation_time @operation_time = [ @operation_time, new_operation_time ].max else @operation_time = new_operation_time end end |
#cluster ⇒ Object
643 644 645 |
# File 'lib/mongo/session.rb', line 643 def cluster @client.cluster end |
#commit_transaction ⇒ Object
Commit the currently active transaction on the session.
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 |
# File 'lib/mongo/session.rb', line 521 def commit_transaction check_if_ended! check_if_no_transaction! if within_states?(TRANSACTION_ABORTED_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg( :abortTransaction, :commitTransaction)) end begin # If commitTransaction is called twice, we need to run the same commit operation again, so # we revert the session to the previous state. if within_states?(TRANSACTION_COMMITTED_STATE) @state = @last_commit_skipped ? STARTING_TRANSACTION_STATE : TRANSACTION_IN_PROGRESS_STATE end if starting_transaction? @last_commit_skipped = true else @last_commit_skipped = false write_with_retry(self, [:write_concern], true) do |server, txn_num| Operation::Command.new( selector: { commitTransaction: 1 }, db_name: 'admin', session: self, txn_num: txn_num ).execute(server) end end rescue Mongo::Error::NoServerAvailable, Mongo::Error::SocketError => e e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) raise e rescue Mongo::Error::OperationFailure => e err_doc = e.instance_variable_get(:@result).send(:first_document) if e.write_retryable? || (err_doc['writeConcernError'] && !UNLABELED_WRITE_CONCERN_CODES.include?(err_doc['writeConcernError']['code'])) e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) end raise e ensure @state = TRANSACTION_COMMITTED_STATE end end |
#end_session ⇒ nil
End this session.
156 157 158 159 160 161 162 163 |
# File 'lib/mongo/session.rb', line 156 def end_session if !ended? && @client abort_transaction if within_states?(TRANSACTION_IN_PROGRESS_STATE) rescue Mongo::Error @client.cluster.session_pool.checkin(@server_session) end ensure @server_session = nil end |
#ended? ⇒ true, false
Whether this session has ended.
173 174 175 |
# File 'lib/mongo/session.rb', line 173 def ended? @server_session.nil? end |
#explicit? ⇒ true, false
Is this session an explicit one (i.e. user-created).
462 463 464 |
# File 'lib/mongo/session.rb', line 462 def explicit? @explicit ||= !implicit? end |
#implicit? ⇒ true, false
Is this session an implicit one (not user-created).
450 451 452 |
# File 'lib/mongo/session.rb', line 450 def implicit? @implicit ||= !!(.key?(:implicit) && [:implicit] == true) end |
#in_transaction? ⇒ true | false
Whether or not the session is currently in a transaction.
621 622 623 |
# File 'lib/mongo/session.rb', line 621 def in_transaction? within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE) end |
#inspect ⇒ String
Get a formatted string for use in inspection.
144 145 146 |
# File 'lib/mongo/session.rb', line 144 def inspect "#<Mongo::Session:0x#{object_id} session_id=#{session_id} options=#{@options}>" end |
#next_txn_num ⇒ Integer
Increment and return the next transaction number.
426 427 428 |
# File 'lib/mongo/session.rb', line 426 def next_txn_num @server_session.next_txn_num if @server_session end |
#process(result) ⇒ Operation::Result
Process a response from the server that used this session.
346 347 348 349 350 351 352 353 |
# File 'lib/mongo/session.rb', line 346 def process(result) unless implicit? set_operation_time(result) set_cluster_time(result) end @server_session.set_last_use! result end |
#retry_writes? ⇒ true, false
Retryable writes are only available on server versions at least 3.6 and with sharded clusters or replica sets.
Will writes executed with this session be retried.
402 403 404 |
# File 'lib/mongo/session.rb', line 402 def retry_writes? !!cluster.[:retry_writes] && (cluster.replica_set? || cluster.sharded?) end |
#session_id ⇒ BSON::Document
Get the session id.
414 415 416 |
# File 'lib/mongo/session.rb', line 414 def session_id @server_session.session_id if @server_session end |
#start_transaction(options = nil) ⇒ Object
Start a new transaction.
Note that the transaction will not be started on the server until an operation is performed after start_transaction is called.
progress or if the write concern is unacknowledged.
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 |
# File 'lib/mongo/session.rb', line 493 def start_transaction( = nil) check_if_ended! if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation::TRANSACTION_ALREADY_IN_PROGRESS) end next_txn_num = || [:default_transaction_options] || {} if txn_write_concern && WriteConcern.send(:unacknowledged?, txn_write_concern) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation::UNACKNOWLEDGED_WRITE_CONCERN) end @state = STARTING_TRANSACTION_STATE end |
#suppress_read_write_concern!(command) ⇒ Hash, BSON::Document
Remove the read concern and/or write concern from the command if not applicable.
281 282 283 284 285 286 287 288 |
# File 'lib/mongo/session.rb', line 281 def suppress_read_write_concern!(command) command.tap do |c| next unless in_transaction? c.delete(:readConcern) unless starting_transaction? c.delete(:writeConcern) unless c[:commitTransaction] || c[:abortTransaction] end end |
#txn_num ⇒ Integer
Get the current transaction number.
438 439 440 |
# File 'lib/mongo/session.rb', line 438 def txn_num @server_session && @server_session.txn_num end |
#txn_read_preference ⇒ Hash
Get the read preference the session will use in the currently active transaction.
This is a driver style hash with underscore keys.
636 637 638 639 640 641 |
# File 'lib/mongo/session.rb', line 636 def txn_read_preference rp = && [:read_preference] || @client.read_preference Mongo::Lint.validate_underscore_read_preference(rp) rp end |
#update_state! ⇒ Object
Update the state of the session due to a (non-commit and non-abort) operation being run.
309 310 311 312 313 314 315 316 |
# File 'lib/mongo/session.rb', line 309 def update_state! case @state when STARTING_TRANSACTION_STATE @state = TRANSACTION_IN_PROGRESS_STATE when TRANSACTION_COMMITTED_STATE, TRANSACTION_ABORTED_STATE @state = NO_TRANSACTION_STATE end end |
#validate!(cluster) ⇒ nil
Validate the session.
330 331 332 333 334 |
# File 'lib/mongo/session.rb', line 330 def validate!(cluster) check_matching_cluster!(cluster) check_if_ended! self end |
#validate_read_preference!(command) ⇒ Object
Ensure that the read preference of a command primary.
not primary.
299 300 301 302 303 304 |
# File 'lib/mongo/session.rb', line 299 def validate_read_preference!(command) return unless in_transaction? && non_primary_read_preference_mode?(command) raise Mongo::Error::InvalidTransactionOperation.new( Mongo::Error::InvalidTransactionOperation::INVALID_READ_PREFERENCE) end |