Class: Kithe::Asset::DerivativeUpdater

Inherits:
Object
Defined in:
app/models/kithe/asset/derivative_updater.rb

Overview

A service object that adds an IO stream to an asset as a derivative with a given key, creating a Derivative database record for it. This class is normally used only from Asset#update_derivative; it is a helper object, and you aren’t expected to use it independently.

This would be very straightforward if it weren’t for having to account for a couple of concurrency race conditions involving data integrity:

  1. There should be only one derivative for a given asset/key pair. This is enforced by a DB constraint. If the record already exists, we want to update the current record; otherwise we add a new one. We want to do this in a race-condition-safe way, with possibly multiple processes editing the DB.
  2. The DB should at no point in time contain a derivative generated for an old version of the asset. If an asset#file is changed, its existing derivatives need to be deleted, and this needs to happen in a race-condition-safe way when something may be trying to add a derivative concurrently.

I believe we have solved those challenges, but it leads to somewhat tricky code. We use a kind of “optimistic” approach to (1): try to insert, and if you get a uniqueness violation, try to find and use the record that’s already there. We use a “pessimistic” approach to (2), where we briefly take out a pessimistic DB lock to make sure the asset#file hasn’t changed, and can’t change until we’re done updating. The sha512 serves as the marker, which is why you can’t add a derivative to an asset until it has a sha512 in its metadata, usually post-promotion.

If we made a given Asset object’s file bytestream immutable, this would all be a lot simpler; we wouldn’t need to worry about (2) at all, and maybe not even (1). We might consider that, but for now we’re tackling it the hard way.

Constructor Details

#initialize(asset, key, io, storage_key: :kithe_derivatives, metadata: {}) ⇒ DerivativeUpdater

Returns a new instance of DerivativeUpdater.



# File 'app/models/kithe/asset/derivative_updater.rb', line 32

def initialize(asset, key, io, storage_key: :kithe_derivatives, metadata: {})
  @asset = asset
  @key = key
  @io = io
  @storage_key = storage_key
  @metadata = metadata

  @max_optimistic_tries = 3

  unless asset_has_persisted_sha512?
    raise ArgumentError.new("Can not safely add derivative to an asset without a persisted sha512 value")
  end
end

Instance Attribute Details

#asset ⇒ Object (readonly)

Returns the value of attribute asset.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def asset
  @asset
end

#io ⇒ Object (readonly)

Returns the value of attribute io.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def io
  @io
end

#key ⇒ Object (readonly)

Returns the value of attribute key.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def key
  @key
end

#max_optimistic_tries ⇒ Object (readonly)

Returns the value of attribute max_optimistic_tries.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def max_optimistic_tries
  @max_optimistic_tries
end

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def metadata
  @metadata
end

#storage_key ⇒ Object (readonly)

Returns the value of attribute storage_key.



# File 'app/models/kithe/asset/derivative_updater.rb', line 30

def storage_key
  @storage_key
end

Instance Method Details

#asset_has_persisted_sha512? ⇒ Boolean

Returns:

  • (Boolean)


# File 'app/models/kithe/asset/derivative_updater.rb', line 114

def asset_has_persisted_sha512?
  asset.persisted? && asset.sha512.present? &&
  !( asset.file_data_changed? &&
       asset.file_data_change.first.try(:dig, "metadata", "sha512") !=
         asset.file_data_change.second.try(:dig, "metadata", "sha512"))
end
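
The comparison at the heart of this predicate can be sketched in plain Ruby. This is only an illustration under stated assumptions: `sha512_changed?` is a hypothetical helper, plain hashes stand in for the `[before, after]` pair that `file_data_change` returns, and `&.dig` is used in place of ActiveSupport's `try(:dig, ...)` so the snippet runs without Rails.

```ruby
# Hypothetical helper: true when the sha512 differs between the two sides
# of a [before, after] change pair. Either side may be nil.
def sha512_changed?(change)
  before, after = change
  # &.dig is nil-safe, like try(:dig, ...) in the method above
  before&.dig("metadata", "sha512") != after&.dig("metadata", "sha512")
end

# Same bytes moved between storages: file data changed, sha512 did not.
same_bytes = [
  { "id" => "a.jpg", "storage" => "cache", "metadata" => { "sha512" => "abc" } },
  { "id" => "b.jpg", "storage" => "store", "metadata" => { "sha512" => "abc" } }
]

# A different file was assigned: sha512 differs.
new_bytes = [
  { "id" => "a.jpg", "storage" => "store", "metadata" => { "sha512" => "abc" } },
  { "id" => "c.jpg", "storage" => "store", "metadata" => { "sha512" => "def" } }
]

# No previous file at all.
first_attach = [nil, { "id" => "a.jpg", "metadata" => { "sha512" => "abc" } }]

same_bytes_changed   = sha512_changed?(same_bytes)
new_bytes_changed    = sha512_changed?(new_bytes)
first_attach_changed = sha512_changed?(first_attach)
```

Under these assumptions, an unsaved change that keeps the same sha512 does not count as a changed file, while a different or newly attached file does.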

#optimistically_save_derivative(uploaded_file:, derivative:, tries: 0) ⇒ Object

Attaches UploadedFile to Derivative and tries to save it. If we get a unique constraint violation because a Derivative for that asset/key already existed, we fetch that already existing one from the DB and update its actual bytestream.

This method calls itself recursively to do that. Gives up after max_optimistic_tries, at which point it’ll just raise the constraint violation exception.



# File 'app/models/kithe/asset/derivative_updater.rb', line 65

def optimistically_save_derivative(uploaded_file:, derivative:, tries: 0)
  derivative.file_attacher.set(uploaded_file)
  save_deriv_ensuring_unchanged_asset(derivative)
rescue ActiveRecord::RecordNotUnique => e
  if tries < max_optimistic_tries
    # find the one that's already there, try to attach our new file
    # to that one
    derivative = Kithe::Derivative.where(key: key.to_s, asset: asset).first || derivative
    optimistically_save_derivative(uploaded_file: uploaded_file, derivative: derivative, tries: tries + 1)
  else
    uploaded_file.delete if uploaded_file
    raise e
  end
rescue StandardError => e
  # aggressively clean up our file on errors!
  uploaded_file.delete if uploaded_file
  raise e
end
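
The insert-then-fall-back-to-update pattern described above can be tried out without a database. The following is a minimal sketch, not the Kithe implementation: `FakeDerivativeTable`, `UniqueViolation`, `optimistic_save`, and `MAX_OPTIMISTIC_TRIES` are all hypothetical names, and an in-memory hash imitates the unique asset/key constraint.

```ruby
# Hypothetical error and table classes; they only imitate a unique DB index.
class UniqueViolation < StandardError; end

class FakeDerivativeTable
  attr_reader :rows

  def initialize
    @rows = {}
  end

  def exists?(asset_id, key)
    @rows.key?([asset_id, key])
  end

  # Fails like a unique index would when the asset/key pair is already taken.
  def insert!(asset_id, key, file)
    raise UniqueViolation, "duplicate #{asset_id}/#{key}" if exists?(asset_id, key)
    @rows[[asset_id, key]] = file
    :inserted
  end

  def update!(asset_id, key, file)
    @rows[[asset_id, key]] = file
    :updated
  end
end

MAX_OPTIMISTIC_TRIES = 3

# Try a plain insert first; on a uniqueness violation, retry against the
# row that is already there, giving up after MAX_OPTIMISTIC_TRIES retries.
def optimistic_save(table, asset_id, key, file, update_existing: false, tries: 0)
  if update_existing
    table.update!(asset_id, key, file)
  else
    table.insert!(asset_id, key, file)
  end
rescue UniqueViolation
  # Out of retries: re-raise the constraint violation to the caller.
  raise if tries >= MAX_OPTIMISTIC_TRIES
  # Another writer got there first; reuse its row if it is still present.
  optimistic_save(table, asset_id, key, file,
                  update_existing: table.exists?(asset_id, key),
                  tries: tries + 1)
end

table  = FakeDerivativeTable.new
first  = optimistic_save(table, 1, "thumb", "v1")
second = optimistic_save(table, 1, "thumb", "v2")
```

In this sketch the first call inserts a row and the second call hits the simulated constraint, then updates the existing row instead. The real method additionally deletes the uploaded file before re-raising, which the sketch leaves out.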

#save_deriv_ensuring_unchanged_asset(deriv) ⇒ Object

Save a Derivative model with some fancy DB footwork to ensure at the time we save it, the original asset file it is based on is still in db unchanged, in a concurrency-safe way.

We re-fetch the asset, with a pessimistic lock inside a transaction, to ensure it still exists with the sha512 we expect (the kithe model ensures sha512 exists in shrine metadata). This ensures that at the point we save the new derivative, the original file the derivative relates to is still in the DB.

Can raise an ActiveRecord::RecordNotUnique if the derivative unique constraint is violated; that is handled by the caller, #optimistically_save_derivative.



# File 'app/models/kithe/asset/derivative_updater.rb', line 95

def save_deriv_ensuring_unchanged_asset(deriv)
  # fancy throw/catch keep our abort rescue from being in the transaction
  catch(:kithe_unchanged_abort) do
    Kithe::Asset.transaction do
      # the file we're trying to add a derivative to doesn't exist anymore, forget it
      unless asset.acquire_lock_on_sha
        throw :kithe_unchanged_abort
      end

      deriv.save!
      return deriv
    end
  end

  # If we made it here, we've aborted
  deriv.file.delete
  return nil
end
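
The throw/catch control flow used here can be seen in isolation with a small sketch. It assumes nothing about ActiveRecord: `fake_transaction` is a hypothetical stand-in that only yields and records when the block starts and ends, and `save_if_unchanged` is an illustrative name. The point it shows is that the cleanup step runs only after the transaction block has been exited.

```ruby
# Hypothetical stand-in for a transaction block; it only yields and logs.
def fake_transaction(log)
  log << :begin
  yield
ensure
  log << :end
end

def save_if_unchanged(asset_still_present, log)
  catch(:abort_save) do
    fake_transaction(log) do
      # Bail out of the whole transaction block if the asset is gone.
      throw :abort_save unless asset_still_present
      log << :saved
      # Success: return straight out of the method, skipping the cleanup below.
      return :saved
    end
  end

  # Only reached via throw, after the transaction block has ended.
  log << :cleaned_up
  nil
end

ok_log = []
ok_result = save_if_unchanged(true, ok_log)

aborted_log = []
aborted_result = save_if_unchanged(false, aborted_log)
```

In the aborted case the log records `:end` before `:cleaned_up`, so the cleanup (deleting the uploaded file, in the real method) happens outside the transaction block.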

#update ⇒ Object



# File 'app/models/kithe/asset/derivative_updater.rb', line 46

def update
  deriv = Kithe::Derivative.new(key: key.to_s, asset: asset)

  # skip cache phase, right to specified storage, but with metadata extraction.
  uploader = deriv.file_attacher.shrine_class.new(storage_key)

  # add our derivative key to context when uploading, so Kithe::DerivativeUploader can
  # use it if needed.
  uploaded_file = uploader.upload(io, record: deriv, metadata: metadata.merge(kithe_derivative_key: key))

  optimistically_save_derivative(uploaded_file: uploaded_file, derivative: deriv)
end