Class: Tus::Storage::S3

Inherits:
Object
  • Object
show all
Defined in:
lib/tus/storage/s3.rb

Defined Under Namespace

Classes: Response

Constant Summary collapse

MIN_PART_SIZE =

5MB is the minimum part size for S3 multipart uploads

5 * 1024 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket:, prefix: nil, upload_options: {}, thread_count: 10, **client_options) ⇒ S3

Returns a new instance of S3.



27
28
29
30
31
32
33
34
35
# File 'lib/tus/storage/s3.rb', line 27

def initialize(bucket:, prefix: nil, upload_options: {}, thread_count: 10, **client_options)
  resource = Aws::S3::Resource.new(**client_options)

  @client         = resource.client
  @bucket         = resource.bucket(bucket) or fail(ArgumentError, "the :bucket option was nil")
  @prefix         = prefix
  @upload_options = upload_options
  @thread_count   = thread_count
end

Instance Attribute Details

#bucketObject (readonly)

Returns the value of attribute bucket.



25
26
27
# File 'lib/tus/storage/s3.rb', line 25

def bucket
  @bucket
end

#clientObject (readonly)

Returns the value of attribute client.



25
26
27
# File 'lib/tus/storage/s3.rb', line 25

def client
  @client
end

#prefixObject (readonly)

Returns the value of attribute prefix.



25
26
27
# File 'lib/tus/storage/s3.rb', line 25

def prefix
  @prefix
end

#upload_optionsObject (readonly)

Returns the value of attribute upload_options.



25
26
27
# File 'lib/tus/storage/s3.rb', line 25

def upload_options
  @upload_options
end

Instance Method Details

#concatenate(uid, part_uids, info = {}) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/tus/storage/s3.rb', line 60

def concatenate(uid, part_uids, info = {})
  multipart_upload = create_file(uid, info)

  objects = part_uids.map { |part_uid| object(part_uid) }
  parts   = copy_parts(objects, multipart_upload)

  info["multipart_parts"].concat parts

  finalize_file(uid, info)

  delete(part_uids.flat_map { |part_uid| [object(part_uid), object("#{part_uid}.info")] })

  # Tus server requires us to return the size of the concatenated file.
  object = client.head_object(bucket: bucket.name, key: object(uid).key)
  object.content_length
rescue => error
  abort_multipart_upload(multipart_upload) if multipart_upload
  raise error
end

#create_file(uid, info = {}) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/tus/storage/s3.rb', line 37

def create_file(uid, info = {})
  tus_info = Tus::Info.new(info)

  options = upload_options.dup
  options[:content_type] = tus_info.["content_type"]

  if filename = tus_info.["filename"]
    # Aws-sdk-s3 doesn't sign non-ASCII characters correctly, and browsers
    # will automatically URI-decode filenames.
    filename = CGI.escape(filename).gsub("+", " ")

    options[:content_disposition] ||= "inline"
    options[:content_disposition]  += "; filename=\"#{filename}\""
  end

  multipart_upload = object(uid).initiate_multipart_upload(options)

  info["multipart_id"]    = multipart_upload.id
  info["multipart_parts"] = []

  multipart_upload
end

#delete_file(uid, info = {}) ⇒ Object



162
163
164
165
166
167
168
169
170
171
# File 'lib/tus/storage/s3.rb', line 162

def delete_file(uid, info = {})
  if info["multipart_id"]
    multipart_upload = object(uid).multipart_upload(info["multipart_id"])
    abort_multipart_upload(multipart_upload)

    delete [object("#{uid}.info")]
  else
    delete [object(uid), object("#{uid}.info")]
  end
end

#expire_files(expiration_date) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/tus/storage/s3.rb', line 173

def expire_files(expiration_date)
  old_objects = bucket.objects.select do |object|
    object.last_modified <= expiration_date
  end

  delete(old_objects)

  bucket.multipart_uploads.each do |multipart_upload|
    # no need to check multipart uploads initiated before expiration date
    next if multipart_upload.initiated > expiration_date

    most_recent_part = multipart_upload.parts.sort_by(&:last_modified).last
    if most_recent_part.nil? || most_recent_part.last_modified <= expiration_date
      abort_multipart_upload(multipart_upload)
    end
  end
end

#finalize_file(uid, info = {}) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/tus/storage/s3.rb', line 126

def finalize_file(uid, info = {})
  upload_id = info["multipart_id"]
  parts = info["multipart_parts"].map do |part|
    { part_number: part["part_number"], etag: part["etag"] }
  end

  multipart_upload = object(uid).multipart_upload(upload_id)
  multipart_upload.complete(multipart_upload: { parts: parts })

  info.delete("multipart_id")
  info.delete("multipart_parts")
end

#get_file(uid, info = {}, range: nil) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
# File 'lib/tus/storage/s3.rb', line 150

def get_file(uid, info = {}, range: nil)
  tus_info = Tus::Info.new(info)

  length = range ? range.size : tus_info.length
  range  = "bytes=#{range.begin}-#{range.end}" if range
  chunks = object(uid).enum_for(:get, range: range)

  # We return a response object that responds to #each, #length and #close,
  # which the tus server can return directly as the Rack response.
  Response.new(chunks: chunks, length: length)
end

#patch_file(uid, input, info = {}) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/tus/storage/s3.rb', line 80

def patch_file(uid, input, info = {})
  tus_info = Tus::Info.new(info)

  upload_id      = info["multipart_id"]
  part_offset    = info["multipart_parts"].count
  bytes_uploaded = 0

  jobs = []
  chunk = StringIO.new(input.read(MIN_PART_SIZE).to_s)

  loop do
    next_chunk = StringIO.new(input.read(MIN_PART_SIZE).to_s)

    # merge next chunk into previous if it's smaller than minimum chunk size
    if next_chunk.size < MIN_PART_SIZE
      chunk = StringIO.new(chunk.string + next_chunk.string)
      next_chunk.close
      next_chunk = nil
    end

    # abort if chunk is smaller than 5MB and is not the last chunk
    if chunk.size < MIN_PART_SIZE
      break if (tus_info.length && tus_info.offset) &&
               chunk.size + tus_info.offset < tus_info.length
    end

    thread = upload_part_thread(chunk, uid, upload_id, part_offset += 1)
    jobs << [thread, chunk]

    chunk = next_chunk or break
  end

  begin
    jobs.each do |thread, body|
      info["multipart_parts"] << thread.value
      bytes_uploaded += body.size
      body.close
    end
  rescue Seahorse::Client::NetworkingError => exception
    warn "ERROR: #{exception.inspect} occurred during upload"
    # ignore networking errors and return what client has uploaded so far
  end

  bytes_uploaded
end

#read_info(uid) ⇒ Object



139
140
141
142
143
144
# File 'lib/tus/storage/s3.rb', line 139

def read_info(uid)
  response = object("#{uid}.info").get
  JSON.parse(response.body.string)
rescue Aws::S3::Errors::NoSuchKey
  raise Tus::NotFound
end

#update_info(uid, info) ⇒ Object



146
147
148
# File 'lib/tus/storage/s3.rb', line 146

def update_info(uid, info)
  object("#{uid}.info").put(body: info.to_json)
end