Class: Tus::Storage::Gridfs

Inherits:
Object
  • Object
show all
Defined in:
lib/tus/storage/gridfs.rb

Constant Summary collapse

BATCH_SIZE =
5 * 1024 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client:, prefix: "fs", chunk_size: 256*1024) ⇒ Gridfs

Initializes the GridFS storage and creates necessary indexes.



21
22
23
24
25
26
27
28
# File 'lib/tus/storage/gridfs.rb', line 21

# Builds the storage around an existing Mongo client.
#
# @param client     object responding to `database.fs(bucket_name:)`
#   (presumably a Mongo::Client -- confirm against callers)
# @param prefix     [String] name handed to the GridFS bucket as `bucket_name`
# @param chunk_size [Integer] chunk size in bytes, stored for later use
def initialize(client:, prefix: "fs", chunk_size: 256*1024)
  @client, @prefix, @chunk_size = client, prefix, chunk_size
  @bucket = @client.database.fs(bucket_name: @prefix)

  # Invoked through #send, presumably because the bucket does not expose
  # index creation as part of its public interface.
  @bucket.send(:ensure_indexes!)
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



18
19
20
# File 'lib/tus/storage/gridfs.rb', line 18

# Returns the GridFS bucket handle that #initialize obtained from
# `client.database.fs(bucket_name: prefix)`.
def bucket; @bucket; end

#chunk_size ⇒ Object (readonly)

Returns the value of attribute chunk_size.



18
19
20
# File 'lib/tus/storage/gridfs.rb', line 18

# Returns the chunk size (in bytes) given to the constructor; the
# constructor defaults it to 256 KiB.
def chunk_size; @chunk_size; end

#client ⇒ Object (readonly)

Returns the value of attribute client.



18
19
20
# File 'lib/tus/storage/gridfs.rb', line 18

# Returns the client object the storage was constructed with.
def client; @client; end

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



18
19
20
# File 'lib/tus/storage/gridfs.rb', line 18

# Returns the bucket name prefix given to the constructor ("fs" by default).
def prefix; @prefix; end

Instance Method Details

#concatenate(uid, part_uids, info = {}) ⇒ Object

Concatenates multiple partial uploads into a single upload, and returns the size of the resulting upload. The partial uploads are deleted after concatenation.

It concatenates by updating the partial uploads’ GridFS chunks to point to the new upload.

Raises Tus::Error if GridFS chunks of partial uploads don’t exist or aren’t completely filled.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/tus/storage/gridfs.rb', line 49

# Concatenates the partial uploads named by +part_uids+ into a new upload
# stored under +uid+, without copying any chunk data: the chunks of each
# part are re-pointed at the new GridFS file and renumbered.
#
# @param uid       identifier (GridFS filename) of the resulting upload
# @param part_uids [Array] identifiers of the partial uploads, in the order
#   in which they should be concatenated
# @param info      [Hash] tus info of the resulting upload; only its
#   "content_type" metadata entry is read here
# @return [Integer] total length of the concatenated upload
# @raise whatever #validate_parts! raises for missing or unusable parts
def concatenate(uid, part_uids, info = {})
  grid_infos = files_collection.find(filename: {"$in" => part_uids}).to_a
  # The query returns parts in arbitrary order; restore the requested order.
  grid_infos.sort_by! { |grid_info| part_uids.index(grid_info[:filename]) }

  validate_parts!(grid_infos, part_uids)

  length       = grid_infos.inject(0) { |total, grid_info| total + grid_info[:length] }
  content_type = Tus::Info.new(info).metadata["content_type"]

  grid_file = create_grid_file(
    filename:     uid,
    length:       length,
    content_type: content_type,
  )

  # Re-point every part's chunks at the new file. `offset` is the number of
  # chunks already moved, so adding it to each chunk's `n` keeps the chunk
  # sequence of the new file contiguous.
  grid_infos.inject(0) do |offset, grid_info|
    result = chunks_collection
      .find(files_id: grid_info[:_id])
      .update_many(
        "$set" => { files_id: grid_file.id },
        "$inc" => { n: offset },
      )

    offset + result.modified_count
  end

  # The parts no longer own any chunks, so only their file documents remain.
  files_collection.delete_many(filename: {"$in" => part_uids})

  # Tus server requires us to return the size of the concatenated file.
  length
end

#create_file(uid, info = {}) ⇒ Object

Creates a file for the specified upload.



31
32
33
34
35
36
37
38
# File 'lib/tus/storage/gridfs.rb', line 31

# Creates an empty GridFS file for the upload identified by +uid+.
#
# @param uid  identifier of the upload, used as the GridFS filename
# @param info [Hash] tus info of the upload; only its "content_type"
#   metadata entry is read here (nil when absent)
# @return whatever #create_grid_file returns
def create_file(uid, info = {})
  content_type = Tus::Info.new(info).metadata["content_type"]

  create_grid_file(filename: uid, content_type: content_type)
end

#delete_file(uid, info = {}) ⇒ Object

Deletes the GridFS file and chunks for the specified upload.



203
204
205
206
# File 'lib/tus/storage/gridfs.rb', line 203

# Removes the upload identified by +uid+ by deleting its GridFS file
# through the bucket. Does nothing (and returns nil) when no file with
# that filename exists. The +info+ argument is not used here.
def delete_file(uid, info = {})
  grid_info = files_collection.find(filename: uid).first
  return unless grid_info

  bucket.delete(grid_info[:_id])
end

#expire_files(expiration_date) ⇒ Object

Deletes the GridFS files and chunks of uploads whose upload date (refreshed on every patch) is on or before the specified date.



209
210
211
212
213
214
215
# File 'lib/tus/storage/gridfs.rb', line 209

# Removes every upload whose `uploadDate` is less than or equal to
# +expiration_date+: first the matching file documents, then all chunks
# that reference them.
#
# @param expiration_date cutoff compared against `uploadDate` with "$lte"
# @return the result of the chunks `delete_many` call
def expire_files(expiration_date)
  expired_ids = files_collection
    .find(uploadDate: {"$lte" => expiration_date})
    .to_a
    .map { |grid_info| grid_info[:_id] }

  files_collection.delete_many(_id: {"$in" => expired_ids})
  chunks_collection.delete_many(files_id: {"$in" => expired_ids})
end

#get_file(uid, info = {}, range: nil) ⇒ Object

Returns a Tus::Response object through which data of the specified upload can be retrieved in a streaming fashion. Accepts an optional range parameter for selecting a subset of bytes we want to retrieve.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/tus/storage/gridfs.rb', line 154

# Returns a Tus::Response that lazily streams the content of the upload
# identified by +uid+, one GridFS chunk at a time.
#
# @param uid   identifier (GridFS filename) of the upload
# @param info  [Hash] not used here
# @param range [Range, nil] optional byte range; both `range.begin` and
#   `range.end` are treated as inclusive byte offsets
# @return [Tus::Response] built with the chunk enumerator and a `close`
#   callable bound to the chunk query's `close_query`
# @raise [Tus::NotFound] if no GridFS file exists for +uid+
def get_file(uid, info = {}, range: nil)
  grid_info = files_collection.find(filename: uid).first
  raise Tus::NotFound unless grid_info

  chunk_size = grid_info[:chunkSize]
  filter     = { files_id: grid_info[:_id] }

  if range
    # Index of the chunk holding each end of the range, plus the byte
    # position inside that chunk.
    chunk_start, byte_start = range.begin.divmod(chunk_size)
    chunk_stop,  byte_stop  = range.end.divmod(chunk_size)

    filter[:n] = {"$gte" => chunk_start, "$lte" => chunk_stop}
  end

  # Query only the subset of chunks specified by the range query. We
  # cannot use Mongo::FsBucket#open_download_stream here because it
  # doesn't support changing the filter.
  chunks_view = chunks_collection.find(filter).sort(n: 1)

  # Nothing is fetched until the enumerator is iterated, which lets the tus
  # server stream the content to the client chunk by chunk.
  chunks = Enumerator.new do |yielder|
    chunks_view.each do |document|
      data = document[:data].data

      # Without a range both indexes are nil, so neither flag is ever set
      # and every chunk is yielded whole.
      first_chunk = document[:n] == chunk_start
      last_chunk  = document[:n] == chunk_stop

      if first_chunk || last_chunk
        from = first_chunk ? byte_start : 0
        to   = last_chunk  ? byte_stop  : chunk_size - 1

        yielder << data[from..to]
      else
        yielder << data
      end
    end
  end

  Tus::Response.new(chunks: chunks, close: chunks_view.method(:close_query))
end

#patch_file(uid, input, info = {}) ⇒ Object

Appends data to the specified upload in a streaming fashion, and returns the number of bytes it managed to save.

It does so by reading the input data in batches of chunks, creating a new GridFS chunk for each chunk of data and appending it to the existing list.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/tus/storage/gridfs.rb', line 89

# Appends the data read from +input+ to the upload identified by +uid+ and
# returns the number of bytes that were saved.
#
# The input is consumed in pieces of the file's `chunkSize`; pieces are
# grouped into batches of roughly BATCH_SIZE bytes and each batch is
# bulk-inserted as new GridFS chunks, after which the file's length and
# upload date are updated.
#
# @param uid   identifier (GridFS filename) of the upload
# @param input object responding to `read(length)` that returns nil once
#   exhausted
# @param info  [Hash] not used here
# @return [Integer] number of bytes saved
# @raise [Tus::NotFound] if no GridFS file exists for +uid+
def patch_file(uid, input, info = {})
  grid_info = files_collection.find(filename: uid).first
  raise Tus::NotFound unless grid_info

  current_length = grid_info[:length]
  chunk_size     = grid_info[:chunkSize]
  bytes_saved    = 0

  # It's possible that the previous data append didn't fill in the last
  # GridFS chunk completely, so we fill in that gap now before creating
  # new GridFS chunks.
  bytes_saved += patch_last_chunk(input, grid_info) if current_length % chunk_size != 0

  # Lazily yields chunkSize-sized pieces of the input (the final piece may
  # be shorter).
  chunks_enumerator = Enumerator.new do |yielder|
    while (data = input.read(chunk_size))
      yielder << data
    end
  end

  chunks_in_batch = (BATCH_SIZE.to_f / chunk_size).ceil
  # Index of the last existing chunk; it is pre-incremented for each new one.
  chunks_offset   = chunks_collection.count(files_id: grid_info[:_id]) - 1

  # Iterate in batches of data chunks and bulk-insert new GridFS chunks.
  # This way we try to have a balance between bulking inserts and keeping
  # memory usage low.
  chunks_enumerator.each_slice(chunks_in_batch) do |chunks|
    grid_chunks = chunks.map do |data|
      Mongo::Grid::File::Chunk.new(
        data: BSON::Binary.new(data),
        files_id: grid_info[:_id],
        n: chunks_offset += 1,
      )
    end

    chunks_collection.insert_many(grid_chunks)

    # Must be measured before the strings are cleared below.
    batch_bytes = chunks.inject(0) { |total, data| total + data.bytesize }

    # Update the total length and refresh the upload date on each update,
    # which are used in #get_file, #concatenate and #expire_files.
    files_collection.find(filename: uid).update_one(
      "$inc" => { length: batch_bytes },
      "$set" => { uploadDate: Time.now.utc },
    )
    bytes_saved += batch_bytes

    chunks.each(&:clear) # deallocate strings
  end

  bytes_saved
end

#read_info(uid) ⇒ Object

Returns info of the specified upload. Raises Tus::NotFound if the upload wasn’t found.



141
142
143
144
# File 'lib/tus/storage/gridfs.rb', line 141

# Looks up the GridFS file document whose filename is +uid+ and returns
# its `metadata` field.
#
# @raise [Tus::NotFound] if no such file document exists
def read_info(uid)
  grid_info = files_collection.find(filename: uid).first
  raise Tus::NotFound unless grid_info

  grid_info[:metadata]
end

#update_info(uid, info) ⇒ Object

Updates info of the specified upload.



147
148
149
# File 'lib/tus/storage/gridfs.rb', line 147

# Replaces the `metadata` field of the file document whose filename is
# +uid+ with +info+.
#
# @return the result of the collection's `update_one` call
def update_info(uid, info)
  filter = { filename: uid }
  update = { "$set" => { metadata: info } }

  files_collection.update_one(filter, update)
end