Class: Tus::Storage::S3

Inherits: Object
Defined in:
lib/tus/storage/s3.rb

Defined Under Namespace

Classes: Response

Constant Summary collapse

MIN_PART_SIZE =

5MB is the minimum part size for S3 multipart uploads

5 * 1024 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket:, prefix: nil, upload_options: {}, thread_count: 10, **client_options) ⇒ S3

Returns a new instance of S3.



21
22
23
24
25
26
27
28
29
# File 'lib/tus/storage/s3.rb', line 21

# Sets up the S3 storage.
#
# bucket         -- name of the S3 bucket objects are stored in (required)
# prefix         -- optional key prefix prepended to object keys
# upload_options -- extra options forwarded when initiating uploads
# thread_count   -- number of threads used for parallel part operations
# client_options -- forwarded verbatim to Aws::S3::Resource
def initialize(bucket:, prefix: nil, upload_options: {}, thread_count: 10, **client_options)
  resource = Aws::S3::Resource.new(**client_options)

  @client = resource.client
  @bucket = resource.bucket(bucket)
  raise ArgumentError, "the :bucket option was nil" unless @bucket

  @prefix         = prefix
  @upload_options = upload_options
  @thread_count   = thread_count
end

Instance Attribute Details

#bucketObject (readonly)

Returns the value of attribute bucket.



19
20
21
# File 'lib/tus/storage/s3.rb', line 19

# Read-only accessor for the Aws::S3::Bucket assigned in #initialize.
def bucket
  @bucket
end

#clientObject (readonly)

Returns the value of attribute client.



19
20
21
# File 'lib/tus/storage/s3.rb', line 19

# Read-only accessor for the Aws::S3::Client assigned in #initialize.
def client
  @client
end

#prefixObject (readonly)

Returns the value of attribute prefix.



19
20
21
# File 'lib/tus/storage/s3.rb', line 19

# Read-only accessor for the optional key prefix assigned in #initialize.
def prefix
  @prefix
end

#upload_optionsObject (readonly)

Returns the value of attribute upload_options.



19
20
21
# File 'lib/tus/storage/s3.rb', line 19

# Read-only accessor for the upload options hash assigned in #initialize.
def upload_options
  @upload_options
end

Instance Method Details

#concatenate(uid, part_uids, info = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/tus/storage/s3.rb', line 54

# Joins previously uploaded partial files into a single object by copying
# them as parts of a fresh multipart upload, then deletes the partials.
# On any failure the multipart upload is aborted before re-raising.
def concatenate(uid, part_uids, info = {})
  multipart_upload = create_file(uid, info)

  begin
    part_objects = part_uids.map { |part_uid| object(part_uid) }
    info["multipart_parts"].concat copy_parts(part_objects, multipart_upload)

    finalize_file(uid, info)

    leftovers = part_uids.flat_map { |part_uid| [object(part_uid), object("#{part_uid}.info")] }
    delete(leftovers)

    # Tus server requires us to return the size of the concatenated file.
    client.head_object(bucket: bucket.name, key: object(uid).key).content_length
  rescue
    abort_multipart_upload(multipart_upload)
    raise
  end
end

#create_file(uid, info = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/tus/storage/s3.rb', line 31

# Initiates a multipart upload for the given uid, applying content type
# and an inline content-disposition filename from the tus metadata, and
# records the multipart id and an empty parts list in the info hash.
# Returns the Aws::S3::MultipartUpload.
def create_file(uid, info = {})
  tus_info = Tus::Info.new(info)

  options = upload_options.dup
  # FIX: extraction had garbled `tus_info.[...]` (syntax error); the tus
  # metadata accessor is Tus::Info#metadata.
  options[:content_type] = tus_info.metadata["content_type"]

  if filename = tus_info.metadata["filename"]
    # Aws-sdk doesn't sign non-ASCII characters correctly, and browsers
    # will automatically URI-decode filenames.
    filename = CGI.escape(filename).gsub("+", " ")

    options[:content_disposition] ||= "inline"
    # FIX: restored the destroyed `#{filename}` interpolation.
    options[:content_disposition]  += "; filename=\"#{filename}\""
  end

  multipart_upload = object(uid).initiate_multipart_upload(options)

  info["multipart_id"]    = multipart_upload.id
  info["multipart_parts"] = []

  multipart_upload
end

#delete_file(uid, info = {}) ⇒ Object



156
157
158
159
160
161
162
163
164
165
# File 'lib/tus/storage/s3.rb', line 156

# Removes the stored file. An in-progress multipart upload is aborted
# (S3 discards its parts), otherwise the object itself is deleted; the
# companion "<uid>.info" object is deleted in both cases.
def delete_file(uid, info = {})
  multipart_id = info["multipart_id"]

  if multipart_id
    abort_multipart_upload(object(uid).multipart_upload(multipart_id))
    delete [object("#{uid}.info")]
  else
    delete [object(uid), object("#{uid}.info")]
  end
end

#expire_files(expiration_date) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/tus/storage/s3.rb', line 167

# Deletes objects last modified on or before the expiration date, and
# aborts multipart uploads whose newest part (or whose initiation, when
# partless) is at least that old.
def expire_files(expiration_date)
  expired_objects = bucket.objects.reject do |obj|
    obj.last_modified > expiration_date
  end

  delete(expired_objects)

  bucket.multipart_uploads.each do |multipart_upload|
    # no need to check multipart uploads initiated before expiration date
    next if multipart_upload.initiated > expiration_date

    newest_part = multipart_upload.parts.max_by(&:last_modified)
    if newest_part.nil? || newest_part.last_modified <= expiration_date
      abort_multipart_upload(multipart_upload)
    end
  end
end

#finalize_file(uid, info = {}) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/tus/storage/s3.rb', line 120

# Completes the multipart upload for uid using the part numbers and ETags
# accumulated in the info hash, then drops the multipart bookkeeping keys.
def finalize_file(uid, info = {})
  completed_parts = info["multipart_parts"].map { |part|
    { part_number: part["part_number"], etag: part["etag"] }
  }

  object(uid)
    .multipart_upload(info["multipart_id"])
    .complete(multipart_upload: { parts: completed_parts })

  info.delete("multipart_id")
  info.delete("multipart_parts")
end

#get_file(uid, info = {}, range: nil) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/tus/storage/s3.rb', line 144

# Streams the stored object, optionally restricted to a byte Range.
# Returns a Response wrapping a lazy chunk enumerator.
def get_file(uid, info = {}, range: nil)
  tus_info = Tus::Info.new(info)

  if range
    length     = range.size
    http_range = "bytes=#{range.begin}-#{range.end}"
  else
    length     = tus_info.length
    http_range = nil
  end

  chunks = object(uid).enum_for(:get, range: http_range)

  # We return a response object that responds to #each, #length and #close,
  # which the tus server can return directly as the Rack response.
  Response.new(chunks: chunks, length: length)
end

#patch_file(uid, input, info = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/tus/storage/s3.rb', line 74

# Appends data read from `input` to the in-progress multipart upload for
# `uid`, uploading each chunk as an S3 part on a background thread.
# Returns the number of bytes successfully uploaded in this call.
#
# NOTE(review): parts must be at least MIN_PART_SIZE (5MB) except the
# final one, which drives the chunk-merging logic below.
def patch_file(uid, input, info = {})
  tus_info = Tus::Info.new(info)

  # Resume bookkeeping from the info hash: the S3 multipart upload id and
  # how many parts were already uploaded (S3 part numbers are 1-based).
  upload_id      = info["multipart_id"]
  part_offset    = info["multipart_parts"].count
  bytes_uploaded = 0

  jobs = []
  # `read` may return nil at EOF, hence `.to_s` before wrapping in StringIO.
  chunk = StringIO.new(input.read(MIN_PART_SIZE).to_s)

  loop do
    # Read ahead one chunk so we can tell whether `chunk` is the last one.
    next_chunk = StringIO.new(input.read(MIN_PART_SIZE).to_s)

    # merge next chunk into previous if it's smaller than minimum chunk size
    if next_chunk.size < MIN_PART_SIZE
      chunk = StringIO.new(chunk.string + next_chunk.string)
      next_chunk.close
      next_chunk = nil
    end

    # abort if chunk is smaller than 5MB and is not the last chunk
    if chunk.size < MIN_PART_SIZE
      break if (tus_info.length && tus_info.offset) &&
               chunk.size + tus_info.offset < tus_info.length
    end

    # Upload this part on a thread so S3 I/O overlaps reading the input;
    # `part_offset += 1` both advances and supplies the 1-based part number.
    thread = upload_part_thread(chunk, uid, upload_id, part_offset += 1)
    jobs << [thread, chunk]

    # `next_chunk` is nil once merged above, ending the loop.
    chunk = next_chunk or break
  end

  begin
    # Thread#value joins the thread and re-raises any exception raised in it.
    jobs.each do |thread, body|
      info["multipart_parts"] << thread.value
      bytes_uploaded += body.size
      body.close
    end
  rescue Seahorse::Client::NetworkingError => exception
    warn "ERROR: #{exception.inspect} occurred during upload"
    # ignore networking errors and return what client has uploaded so far
  end

  bytes_uploaded
end

#read_info(uid) ⇒ Object



133
134
135
136
137
138
# File 'lib/tus/storage/s3.rb', line 133

# Loads and parses the JSON info document stored alongside the file.
# Raises Tus::NotFound when the info object does not exist.
def read_info(uid)
  JSON.parse(object("#{uid}.info").get.body.string)
rescue Aws::S3::Errors::NoSuchKey
  raise Tus::NotFound
end

#update_info(uid, info) ⇒ Object



140
141
142
# File 'lib/tus/storage/s3.rb', line 140

# Serializes the info hash to JSON and writes it to the "<uid>.info" object.
def update_info(uid, info)
  info_object = object("#{uid}.info")
  info_object.put(body: info.to_json)
end