Class: Petra::Components::Transaction

Inherits:
Object
  • Object
show all
Includes:
ActiveSupport::Callbacks
Defined in:
lib/petra/components/transaction.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(identifier:) ⇒ Transaction

Returns a new instance of Transaction.



30
31
32
33
34
35
36
# File 'lib/petra/components/transaction.rb', line 30

# Builds a fresh transaction object that has not been persisted, committed or reset yet.
#
# @param identifier [Object] value stored as this transaction's identifier
def initialize(identifier:)
  @identifier = identifier

  # Every lifecycle flag starts out cleared.
  @persisted = @committed = @reset = @retry_in_progress = false
end

Instance Attribute Details

#committed ⇒ Object (readonly) Also known as: committed?

Returns the value of attribute committed.



15
16
17
# File 'lib/petra/components/transaction.rb', line 15

# Reader for the +@committed+ flag.
# The flag is +false+ after construction and is set to +true+ inside #commit!
# once all logged changes have been applied.
#
# @return [Boolean] the current value of +@committed+
def committed
  @committed
end

#identifier ⇒ Object (readonly)

Returns the value of attribute identifier.



13
14
15
# File 'lib/petra/components/transaction.rb', line 13

# Reader for the identifier this transaction was constructed with
# (the +identifier:+ keyword passed to the constructor).
#
# @return [Object] the value of +@identifier+
def identifier
  @identifier
end

#persisted ⇒ Object (readonly) Also known as: persisted?

Returns the value of attribute persisted.



14
15
16
# File 'lib/petra/components/transaction.rb', line 14

# Reader for the +@persisted+ flag.
# The flag is +false+ after construction and is set to +true+ at the end of #persist!.
#
# @return [Boolean] the current value of +@persisted+
def persisted
  @persisted
end

#reset ⇒ Object (readonly) Also known as: reset?

Returns the value of attribute reset.



16
17
18
# File 'lib/petra/components/transaction.rb', line 16

# Reader for the +@reset+ flag, which is initialised to +false+ by the constructor.
# NOTE(review): none of the methods shown alongside this reader assign +true+ to
#   +@reset+ (not even #reset!) - confirm where, if anywhere, the flag is raised.
#
# @return [Boolean] the current value of +@reset+
def reset
  @reset
end

#retry_in_progress ⇒ Object (readonly) Also known as: retry_in_progress?

Returns the value of attribute retry_in_progress.



17
18
19
# File 'lib/petra/components/transaction.rb', line 17

# Reader for the +@retry_in_progress+ flag.
# The flag is +false+ after construction; #prepare_for_retry! switches it to +true+
# while it prepares and persists the current section and back to +false+ afterwards.
#
# @return [Boolean] the current value of +@retry_in_progress+
def retry_in_progress
  @retry_in_progress
end

Instance Method Details

#after_initialize ⇒ Object



38
39
40
41
# File 'lib/petra/components/transaction.rb', line 38

# Post-construction hook that makes sure a current section exists.
#
# @return [Object] whatever #current_section returns
def after_initialize
  # Merely asking for the current section is enough to get it set up.
  current_section
end

#attribute_change_veto?(proxy, attribute:) ⇒ Boolean

TODO: Combine with #read_integrity_override, because DRY

Parameters:

Returns:

  • (Boolean)

    true if there is an active AttributeChangeVeto for the given attribute, meaning that all attribute changes should be discarded.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/petra/components/transaction.rb', line 156

# Tells whether an AttributeChangeVeto is currently in effect for the given attribute,
# meaning that changes made to it inside the transaction should be discarded.
#
# TODO: Combine with #read_integrity_override, because DRY
#
# @param proxy [#__attribute_key] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @return [Boolean] true if the latest veto has not been superseded by a newer attribute change
def attribute_change_veto?(proxy, attribute:)
  attribute_entries = log_entries.for_attribute_key(proxy.__attribute_key(attribute))

  # Step 1: Without any attribute change veto entry there can't be an active one
  acv_entry = attribute_entries.of_kind(:attribute_change_veto).latest
  return false unless acv_entry

  # Step 2: Only an attribute change which is newer than the veto invalidates it.
  #   If there is no change entry at all, nothing can have superseded the veto,
  #   so it is still active. (Comparing against nil used to raise a NoMethodError.)
  change_entry = attribute_entries.of_kind(:attribute_change).latest
  return true unless change_entry

  # Step 3: The veto is active as long as the latest change is older than it
  change_entry < acv_entry
end

#attribute_value(proxy, attribute:) ⇒ Object

Returns the latest value which was set for a certain object attribute. This means that all previous sections’ write sets are inspected from new to old.



60
61
62
# File 'lib/petra/components/transaction.rb', line 60

# Looks up the most recent value written to an attribute within this transaction.
# Sections are inspected from newest to oldest and the first one holding a value wins.
#
# @param proxy [Object] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @return [Object] the value reported by the newest section that has one
def attribute_value(proxy, attribute:)
  newest_first   = sections.reverse
  writer_section = newest_first.find { |section| section.value_for?(proxy, attribute: attribute) }
  writer_section.value_for(proxy, attribute: attribute)
end

#attribute_value?(proxy, attribute:) ⇒ Boolean Also known as: attribute_changed?

Checks whether the given attribute has been changed during the transaction. It basically searches for a matching write set entry in all previous (and current) sections. If such an entry exists AND there hasn’t been an attribute change veto which is newer than it, the attribute counts as “changed within the transaction”.

Returns:

  • (Boolean)

    true if there was a valid attribute change



72
73
74
75
# File 'lib/petra/components/transaction.rb', line 72

# Tells whether the attribute counts as "changed within the transaction":
# some section must hold a written value for it and no attribute change veto
# may currently be active.
#
# @param proxy [Object] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @return [Boolean] true if there was a valid attribute change
def attribute_value?(proxy, attribute:)
  written = sections.reverse.any? { |section| section.value_for?(proxy, attribute: attribute) }
  return false unless written

  # A written value only counts while it has not been vetoed.
  !attribute_change_veto?(proxy, attribute: attribute)
end

#commit! ⇒ Object

Tries to commit the current transaction



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/petra/components/transaction.rb', line 283

# Tries to commit the current transaction.
#
# Inside the :commit callbacks, the transaction lock is taken first, then the locks
# for the participating objects. While both are held, the read set is verified,
# the log entries of all sections are applied, the transaction is marked as
# committed and its persisted data is removed again.
#
# @raise [Petra::ReadIntegrityError] re-raised unchanged if the read set verification fails
# @raise [Petra::LockError] re-raised unchanged if a lock could not be acquired
def commit!
  run_callbacks :commit do
    # Step 1: Lock this transaction so no other thread may alter it any more
    # NOTE(review): the lock is requested with `identifier` here, while #sections
    #   passes `self` to the same adapter method - confirm the adapter accepts both.
    persistence_adapter.with_transaction_lock(identifier) do
      # Step 2: Try to get the locks for all objects which took part in this transaction
      #   Acquire the locks on a sorted collection to avoid deadlocks with other transactions
      # We do not have to lock objects which were created within the transaction
      #   as they cannot be altered outside of it and the transaction itself is locked.
      with_locked_objects(objects.fateful.sort.reject(&:__new?), suspend: false) do
        # Step 3: Now that we got locks on all objects used during this transaction,
        #   we can check whether all read attributes still have the same value.
        #   If that's not the case, we may not proceed.
        objects.verify_read_attributes!(force: true)

        # Step 4: Now that we know that all read values are still valid,
        #   we may actually apply all the changes we previously logged.
        sections.each(&:apply_log_entries!)

        # The flag is raised before the persisted data is removed below.
        @committed = true
        Petra.logger.info "Committed transaction #{@identifier}", :blue, :underline

        # Step 5: Wow, we made it this far!
        #   Now it's time to clean up and remove the data we previously persisted for this
        #   transaction before releasing the lock on all of the objects and the transaction itself.
        # TODO: See if this causes problems with other threads working on this transaction. Probably keep
        #   the entries around and just mark the transaction as committed?
        #   Idea: keep it and add a last log entry like `transaction_commit` and persist it.
        persistence_adapter.reset_transaction(self)
      end
    end
  rescue Petra::ReadIntegrityError # One (or more) of the attributes from our read set changed externally
    # Nothing is handled here, the error is passed on to the caller as it is.
    raise
  rescue Petra::LockError # One (or more) of the objects could not be locked.
    #   The object locks are freed by themselves, but we have to notify
    #   the outer application about this commit error
    raise
  end
end

#current_section ⇒ Object


Sections



259
260
261
262
263
# File 'lib/petra/components/transaction.rb', line 259

# Returns the section this transaction is currently working on.
# On first access a new section is built for this transaction, appended to
# #sections and remembered; later calls return that same object.
#
# @return [Petra::Components::Section] the memoised current section
def current_section
  return @current_section if @current_section

  fresh_section = Petra::Components::Section.new(self)
  sections << fresh_section
  @current_section = fresh_section
end

#log_entries ⇒ Petra::Components::EntrySet

TODO: Cache entries from already persisted sections.

Returns:



101
102
103
# File 'lib/petra/components/transaction.rb', line 101

# Collects the log entries of every section, in section order, into one new EntrySet.
#
# TODO: Cache entries from already persisted sections.
#
# @return [Petra::Components::EntrySet] a new set holding the entries of all sections
def log_entries
  combined = EntrySet.new
  sections.each { |section| combined.concat(section.log_entries) }
  combined
end

#objects ⇒ Object


Object Helpers



251
252
253
# File 'lib/petra/components/transaction.rb', line 251

# Returns the ProxyCache of this transaction.
# It is created for this transaction on first access and reused afterwards.
#
# @return [ProxyCache] the memoised proxy cache
def objects
  return @objects if @objects

  @objects = ProxyCache.new(self)
end

#persist! ⇒ Object

Persists the current transaction section using the configured persistence adapter



349
350
351
352
353
354
355
356
# File 'lib/petra/components/transaction.rb', line 349

# Persists the current transaction section using the configured persistence adapter.
# Everything happens inside the :persist callbacks: the current section is enqueued,
# the adapter is told to persist, and the transaction is flagged as persisted.
def persist!
  run_callbacks(:persist) do
    current_section.enqueue_for_persisting!
    persistence_adapter.persist!
    Petra.logger.debug("Persisted transaction #{@identifier}", :green)

    # Only reached if enqueueing and persisting did not raise.
    @persisted = true
  end
end

#prepare_for_retry! ⇒ Object

Make sure that overrides (ReadIntegrityOverride / AttributeChangeVeto) are persisted before retrying a section. If we don’t persist those, the same error will simply happen again in the next iteration.



327
328
329
330
331
332
# File 'lib/petra/components/transaction.rb', line 327

# Makes sure that overrides (ReadIntegrityOverride / AttributeChangeVeto) are persisted
# before a section is retried. If they were not persisted, the same error would
# simply happen again in the next iteration.
#
# While the preparation runs, +@retry_in_progress+ is +true+. The flag is cleared
# again in an +ensure+ clause so that an error raised by the section or by the
# persistence adapter cannot leave the transaction stuck in "retry in progress".
#
# @return [void]
def prepare_for_retry!
  @retry_in_progress = true
  current_section.prepare_for_retry!
  persistence_adapter.persist!
ensure
  # Runs on success as well as on failure.
  @retry_in_progress = false
end

#read_attribute_value(proxy, attribute:) ⇒ Object

Returns the last read value for the given attribute.

Returns:

  • (Object)

    the last read value for the given attribute



87
88
89
90
91
# File 'lib/petra/components/transaction.rb', line 87

# Returns the last read value for the given attribute.
# Sections are inspected from newest to oldest and the first one holding a
# read value for the attribute is asked for it.
#
# @param proxy [Object] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @return [Object] the last read value for the given attribute
def read_attribute_value(proxy, attribute:)
  newest_first   = sections.reverse
  reader_section = newest_first.find { |section| section.read_value_for?(proxy, attribute: attribute) }
  reader_section.read_value_for(proxy, attribute: attribute)
end

#read_attribute_value?(proxy, attribute:) ⇒ Boolean Also known as: attribute_read?

Returns true if the given attribute was read in one of the previous (or the current) sections.

Returns:

  • (Boolean)

    true if the given attribute was read in one of the previous (or the current) sections



80
81
82
# File 'lib/petra/components/transaction.rb', line 80

# Tells whether the given attribute was read in one of the previous (or the current) sections.
#
# @param proxy [Object] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @return [Boolean] true if any section reports a read value for the attribute
def read_attribute_value?(proxy, attribute:)
  newest_first = sections.reverse
  newest_first.any? do |section|
    section.read_value_for?(proxy, attribute: attribute)
  end
end

#read_integrity_override?(proxy, attribute:, external_value:) ⇒ Boolean

Returns true if ReadIntegrityErrors should still be suppressed for the given attribute. This is the case if a ReadIntegrityOverride log entry is still active.

Parameters:

  • proxy (Petra::Proxies::ObjectProxy)
  • attribute (String, Symbol)
  • external_value (Object)

    The current external value. It is needed as read integrity overrides only stay active as long as the external value stays the same.

Returns:

  • (Boolean)

    true if ReadIntegrityErrors should still be suppressed for the given attribute. This is the case if a ReadIntegrityOverride log entry is still active



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/petra/components/transaction.rb', line 118

# Tells whether ReadIntegrityErrors should still be suppressed for the given attribute,
# which is the case while a ReadIntegrityOverride log entry is still active.
#
# @param proxy [Petra::Proxies::ObjectProxy] proxy of the object that owns the attribute
# @param attribute [String, Symbol] name of the attribute
# @param external_value [Object] the current external value. An override only stays
#   active as long as the external value is the one it was created for.
# @return [Boolean] true if an override is still active for the attribute
def read_integrity_override?(proxy, attribute:, external_value:)
  entries = log_entries.for_attribute_key(proxy.__attribute_key(attribute))

  # No override entry at all means that nothing is being suppressed.
  override = entries.of_kind(:read_integrity_override).latest
  return false unless override

  # A read entry is expected to exist, as a read integrity error requires a previous read.
  # If that read is newer than the override, the user most likely chose to see the
  # new external value inside the transaction, which invalidates the override.
  last_read  = entries.of_kind(:attribute_read).latest
  superseded = last_read > override

  # An override that was not invalidated is still active as long as the external
  # value is the same as at the time the override was created.
  !superseded && override.external_value == external_value
end

#reset! ⇒ Object

Completely dismisses the current transaction and removes it from the persistence storage



361
362
363
364
365
366
367
# File 'lib/petra/components/transaction.rb', line 361

# Completely dismisses the current transaction and removes it from the persistence storage.
# Inside the :reset callbacks the adapter is asked to reset the transaction and the
# section list is replaced by an empty one.
#
# NOTE(review): neither +@current_section+ nor the +@reset+ flag is touched here -
#   confirm that this is intended.
def reset!
  run_callbacks(:reset) do
    persistence_adapter.reset_transaction(self)

    # Forget every section known so far.
    @sections = []
    Petra.logger.warn("Reset transaction #{@identifier}", :red)
  end
end

#rollback! ⇒ Object

Performs a rollback on this transaction, meaning that it will be set to the state of the latest savepoint. The current section will be reset, but keep the same savepoint name.



339
340
341
342
343
344
# File 'lib/petra/components/transaction.rb', line 339

# Performs a rollback on this transaction, meaning that it is set back to the state
# of the latest savepoint. Inside the :rollback callbacks, the current section is
# reset unless it has already been persisted; the rollback is logged either way.
def rollback!
  run_callbacks(:rollback) do
    # Persisted sections are left untouched.
    current_section.reset! unless current_section.persisted?

    Petra.logger.debug("Rolled back section #{current_section.savepoint}", :yellow)
  end
end

#sections ⇒ Object



265
266
267
268
269
270
271
272
273
274
# File 'lib/petra/components/transaction.rb', line 265

# Returns the sections of this transaction, ordered by their savepoint version.
# On first access, one section per savepoint known to the persistence adapter is
# built while the transaction lock is held; the resulting list is memoised.
#
# @return [Array<Petra::Components::Section>] the memoised list of sections
def sections
  return @sections if @sections

  # TODO: Acquire the transaction lock once here, otherwise, every section will do it.
  @sections = persistence_adapter.with_transaction_lock(self) do
    restored = persistence_adapter.savepoints(self).map do |savepoint|
      Petra::Components::Section.new(self, savepoint: savepoint)
    end
    restored.sort_by(&:savepoint_version)
  end
end

#verify_attribute_integrity!(proxy, attribute:, force: false) ⇒ Object

Checks whether the given attribute has been changed since we last read it. Raises an exception if the attribute was changed externally

We cannot check here whether the attribute had several different values before going back to the original one, so we only compare the current and the last read value.

Parameters:

  • force (Boolean) (defaults to: false)

    If set to true, the check is performed even if it was disabled in the base configuration.

Raises:

  • (Petra::ReadIntegrityError)

    Raised if an attribute that we previously read, but NOT changed was changed externally

  • (Petra::WriteClashError)

    Raised if an attribute that we previously read AND changed was changed externally



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/petra/components/transaction.rb', line 193

# Checks whether the given attribute has been changed externally since we last read it
# and raises an exception if that is the case.
#
# Only the current external value and the last read value are compared. Whether the
# attribute had several different values in between and went back to the original
# one cannot be detected here.
#
# @param proxy [Object] proxy of the object that owns the attribute; it has to
#   respond to +__new?+ and +unproxied+
# @param attribute [String, Symbol] name of the attribute, which is also sent to the
#   unproxied object as a method to obtain the external value
# @param force [Boolean] if set to true, the check is performed even if
#   +Petra.configuration.instant_read_integrity_fail+ is falsy
# @raise [Petra::ReadIntegrityError] if an attribute that we previously read,
#   but NOT changed, was changed externally
# @raise [Petra::WriteClashError] if an attribute that we previously read AND changed
#   was changed externally
def verify_attribute_integrity!(proxy, attribute:, force: false)
  # If we didn't read the attribute before, we can't search for changes
  return unless attribute_read?(proxy, attribute: attribute)

  # Don't perform the check if the force flag is not set and
  # petra is configured to not fail on read integrity errors at all.
  return if !force && !Petra.configuration.instant_read_integrity_fail

  # New objects won't be changed externally...
  return if proxy.__new?

  # The external value is what the unproxied object reports right now,
  # the last read value is what the transaction's sections recorded.
  external_value  = proxy.unproxied.send(attribute)
  last_read_value = read_attribute_value(proxy, attribute: attribute)

  # If nothing changed, we're done
  return if external_value == last_read_value

  # The user has previously chosen to ignore the external changes to this attribute (using ignore!).
  # Therefore, we do not have to raise another exception
  # OR
  # We only read this attribute before.
  # If the user (/developer) previously placed a read integrity override
  # for the current external value, we don't have to re-raise an exception about the change
  return if read_integrity_override?(proxy, attribute: attribute, external_value: external_value)

  if attribute_changed?(proxy, attribute: attribute)
    # We read AND changed this attribute before

    # If there is already an active attribute change veto (meaning that we didn't change
    # the attribute again after the last one), we don't have to raise another exception about it.
    # TODO: This should have already been filtered out by #attribute_changed?
    # return if attribute_change_veto?(proxy, attribute: attribute)

    # The continuation captured here is handed to the exception object.
    # NOTE(review): presumably this lets the rescuing code resume execution after
    #   this block - confirm against the exception classes.
    callcc do |continuation|
      exception = Petra::WriteClashError.new(attribute:      attribute,
                                             object:         proxy,
                                             our_value:      attribute_value(proxy, attribute: attribute),
                                             external_value: external_value,
                                             continuation:   continuation)

      fail exception, "The attribute `#{attribute}` has been changed externally and in the transaction."
    end
  else
    # We only read the attribute, so this is a plain read integrity violation.
    # As above, the continuation is handed to the exception object.
    callcc do |continuation|
      exception = Petra::ReadIntegrityError.new(attribute:       attribute,
                                                object:          proxy,
                                                last_read_value: last_read_value,
                                                external_value:  external_value,
                                                continuation:    continuation)
      fail exception, "The attribute `#{attribute}` has been changed externally."
    end
  end
end