Class: Petra::Components::Transaction
- Inherits:
- Object
- Includes:
- ActiveSupport::Callbacks
- Defined in:
- lib/petra/components/transaction.rb
Instance Attribute Summary
-
#committed ⇒ Object
(also: #committed?)
readonly
Returns the value of attribute committed.
-
#identifier ⇒ Object
readonly
Returns the value of attribute identifier.
-
#persisted ⇒ Object
(also: #persisted?)
readonly
Returns the value of attribute persisted.
-
#reset ⇒ Object
(also: #reset?)
readonly
Returns the value of attribute reset.
-
#retry_in_progress ⇒ Object
(also: #retry_in_progress?)
readonly
Returns the value of attribute retry_in_progress.
Instance Method Summary
- #after_initialize ⇒ Object
-
#attribute_change_veto?(proxy, attribute:) ⇒ Boolean
Returns true if an attribute change veto is still active for the given attribute.
-
#attribute_value(proxy, attribute:) ⇒ Object
Returns the latest value which was set for a certain object attribute.
-
#attribute_value?(proxy, attribute:) ⇒ Boolean
(also: #attribute_changed?)
Checks whether the given attribute has been changed during the transaction.
-
#commit! ⇒ Object
Tries to commit the current transaction.
-
#current_section ⇒ Object
Returns the current section, creating it and adding it to #sections on first access.
-
#initialize(identifier:) ⇒ Transaction
constructor
A new instance of Transaction.
-
#log_entries ⇒ Petra::Components::EntrySet
TODO: Cache entries from already persisted sections.
-
#objects ⇒ Object
Returns the ProxyCache for this transaction.
-
#persist! ⇒ Object
Persists the current transaction section using the configured persistence adapter.
-
#prepare_for_retry! ⇒ Object
Make sure that overrides (ReadIntegrityOverride / AttributeChangeVeto) are persisted before retrying a section.
-
#read_attribute_value(proxy, attribute:) ⇒ Object
The last read value for the given attribute.
-
#read_attribute_value?(proxy, attribute:) ⇒ Boolean
(also: #attribute_read?)
true if the given attribute was read in one of the previous (or the current) sections.
-
#read_integrity_override?(proxy, attribute:, external_value:) ⇒ Boolean
true if ReadIntegrityErrors should still be suppressed for the given attribute.
-
#reset! ⇒ Object
Completely dismisses the current transaction and removes it from the persistence storage.
-
#rollback! ⇒ Object
Performs a rollback on this transaction, meaning that it will be set to the state of the latest savepoint.
- #sections ⇒ Object
-
#verify_attribute_integrity!(proxy, attribute:, force: false) ⇒ Object
Checks whether the given attribute has been changed since we last read it.
Constructor Details
#initialize(identifier:) ⇒ Transaction
Returns a new instance of Transaction.
# File 'lib/petra/components/transaction.rb', line 30
def initialize(identifier:)
  @identifier = identifier
  @persisted = false
  @committed = false
  @reset = false
  @retry_in_progress = false
end
Instance Attribute Details
#committed ⇒ Object (readonly) Also known as: committed?
Returns the value of attribute committed.
# File 'lib/petra/components/transaction.rb', line 15
def committed
  @committed
end
#identifier ⇒ Object (readonly)
Returns the value of attribute identifier.
# File 'lib/petra/components/transaction.rb', line 13
def identifier
  @identifier
end
#persisted ⇒ Object (readonly) Also known as: persisted?
Returns the value of attribute persisted.
# File 'lib/petra/components/transaction.rb', line 14
def persisted
  @persisted
end
#reset ⇒ Object (readonly) Also known as: reset?
Returns the value of attribute reset.
# File 'lib/petra/components/transaction.rb', line 16
def reset
  @reset
end
#retry_in_progress ⇒ Object (readonly) Also known as: retry_in_progress?
Returns the value of attribute retry_in_progress.
# File 'lib/petra/components/transaction.rb', line 17
def retry_in_progress
  @retry_in_progress
end
Instance Method Details
#after_initialize ⇒ Object
# File 'lib/petra/components/transaction.rb', line 38
def after_initialize
  # Initialize the current section
  current_section
end
#attribute_change_veto?(proxy, attribute:) ⇒ Boolean
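Returns true if an attribute change veto is still active for the given attribute, i.e. if no attribute change was logged after the latest veto.
TODO: Combine with #read_integrity_override?, because DRY.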
# File 'lib/petra/components/transaction.rb', line 156
def attribute_change_veto?(proxy, attribute:)
  # Step 1: Search for the latest attribute change veto entry we have for the given attribute
  attribute_entries = log_entries.for_attribute_key(proxy.__attribute_key(attribute))
  acv_entry = attribute_entries.of_kind(:attribute_change_veto).latest

  # If there hasn't been an attribute change veto in the past, there can't be an active one
  return false unless acv_entry

  # Step 2: Find the latest attribute change entry we have for the given attribute
  change_entry = attribute_entries.of_kind(:attribute_change).latest

  # Step 3: Check if the change entry is newer than the ACV entry
  # If so, the ACV entry is no longer valid
  change_entry < acv_entry
end
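The three steps above boil down to an ordering comparison between the latest veto entry and the latest change entry. The following is a minimal sketch of that idea, not Petra's implementation: `Entry`, its `position` field, and `veto_active?` are hypothetical stand-ins for Petra's log entries and entry set. Treating a missing change entry as "veto still active" is an assumption made here so the sketch never calls `<` on `nil`.

```ruby
# Hypothetical stand-in for a log entry; only kind and ordering matter here.
Entry = Struct.new(:kind, :position) do
  include Comparable

  # Entries are ordered by the position at which they were logged.
  def <=>(other)
    position <=> other.position
  end
end

# Returns true if the latest veto is newer than the latest attribute change.
def veto_active?(entries)
  # Step 1: latest veto entry, if any
  veto = entries.select { |e| e.kind == :attribute_change_veto }.max
  return false unless veto

  # Step 2: latest change entry, if any
  change = entries.select { |e| e.kind == :attribute_change }.max

  # Step 3: a change logged after the veto invalidates it.
  # Assumption: without any change entry, the veto counts as active.
  change.nil? || change < veto
end

entries = [Entry.new(:attribute_change, 1), Entry.new(:attribute_change_veto, 2)]
veto_active?(entries) # the veto is the newest entry, so it is active

entries << Entry.new(:attribute_change, 3)
veto_active?(entries) # a newer change invalidates the veto
```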
#attribute_value(proxy, attribute:) ⇒ Object
Returns the latest value which was set for a certain object attribute. This means that all previous sections’ write sets are inspected from new to old.
# File 'lib/petra/components/transaction.rb', line 60
def attribute_value(proxy, attribute:)
  sections.reverse.find { |s| s.value_for?(proxy, attribute: attribute) }.value_for(proxy, attribute: attribute)
end
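The "new to old" inspection can be illustrated with plain hashes standing in for the sections' write sets. This is only a sketch: `latest_written_value` and the hash-based write sets are hypothetical and not part of Petra. Unlike the method above, the sketch returns `nil` when no section wrote the attribute; the listing above would call `value_for` on `nil` in that case, which is presumably why callers check `#attribute_value?` first.

```ruby
# Hypothetical write sets, oldest section first; each maps attribute => value.
def latest_written_value(write_sets, attribute)
  # Walk the sections from newest to oldest and stop at the first match.
  newest = write_sets.reverse.find { |set| set.key?(attribute) }
  newest&.fetch(attribute)
end

write_sets = [
  { name: "Alice" },          # oldest section
  { name: "Bob", age: 30 },
  { age: 31 }                 # newest section
]

latest_written_value(write_sets, :name)  # => "Bob"
latest_written_value(write_sets, :age)   # => 31
latest_written_value(write_sets, :email) # => nil
```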
#attribute_value?(proxy, attribute:) ⇒ Boolean Also known as: attribute_changed?
Checks whether the given attribute has been changed during the transaction. It basically searches for a matching write set entry in all previous (and current) sections. If such an entry exists AND there hasn’t been an attribute change veto which is newer than it, the attribute counts as “changed within the transaction”.
# File 'lib/petra/components/transaction.rb', line 72
def attribute_value?(proxy, attribute:)
  sections.reverse.any? { |s| s.value_for?(proxy, attribute: attribute) } &&
    !attribute_change_veto?(proxy, attribute: attribute)
end
#commit! ⇒ Object
Tries to commit the current transaction.
# File 'lib/petra/components/transaction.rb', line 283
def commit!
  run_callbacks :commit do
    # Step 1: Lock this transaction so no other thread may alter it any more
    persistence_adapter.with_transaction_lock(identifier) do
      # Step 2: Try to get the locks for all objects which took part in this transaction
      #   Acquire the locks on a sorted collection to avoid deadlocks with other transactions
      #   We do not have to lock objects which were created within the transaction
      #   as they cannot be altered outside of it and the transaction itself is locked.
      with_locked_objects(objects.fateful.sort.reject(&:__new?), suspend: false) do
        # Step 3: Now that we got locks on all objects used during this transaction,
        #   we can check whether all read attributes still have the same value.
        #   If that's not the case, we may not proceed.
        objects.verify_read_attributes!(force: true)

        # Step 4: Now that we know that all read values are still valid,
        #   we may actually apply all the changes we previously logged.
        sections.each(&:apply_log_entries!)

        @committed = true
        Petra.logger.info "Committed transaction #{@identifier}", :blue, :underline

        # Step 5: Wow, we made it this far!
        #   Now it's time to clean up and remove the data we previously persisted for this
        #   transaction before releasing the lock on all of the objects and the transaction itself.
        # TODO: See if this causes problems with other threads working on this transaction. Probably keep
        #   the entries around and just mark the transaction as committed?
        #   Idea: keep it and add a last log entry like `transaction_commit` and persist it.
        persistence_adapter.reset_transaction(self)
      end
    end
  rescue Petra::ReadIntegrityError
    # One (or more) of the attributes from our read set changed externally
    raise
  rescue Petra::LockError
    # One (or more) of the objects could not be locked.
    # The object locks are freed by themselves, but we have to notify
    # the outer application about this commit error
    raise
  end
end
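Step 2 relies on a common deadlock-avoidance technique: if every transaction acquires its object locks in the same global order, two transactions can never each hold a lock the other is waiting for. A minimal sketch of that technique with plain `Mutex` objects follows. `lock_in_order` and the string keys are hypothetical; Petra's own `with_locked_objects` and its persistence adapter locks are not shown on this page and may work differently.

```ruby
# Hypothetical helper: acquires one Mutex per key in sorted order,
# yields, and releases the locks in reverse order afterwards.
def lock_in_order(keys, locks)
  acquired = []
  keys.sort.each do |key|
    locks.fetch(key).lock
    acquired << key
  end
  yield acquired
ensure
  # Only release what was actually acquired, newest lock first.
  acquired.reverse_each { |key| locks.fetch(key).unlock }
end

locks = { "account:1" => Mutex.new, "account:2" => Mutex.new }

# Both workers name the objects in a different order, but since the locks
# are always taken in sorted order, they cannot deadlock each other.
workers = [%w[account:2 account:1], %w[account:1 account:2]].map do |keys|
  Thread.new { 100.times { lock_in_order(keys, locks) { } } }
end
workers.each(&:join)
```

Sorting requires the locked objects to be comparable, which matches the `objects.fateful.sort` call in the listing above.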
#current_section ⇒ Object
Returns the current section. On first access, a new Petra::Components::Section is created and appended to #sections.
# File 'lib/petra/components/transaction.rb', line 259
def current_section
  @current_section ||= Petra::Components::Section.new(self).tap do |s|
    sections << s
  end
end
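The `||=` plus `tap` combination above creates the section once and registers it as a side effect of that first access. A small self-contained sketch of the same pattern, with a hypothetical `SectionHolder` class and plain `Object` instances standing in for sections:

```ruby
# Hypothetical class illustrating "create once, register on creation".
class SectionHolder
  attr_reader :sections

  def initialize
    @sections = []
  end

  def current_section
    # The block passed to tap only runs when the object is first created,
    # so the section is appended to the list exactly once.
    @current_section ||= Object.new.tap { |section| @sections << section }
  end
end

holder = SectionHolder.new
first  = holder.current_section
second = holder.current_section

first.equal?(second)  # => true, the same object is returned
holder.sections.size  # => 1
```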
#log_entries ⇒ Petra::Components::EntrySet
TODO: Cache entries from already persisted sections.
# File 'lib/petra/components/transaction.rb', line 101
def log_entries
  sections.each_with_object(EntrySet.new) { |s, es| es.concat(s.log_entries) }
end
#objects ⇒ Object
Returns the memoized ProxyCache for this transaction.
# File 'lib/petra/components/transaction.rb', line 251
def objects
  @objects ||= ProxyCache.new(self)
end
#persist! ⇒ Object
Persists the current transaction section using the configured persistence adapter
# File 'lib/petra/components/transaction.rb', line 349
def persist!
  run_callbacks :persist do
    current_section.enqueue_for_persisting!
    persistence_adapter.persist!
    Petra.logger.debug "Persisted transaction #{@identifier}", :green
    @persisted = true
  end
end
#prepare_for_retry! ⇒ Object
Make sure that overrides (ReadIntegrityOverride / AttributeChangeVeto) are persisted before retrying a section. If we don’t persist those, the same error will simply happen again in the next iteration.
# File 'lib/petra/components/transaction.rb', line 327
def prepare_for_retry!
  @retry_in_progress = true
  current_section.prepare_for_retry!
  persistence_adapter.persist!
  @retry_in_progress = false
end
#read_attribute_value(proxy, attribute:) ⇒ Object
Returns the last read value for the given attribute.
# File 'lib/petra/components/transaction.rb', line 87
def read_attribute_value(proxy, attribute:)
  sections
    .reverse.find { |s| s.read_value_for?(proxy, attribute: attribute) }
    .read_value_for(proxy, attribute: attribute)
end
#read_attribute_value?(proxy, attribute:) ⇒ Boolean Also known as: attribute_read?
Returns true if the given attribute was read in one of the previous (or the current) sections.
# File 'lib/petra/components/transaction.rb', line 80
def read_attribute_value?(proxy, attribute:)
  sections.reverse.any? { |s| s.read_value_for?(proxy, attribute: attribute) }
end
#read_integrity_override?(proxy, attribute:, external_value:) ⇒ Boolean
Returns true if ReadIntegrityErrors should still be suppressed for the given attribute. This is the case if a ReadIntegrityOverride log entry is still active.
# File 'lib/petra/components/transaction.rb', line 118
def read_integrity_override?(proxy, attribute:, external_value:)
  # Step 1: Search for the latest read integrity override entry we have for the given attribute
  attribute_entries = log_entries.for_attribute_key(proxy.__attribute_key(attribute))
  rio_entry = attribute_entries.of_kind(:read_integrity_override).latest

  # If there was no override in the past sections, there can't be an active one
  return false unless rio_entry

  # Step 2: Find the read log entry we previously created for this attribute.
  #   There has to be one as otherwise no read integrity error could have happened.
  read_entry = attribute_entries.of_kind(:attribute_read).latest

  # Step 3: Test if the read entry is newer than the RIO entry.
  #   If that's the case, the user most likely decided that the new external
  #   value should be displayed inside the transaction.
  #   As we could have only landed here if the external value changed again,
  #   we probably have to re-raise an exception about that.
  return false if read_entry > rio_entry

  # Step 4: We found ourselves a RIO entry that has not yet been invalidated
  #   by another attribute read, good.
  #   Now we have to check whether the current external value is still
  #   the same as at the time we generated the RIO entry.
  #   If that's the case, we still have an active read integrity override.
  rio_entry.external_value == external_value
end
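The four steps above can be condensed into three conditions: an override exists, no newer read has superseded it, and the external value is still the one the override was created for. A minimal sketch under those assumptions follows. `RioEntry`, `ReadEntry`, their integer `position` field, and `override_active?` are hypothetical; Petra's entries are compared directly rather than through a position field.

```ruby
# Hypothetical stand-ins for the two kinds of log entries involved.
RioEntry  = Struct.new(:position, :external_value)
ReadEntry = Struct.new(:position)

def override_active?(rio_entry, read_entry, external_value)
  # No override was ever recorded
  return false unless rio_entry

  # A read logged after the override supersedes it
  return false if read_entry.position > rio_entry.position

  # The override only covers the external value it was created for
  rio_entry.external_value == external_value
end

read = ReadEntry.new(1)
rio  = RioEntry.new(2, "external")

override_active?(rio, read, "external")             # => true
override_active?(rio, read, "changed again")        # => false
override_active?(rio, ReadEntry.new(3), "external") # => false
override_active?(nil, read, "external")             # => false
```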
#reset! ⇒ Object
Completely dismisses the current transaction and removes it from the persistence storage
# File 'lib/petra/components/transaction.rb', line 361
def reset!
  run_callbacks :reset do
    persistence_adapter.reset_transaction(self)
    @sections = []
    Petra.logger.warn "Reset transaction #{@identifier}", :red
  end
end
#rollback! ⇒ Object
Performs a rollback on this transaction, meaning that it will be set to the state of the latest savepoint. The current section will be reset, but keep the same savepoint name.
# File 'lib/petra/components/transaction.rb', line 339
def rollback!
  run_callbacks :rollback do
    current_section.reset! unless current_section.persisted?
    Petra.logger.debug "Rolled back section #{current_section.savepoint}", :yellow
  end
end
#sections ⇒ Object
# File 'lib/petra/components/transaction.rb', line 265
def sections
  # TODO: Acquire the transaction lock once here, otherwise, every section will do it.
  @sections ||= begin
    persistence_adapter.with_transaction_lock(self) do
      persistence_adapter.savepoints(self).map do |savepoint|
        Petra::Components::Section.new(self, savepoint: savepoint)
      end.sort_by(&:savepoint_version)
    end
  end
end
#verify_attribute_integrity!(proxy, attribute:, force: false) ⇒ Object
Checks whether the given attribute has been changed since we last read it. Raises an exception if the attribute was changed externally: a Petra::WriteClashError if the attribute was also changed within the transaction, a Petra::ReadIntegrityError otherwise.
We cannot detect here whether the attribute took several different values before returning to the original one, so only the current external value and the last read value are compared.
# File 'lib/petra/components/transaction.rb', line 193
def verify_attribute_integrity!(proxy, attribute:, force: false)
  # If we didn't read the attribute before, we can't search for changes
  return unless attribute_read?(proxy, attribute: attribute)

  # Don't perform the check if the force flag is not set and
  # petra is configured to not fail on read integrity errors at all.
  return if !force && !Petra.configuration.instant_read_integrity_fail

  # New objects won't be changed externally...
  return if proxy.__new?

  external_value = proxy.unproxied.send(attribute)
  last_read_value = read_attribute_value(proxy, attribute: attribute)

  # If nothing changed, we're done
  return if external_value == last_read_value

  # The user has previously chosen to ignore the external changes to this attribute (using ignore!).
  # Therefore, we do not have to raise another exception
  # OR
  # We only read this attribute before.
  # If the user (/developer) previously placed a read integrity override
  # for the current external value, we don't have to re-raise an exception about the change
  return if read_integrity_override?(proxy, attribute: attribute, external_value: external_value)

  if attribute_changed?(proxy, attribute: attribute)
    # We read AND changed this attribute before

    # If there is already an active attribute change veto (meaning that we didn't change
    # the attribute again after the last one), we don't have to raise another exception about it.
    # TODO: This should have already been filtered out by #attribute_changed?
    # return if attribute_change_veto?(proxy, attribute: attribute)

    callcc do |continuation|
      exception = Petra::WriteClashError.new(attribute: attribute,
                                             object: proxy,
                                             our_value: attribute_value(proxy, attribute: attribute),
                                             external_value: external_value,
                                             continuation: continuation)

      fail exception, "The attribute `#{attribute}` has been changed externally and in the transaction."
    end
  else
    callcc do |continuation|
      exception = Petra::ReadIntegrityError.new(attribute: attribute,
                                                object: proxy,
                                                last_read_value: last_read_value,
                                                external_value: external_value,
                                                continuation: continuation)

      fail exception, "The attribute `#{attribute}` has been changed externally."
    end
  end
end
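The order of the early returns above determines which outcome a caller sees. The following sketch restates that decision sequence as a pure function that returns a symbol instead of raising. `integrity_outcome` and all of its keyword arguments are hypothetical; in Petra the inputs come from the transaction log, the proxy, and `Petra.configuration`, and the continuation handling via `callcc` is deliberately left out.

```ruby
# Hypothetical decision function mirroring the order of checks above.
# Returns :ok, :write_clash or :read_integrity_error.
def integrity_outcome(read:, external_value:, last_read_value:,
                      check_enabled: true, new_object: false,
                      overridden: false, changed: false)
  return :ok unless read                          # never read, nothing to compare
  return :ok unless check_enabled                 # neither forced nor configured to fail
  return :ok if new_object                        # new objects cannot change externally
  return :ok if external_value == last_read_value # value is unchanged
  return :ok if overridden                        # an active read integrity override exists

  # The value changed externally: the error kind depends on whether
  # the transaction changed the attribute as well.
  changed ? :write_clash : :read_integrity_error
end

integrity_outcome(read: true, external_value: "b", last_read_value: "a")
# => :read_integrity_error
integrity_outcome(read: true, external_value: "b", last_read_value: "a", changed: true)
# => :write_clash
integrity_outcome(read: true, external_value: "b", last_read_value: "a", overridden: true)
# => :ok
```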