Class: Mongo::Session
- Inherits: Object
  - Object
  - Mongo::Session
- Extended by: Forwardable
- Includes: Retryable
- Defined in: lib/mongo/session.rb, lib/mongo/session/session_pool.rb, lib/mongo/session/server_session.rb
Overview
A logical session representing a set of sequential operations executed by an application that are related in some way.
Defined Under Namespace
Classes: ServerSession, SessionPool
Constant Summary
- MISMATCHED_CLUSTER_ERROR_MSG =
Error message indicating that the session was retrieved from a client with a different cluster than that of the client through which it is currently being used.
'The configuration of the client used to create this session does not match that ' + 'of the client owning this operation. Please only use this session for operations through its parent ' + 'client.'.freeze
- SESSION_ENDED_ERROR_MSG =
Error message describing that the session cannot be used because it has already been ended.
'This session has ended and cannot be used. Please create a new one.'.freeze
- SESSIONS_NOT_SUPPORTED =
Error message describing that sessions are not supported by the server version.
'Sessions are not supported by the connected servers.'.freeze
- NO_TRANSACTION_STATE =
The state of a session in which the last operation was not related to any transaction or no operations have yet occurred.
:no_transaction
- STARTING_TRANSACTION_STATE =
The state of a session in which a user has initiated a transaction but no operations within the transactions have occurred yet.
:starting_transaction
- TRANSACTION_IN_PROGRESS_STATE =
The state of a session in which a transaction has been started and at least one operation has occurred, but the transaction has not yet been committed or aborted.
:transaction_in_progress
- TRANSACTION_COMMITTED_STATE =
The state of a session in which the last operation executed was a transaction commit.
:transaction_committed
- TRANSACTION_ABORTED_STATE =
The state of a session in which the last operation executed was a transaction abort.
:transaction_aborted
- UNLABELED_WRITE_CONCERN_CODES =
[
  79,  # UnknownReplWriteConcern
  100, # CannotSatisfyWriteConcern,
].freeze
Instance Attribute Summary
-
#client ⇒ Object
readonly
Get the client through which this session was created.
-
#cluster_time ⇒ Object
readonly
The cluster time for this session.
-
#operation_time ⇒ Object
readonly
The latest seen operation time for this session.
-
#options ⇒ Object
readonly
Get the options for this session.
-
#txn_options ⇒ Object
readonly
The options for the transaction currently being executed on the session.
Instance Method Summary
-
#abort_transaction ⇒ Object
Abort the currently active transaction without making any changes to the database.
-
#add_autocommit!(command) ⇒ Hash, BSON::Document
private
Add the autocommit field to a command document if applicable.
-
#add_id!(command) ⇒ Hash, BSON::Document
private
Add this session’s id to a command document.
-
#add_start_transaction!(command) ⇒ Hash, BSON::Document
private
Add the startTransaction field to a command document if applicable.
-
#add_txn_num!(command) ⇒ Hash, BSON::Document
private
Add the transaction number to a command document if applicable.
-
#add_txn_opts!(command, read) ⇒ Hash, BSON::Document
private
Add the transaction options if applicable.
-
#advance_cluster_time(new_cluster_time) ⇒ BSON::Document, Hash
Advance the cached cluster time document for this session.
-
#advance_operation_time(new_operation_time) ⇒ BSON::Timestamp
Advance the cached operation time for this session.
- #cluster ⇒ Object
-
#commit_transaction(options = nil) ⇒ Object
Commit the currently active transaction on the session.
-
#end_session ⇒ nil
End this session.
-
#ended? ⇒ true, false
Whether this session has ended.
-
#explicit? ⇒ true, false
Is this session an explicit one (i.e. user-created).
-
#implicit? ⇒ true, false
Is this session an implicit one (not user-created).
-
#in_transaction? ⇒ true | false
Whether or not the session is currently in a transaction.
-
#initialize(server_session, client, options = {}) ⇒ Session
constructor
private
Initialize a Session.
-
#inspect ⇒ String
Get a formatted string for use in inspection.
-
#next_txn_num ⇒ Integer
private
Increment and return the next transaction number.
-
#process(result) ⇒ Operation::Result
private
Process a response from the server that used this session.
-
#retry_writes? ⇒ true, false
Will writes executed with this session be retried.
-
#session_id ⇒ BSON::Document
Get the server session id of this session, if the session was not ended.
-
#start_transaction(options = nil) ⇒ Object
Places subsequent operations in this session into a new transaction.
-
#suppress_read_write_concern!(command) ⇒ Hash, BSON::Document
private
Remove the read concern and/or write concern from the command if not applicable.
-
#txn_num ⇒ Integer
Get the current transaction number.
-
#txn_read_preference ⇒ Hash
Get the read preference the session will use in the currently active transaction.
-
#update_state! ⇒ Object
private
Update the state of the session due to a (non-commit and non-abort) operation being run.
-
#validate!(cluster) ⇒ nil
private
Validate the session.
-
#validate_read_preference!(command) ⇒ Object
private
Ensure that the read preference of a command is primary.
-
#with_transaction(options = nil) ⇒ Object
Executes the provided block in a transaction, retrying as necessary.
Methods included from Retryable
#read_with_one_retry, #read_with_retry, #write_with_retry
Constructor Details
#initialize(server_session, client, options = {}) ⇒ Session
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Applications should use Client#start_session to begin a session.
Initialize a Session.
# File 'lib/mongo/session.rb', line 124

def initialize(server_session, client, options = {})
  @server_session = server_session
  options = options.dup

  # Because the read preference will need to be inserted into a command as a string, we convert
  # it from a symbol immediately upon receiving it.
  if options[:read_preference] && options[:read_preference][:mode]
    options[:read_preference][:mode] = options[:read_preference][:mode].to_s
  end

  @client = client.use(:admin)
  @options = options.freeze
  @cluster_time = nil
  @state = NO_TRANSACTION_STATE
end
Instance Attribute Details
#client ⇒ Object (readonly)
Get the client through which this session was created.
# File 'lib/mongo/session.rb', line 36

def client
  @client
end
#cluster_time ⇒ Object (readonly)
The cluster time for this session.
# File 'lib/mongo/session.rb', line 41

def cluster_time
  @cluster_time
end
#operation_time ⇒ Object (readonly)
The latest seen operation time for this session.
# File 'lib/mongo/session.rb', line 46

def operation_time
  @operation_time
end
#options ⇒ Object (readonly)
Get the options for this session.
# File 'lib/mongo/session.rb', line 31

def options
  @options
end
#txn_options ⇒ Object (readonly)
The options for the transaction currently being executed on the session.
# File 'lib/mongo/session.rb', line 51

def txn_options
  @txn_options
end
Instance Method Details
#abort_transaction ⇒ Object
Abort the currently active transaction without making any changes to the database.
# File 'lib/mongo/session.rb', line 623

def abort_transaction
  check_if_ended!
  check_if_no_transaction!

  if within_states?(TRANSACTION_COMMITTED_STATE)
    raise Mongo::Error::InvalidTransactionOperation.new(
      Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg(
        :commitTransaction, :abortTransaction))
  end

  if within_states?(TRANSACTION_ABORTED_STATE)
    raise Mongo::Error::InvalidTransactionOperation.new(
      Mongo::Error::InvalidTransactionOperation.cannot_call_twice_msg(:abortTransaction))
  end

  begin
    unless starting_transaction?
      write_with_retry(self, txn_options[:write_concern], true) do |server, txn_num|
        Operation::Command.new(
          selector: { abortTransaction: 1 },
          db_name: 'admin',
          session: self,
          txn_num: txn_num
        ).execute(server)
      end
    end

    @state = TRANSACTION_ABORTED_STATE
  rescue Mongo::Error::InvalidTransactionOperation
    raise
  rescue Mongo::Error
    @state = TRANSACTION_ABORTED_STATE
  end
end
#add_autocommit!(command) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Add the autocommit field to a command document if applicable.
# File 'lib/mongo/session.rb', line 195

def add_autocommit!(command)
  command.tap do |c|
    c[:autocommit] = false if in_transaction?
  end
end
#add_id!(command) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Add this session’s id to a command document.
# File 'lib/mongo/session.rb', line 210

def add_id!(command)
  command.merge!(lsid: session_id)
end
#add_start_transaction!(command) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Add the startTransaction field to a command document if applicable.
# File 'lib/mongo/session.rb', line 223

def add_start_transaction!(command)
  command.tap do |c|
    if starting_transaction?
      c[:startTransaction] = true
    end
  end
end
#add_txn_num!(command) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Add the transaction number to a command document if applicable.
# File 'lib/mongo/session.rb', line 240

def add_txn_num!(command)
  command.tap do |c|
    c[:txnNumber] = BSON::Int64.new(@server_session.txn_num) if in_transaction?
  end
end
#add_txn_opts!(command, read) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Add the transaction options if applicable.
# File 'lib/mongo/session.rb', line 255

def add_txn_opts!(command, read)
  command.tap do |c|
    # The read preference should be added for all read operations.
    if read && txn_read_pref = txn_read_preference
      Mongo::Lint.validate_underscore_read_preference(txn_read_pref)
      txn_read_pref = txn_read_pref.dup
      txn_read_pref[:mode] = txn_read_pref[:mode].to_s.gsub(/(_\w)/) { |match| match[1].upcase }
      Mongo::Lint.validate_camel_case_read_preference(txn_read_pref)
      c['$readPreference'] = txn_read_pref
    end

    # The read concern should be added to any command that starts a transaction.
    if starting_transaction? && txn_read_concern
      c[:readConcern] ||= {}
      c[:readConcern].merge!(txn_read_concern)
    end

    # We need to send the read concern level as a string rather than a symbol.
    if c[:readConcern] && c[:readConcern][:level]
      c[:readConcern][:level] = c[:readConcern][:level].to_s
    end

    # The write concern should be added to any abortTransaction or commitTransaction command.
    if (c[:abortTransaction] || c[:commitTransaction]) && txn_write_concern
      c[:writeConcern] = txn_write_concern
    end

    # A non-numeric write concern w value needs to be sent as a string rather than a symbol.
    if c[:writeConcern] && c[:writeConcern][:w] && c[:writeConcern][:w].is_a?(Symbol)
      c[:writeConcern][:w] = c[:writeConcern][:w].to_s
    end
  end
end
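The read preference handling in add_txn_opts! converts the driver-style underscore mode into the camelCase form before placing it under $readPreference. The following is a minimal standalone sketch of that conversion only; the helper names camel_case_mode and wire_read_preference are hypothetical and are not part of the driver.

```ruby
# Hypothetical helper: convert an underscore mode (symbol or string)
# into camelCase using the same gsub as add_txn_opts!.
def camel_case_mode(mode)
  mode.to_s.gsub(/(_\w)/) { |match| match[1].upcase }
end

# Hypothetical helper: return a copy of the read preference hash with
# the mode converted, leaving the caller's hash untouched.
def wire_read_preference(read_pref)
  converted = read_pref.dup
  converted[:mode] = camel_case_mode(read_pref[:mode])
  converted
end

driver_style = { mode: :secondary_preferred }
wire_style = wire_read_preference(driver_style)
puts wire_style[:mode]
```

Because the hash is duplicated before the mode is rewritten, the session's own read preference keeps its underscore form for later calls.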
#advance_cluster_time(new_cluster_time) ⇒ BSON::Document, Hash
Advance the cached cluster time document for this session.
# File 'lib/mongo/session.rb', line 386

def advance_cluster_time(new_cluster_time)
  if @cluster_time
    @cluster_time = [ @cluster_time, new_cluster_time ].max_by { |doc| doc[Cluster::CLUSTER_TIME] }
  else
    @cluster_time = new_cluster_time
  end
end
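As a rough illustration of the "keep the later document" rule used above, the sketch below applies the same max_by comparison to plain hashes. It assumes integer timestamps in place of BSON::Timestamp values, and the key name 'clusterTime' is an assumed stand-in for Cluster::CLUSTER_TIME; the function name is hypothetical.

```ruby
# Assumed stand-in for Cluster::CLUSTER_TIME.
CLUSTER_TIME_KEY = 'clusterTime'

# Hypothetical helper: return whichever document carries the later
# cluster time, or the candidate when nothing is cached yet.
def advance_cluster_time_sketch(current, candidate)
  return candidate if current.nil?

  [current, candidate].max_by { |doc| doc[CLUSTER_TIME_KEY] }
end

older = { CLUSTER_TIME_KEY => 5 }
newer = { CLUSTER_TIME_KEY => 10 }

cached = advance_cluster_time_sketch(nil, older)    # first value is taken as-is
cached = advance_cluster_time_sketch(cached, newer) # later time replaces it
cached = advance_cluster_time_sketch(cached, older) # earlier time is ignored
puts cached[CLUSTER_TIME_KEY]
```

The cached value therefore never moves backwards, regardless of the order in which server responses arrive.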
#advance_operation_time(new_operation_time) ⇒ BSON::Timestamp
Advance the cached operation time for this session.
# File 'lib/mongo/session.rb', line 404

def advance_operation_time(new_operation_time)
  if @operation_time
    @operation_time = [ @operation_time, new_operation_time ].max
  else
    @operation_time = new_operation_time
  end
end
#cluster ⇒ Object
# File 'lib/mongo/session.rb', line 790

def cluster
  @client.cluster
end
#commit_transaction(options = nil) ⇒ Object
Commit the currently active transaction on the session.
# File 'lib/mongo/session.rb', line 560

def commit_transaction(options=nil)
  check_if_ended!
  check_if_no_transaction!

  if within_states?(TRANSACTION_ABORTED_STATE)
    raise Mongo::Error::InvalidTransactionOperation.new(
      Mongo::Error::InvalidTransactionOperation.cannot_call_after_msg(
        :abortTransaction, :commitTransaction))
  end

  options ||= {}

  begin
    # If commitTransaction is called twice, we need to run the same commit
    # operation again, so we revert the session to the previous state.
    if within_states?(TRANSACTION_COMMITTED_STATE)
      @state = @last_commit_skipped ? STARTING_TRANSACTION_STATE : TRANSACTION_IN_PROGRESS_STATE
    end

    if starting_transaction?
      @last_commit_skipped = true
    else
      @last_commit_skipped = false

      write_concern = options[:write_concern] || txn_options[:write_concern]
      if write_concern && !write_concern.is_a?(WriteConcern::Base)
        write_concern = WriteConcern.get(write_concern)
      end
      write_with_retry(self, write_concern, true) do |server, txn_num|
        Operation::Command.new(
          selector: { commitTransaction: 1 },
          db_name: 'admin',
          session: self,
          txn_num: txn_num,
          write_concern: write_concern,
        ).execute(server)
      end
    end
  rescue Mongo::Error::NoServerAvailable, Mongo::Error::SocketError => e
    e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
    raise e
  rescue Mongo::Error::OperationFailure => e
    err_doc = e.instance_variable_get(:@result).send(:first_document)

    if e.write_retryable? || (err_doc['writeConcernError'] &&
        !UNLABELED_WRITE_CONCERN_CODES.include?(err_doc['writeConcernError']['code']))
      e.send(:add_label, Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
    end

    raise e
  ensure
    @state = TRANSACTION_COMMITTED_STATE
  end
end
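The OperationFailure branch above decides whether a failed commit should be labeled as having an unknown result. A minimal sketch of that decision follows, assuming the error document is a plain hash and that retryability is passed in as a boolean; the function name unknown_commit_result? is hypothetical, and only the two write concern codes listed in UNLABELED_WRITE_CONCERN_CODES are taken from this page.

```ruby
# Codes copied from UNLABELED_WRITE_CONCERN_CODES on this page.
UNLABELED_CODES_SKETCH = [
  79,  # UnknownReplWriteConcern
  100, # CannotSatisfyWriteConcern
].freeze

# Hypothetical helper: true when the commit outcome should be treated
# as unknown, following the condition used in commit_transaction.
def unknown_commit_result?(err_doc, write_retryable)
  return true if write_retryable

  wc_error = err_doc['writeConcernError']
  !wc_error.nil? && !UNLABELED_CODES_SKETCH.include?(wc_error['code'])
end

puts unknown_commit_result?({ 'writeConcernError' => { 'code' => 64 } }, false)
puts unknown_commit_result?({ 'writeConcernError' => { 'code' => 79 } }, false)
```

A write concern error with one of the two listed codes is left unlabeled, so a caller checking for the label will not retry the commit in that case.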
#end_session ⇒ nil
End this session.
# File 'lib/mongo/session.rb', line 160

def end_session
  if !ended? && @client
    if within_states?(TRANSACTION_IN_PROGRESS_STATE)
      begin
        abort_transaction
      rescue Mongo::Error
      end
    end
    @client.cluster.session_pool.checkin(@server_session)
  end
ensure
  @server_session = nil
end
#ended? ⇒ true, false
Whether this session has ended.
# File 'lib/mongo/session.rb', line 182

def ended?
  @server_session.nil?
end
#explicit? ⇒ true, false
Is this session an explicit one (i.e. user-created).
# File 'lib/mongo/session.rb', line 497

def explicit?
  @explicit ||= !implicit?
end
#implicit? ⇒ true, false
Is this session an implicit one (not user-created).
# File 'lib/mongo/session.rb', line 485

def implicit?
  @implicit ||= !!(@options.key?(:implicit) && @options[:implicit] == true)
end
#in_transaction? ⇒ true | false
Whether or not the session is currently in a transaction.
# File 'lib/mongo/session.rb', line 666

def in_transaction?
  within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
end
#inspect ⇒ String
Get a formatted string for use in inspection.
# File 'lib/mongo/session.rb', line 148

def inspect
  "#<Mongo::Session:0x#{object_id} session_id=#{session_id} options=#{@options}>"
end
#next_txn_num ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Increment and return the next transaction number.
# File 'lib/mongo/session.rb', line 453

def next_txn_num
  if ended?
    raise Error::SessionEnded
  end

  @server_session.next_txn_num
end
#process(result) ⇒ Operation::Result
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Process a response from the server that used this session.
# File 'lib/mongo/session.rb', line 367

def process(result)
  unless implicit?
    set_operation_time(result)
    set_cluster_time(result)
  end
  @server_session.set_last_use!
  result
end
#retry_writes? ⇒ true, false
Will writes executed with this session be retried.
Note: Retryable writes are only available on server versions 3.6 or later, and only with sharded clusters or replica sets.
# File 'lib/mongo/session.rb', line 423

def retry_writes?
  !!client.options[:retry_writes] && (cluster.replica_set? || cluster.sharded?)
end
#session_id ⇒ BSON::Document
Get the server session id of this session, if the session was not ended. If the session was ended, raises Error::SessionEnded.
# File 'lib/mongo/session.rb', line 436

def session_id
  if ended?
    raise Error::SessionEnded
  end

  @server_session.session_id
end
#start_transaction(options = nil) ⇒ Object
Places subsequent operations in this session into a new transaction.
Note that the transaction will not be started on the server until an operation is performed after start_transaction is called.
# File 'lib/mongo/session.rb', line 526

def start_transaction(options = nil)
  if options
    Lint.validate_read_concern_option(options[:read_concern])
  end

  check_if_ended!

  if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
    raise Mongo::Error::InvalidTransactionOperation.new(
      Mongo::Error::InvalidTransactionOperation::TRANSACTION_ALREADY_IN_PROGRESS)
  end

  next_txn_num

  @txn_options = options || @options[:default_transaction_options] || {}

  if txn_write_concern && WriteConcern.send(:unacknowledged?, txn_write_concern)
    raise Mongo::Error::InvalidTransactionOperation.new(
      Mongo::Error::InvalidTransactionOperation::UNACKNOWLEDGED_WRITE_CONCERN)
  end

  @state = STARTING_TRANSACTION_STATE
end
#suppress_read_write_concern!(command) ⇒ Hash, BSON::Document
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove the read concern and/or write concern from the command if not applicable.
# File 'lib/mongo/session.rb', line 298

def suppress_read_write_concern!(command)
  command.tap do |c|
    next unless in_transaction?

    c.delete(:readConcern) unless starting_transaction?
    c.delete(:writeConcern) unless c[:commitTransaction] || c[:abortTransaction]
  end
end
#txn_num ⇒ Integer
Get the current transaction number.
# File 'lib/mongo/session.rb', line 469

def txn_num
  if ended?
    raise Error::SessionEnded
  end

  @server_session.txn_num
end
#txn_read_preference ⇒ Hash
Get the read preference the session will use in the currently active transaction.
This is a driver style hash with underscore keys.
# File 'lib/mongo/session.rb', line 783

def txn_read_preference
  rp = txn_options && txn_options[:read_preference] ||
    @client.read_preference
  Mongo::Lint.validate_underscore_read_preference(rp)
  rp
end
#update_state! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Update the state of the session due to a (non-commit and non-abort) operation being run.
# File 'lib/mongo/session.rb', line 328

def update_state!
  case @state
  when STARTING_TRANSACTION_STATE
    @state = TRANSACTION_IN_PROGRESS_STATE
  when TRANSACTION_COMMITTED_STATE, TRANSACTION_ABORTED_STATE
    @state = NO_TRANSACTION_STATE
  end
end
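The transitions driven by update_state!, together with the states set by start_transaction and commit_transaction, can be traced with a small standalone model. This is only a sketch: the class TransactionStateSketch and its method names are hypothetical, it reuses the state symbols from the Constant Summary, and it omits all server communication and most validation.

```ruby
# Hypothetical model of the session's transaction state transitions.
class TransactionStateSketch
  attr_reader :state

  def initialize
    @state = :no_transaction
  end

  def in_transaction?
    [:starting_transaction, :transaction_in_progress].include?(@state)
  end

  # Mirrors the state change made at the end of start_transaction.
  def start_transaction
    raise 'transaction already in progress' if in_transaction?

    @state = :starting_transaction
  end

  # Mirrors update_state!: called for any non-commit, non-abort operation.
  def run_operation
    case @state
    when :starting_transaction
      @state = :transaction_in_progress
    when :transaction_committed, :transaction_aborted
      @state = :no_transaction
    end
  end

  # Mirrors the ensure clause of commit_transaction.
  def commit_transaction
    @state = :transaction_committed
  end
end

session = TransactionStateSketch.new
session.start_transaction  # state: :starting_transaction
session.run_operation      # state: :transaction_in_progress
session.commit_transaction # state: :transaction_committed
session.run_operation      # state: :no_transaction
puts session.state
```

The last step shows why an operation run after a commit is not treated as part of the finished transaction: the first such operation resets the state to :no_transaction.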
#validate!(cluster) ⇒ nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Validate the session.
# File 'lib/mongo/session.rb', line 350

def validate!(cluster)
  check_matching_cluster!(cluster)
  check_if_ended!
  self
end
#validate_read_preference!(command) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Ensure that the read preference of a command is primary.
# File 'lib/mongo/session.rb', line 317

def validate_read_preference!(command)
  return unless in_transaction? && non_primary_read_preference_mode?(command)

  raise Mongo::Error::InvalidTransactionOperation.new(
    Mongo::Error::InvalidTransactionOperation::INVALID_READ_PREFERENCE)
end
#with_transaction(options = nil) ⇒ Object
Executes the provided block in a transaction, retrying as necessary.
Returns the return value of the block.
Exact number of retries and when they are performed are implementation details of the driver; the provided block should be idempotent, and should be prepared to be called more than once. The driver may retry the commit command within an active transaction or it may repeat the transaction and invoke the block again, depending on the error encountered if any. Note also that the retries may be executed against different servers.
Transactions cannot be nested - InvalidTransactionOperation will be raised if this method is called when the session already has an active transaction.
Exceptions raised by the block which are not derived from Mongo::Error stop processing, abort the transaction and are propagated out of with_transaction. Exceptions derived from Mongo::Error may be handled by with_transaction, resulting in retries of the process.
The number of retries and the total time taken by with_transaction are not specified by the driver. The driver only guarantees that the number of retries attempted will be finite, i.e., that it will stop retrying at some point. The number of retries and the time allowed for the retries are subject to change in future versions of the driver. Applications that require known performance characteristics (for example, when servicing web requests in an application with a fixed number of servers/workers/threads) are encouraged to explicitly limit the time they allow for with_transaction calls.
# File 'lib/mongo/session.rb', line 723

def with_transaction(options=nil)
  loop do
    commit_options = {}
    if options
      commit_options[:write_concern] = options[:write_concern]
    end
    start_transaction(options)
    begin
      rv = yield self
    rescue Exception => e
      if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
        abort_transaction
      end

      if e.is_a?(Mongo::Error) && e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)
        next
      end

      raise
    else
      if within_states?(TRANSACTION_ABORTED_STATE, NO_TRANSACTION_STATE, TRANSACTION_COMMITTED_STATE)
        return rv
      end

      begin
        commit_transaction(commit_options)
        return rv
      rescue Mongo::Error => e
        if e.label?(Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
          wc_options = case v = commit_options[:write_concern]
            when WriteConcern::Base
              v.options
            when nil
              {}
            else
              v
            end
          commit_options[:write_concern] = wc_options.merge(w: :majority)
          retry
        elsif e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)
          next
        else
          raise
        end
      end
    end
  end
end
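The label-driven retry of the block can be illustrated without a server. The sketch below is not the driver's implementation: LabeledError is a hypothetical stand-in for Mongo::Error, the label string 'TransientTransactionError' is assumed to match Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL, and the max_attempts bound is an addition showing one way an application might cap retries of its own retryable work.

```ruby
# Hypothetical error type carrying labels, standing in for Mongo::Error.
class LabeledError < StandardError
  def initialize(message, labels = [])
    super(message)
    @labels = labels
  end

  def label?(label)
    @labels.include?(label)
  end
end

# Assumed value of the transient transaction error label.
TRANSIENT_LABEL = 'TransientTransactionError'

# Re-run the block while it fails with a transient label, giving up
# after max_attempts. Any other error propagates immediately.
def run_with_transient_retry(max_attempts: 3)
  attempts = 0
  loop do
    attempts += 1
    begin
      return yield(attempts)
    rescue LabeledError => e
      raise unless e.label?(TRANSIENT_LABEL) && attempts < max_attempts
    end
  end
end

result = run_with_transient_retry do |attempt|
  # Fail transiently on the first two attempts, then succeed.
  raise LabeledError.new('transient failure', [TRANSIENT_LABEL]) if attempt < 3

  :committed
end
puts result
```

Because the block may run several times, it has to be idempotent, as the description above requires. The explicit attempt cap is what gives the caller a bound that with_transaction itself does not promise.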