Class: Tus::Storage::Gridfs

Inherits: Object
Defined in:
lib/tus/storage/gridfs.rb

Defined Under Namespace

Classes: Response

Constant Summary

BATCH_SIZE = 5 * 1024 * 1024 # 5 MB

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(client:, prefix: "fs", chunk_size: 256*1024) ⇒ Gridfs

Returns a new instance of Gridfs.



# File 'lib/tus/storage/gridfs.rb', line 16

def initialize(client:, prefix: "fs", chunk_size: 256*1024)
  @client     = client
  @prefix     = prefix
  @chunk_size = chunk_size
  @bucket     = client.database.fs(bucket_name: prefix)

  # The Mongo driver's FSBucket keeps index creation private, so we call it
  # via #send to make sure the fs.files and fs.chunks indexes exist.
  @bucket.send(:ensure_indexes!)
end
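
A minimal setup sketch (the connection URI and database name are placeholders):

require "tus/storage/gridfs"
require "mongo"

client  = Mongo::Client.new("mongodb://127.0.0.1:27017/mydb")
storage = Tus::Storage::Gridfs.new(client: client)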

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



# File 'lib/tus/storage/gridfs.rb', line 14

def bucket
  @bucket
end

#chunk_size ⇒ Object (readonly)

Returns the value of attribute chunk_size.



# File 'lib/tus/storage/gridfs.rb', line 14

def chunk_size
  @chunk_size
end

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/tus/storage/gridfs.rb', line 14

def client
  @client
end

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



# File 'lib/tus/storage/gridfs.rb', line 14

def prefix
  @prefix
end

Instance Method Details

#concatenate(uid, part_uids, info = {}) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 34

def concatenate(uid, part_uids, info = {})
  grid_infos = files_collection.find(filename: {"$in" => part_uids}).to_a
  grid_infos.sort_by! { |grid_info| part_uids.index(grid_info[:filename]) }

  validate_parts!(grid_infos, part_uids)

  length       = grid_infos.map { |doc| doc[:length] }.reduce(0, :+)
  content_type = Tus::Info.new(info).metadata["content_type"]

  grid_file = create_grid_file(
    filename:     uid,
    length:       length,
    content_type: content_type,
  )

  # Update the chunks belonging to parts so that they point to the new file.
  grid_infos.inject(0) do |offset, grid_info|
    result = chunks_collection
      .find(files_id: grid_info[:_id])
      .update_many(
        "$set" => { files_id: grid_file.id },
        "$inc" => { n: offset },
      )

    offset += result.modified_count
  end

  # Delete the parts after concatenation.
  files_collection.delete_many(filename: {"$in" => part_uids})

  # Tus server requires us to return the size of the concatenated file.
  length
end
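
A usage sketch, assuming "part-1" and "part-2" were previously uploaded in full as partial uploads (the uids are hypothetical):

length = storage.concatenate("final-upload", ["part-1", "part-2"])
# => combined length of both parts, in bytes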

#create_file(uid, info = {}) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 25

def create_file(uid, info = {})
  content_type = Tus::Info.new(info).metadata["content_type"]

  create_grid_file(
    filename:     uid,
    content_type: content_type,
  )
end
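
A minimal sketch (the uid is hypothetical); with an empty info hash no content type is recorded:

storage.create_file("24e533e02ec3bc40c387f1a0e460e216")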

#delete_file(uid, info = {}) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 174

def delete_file(uid, info = {})
  grid_info = files_collection.find(filename: uid).first
  bucket.delete(grid_info[:_id]) if grid_info
end
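
A minimal sketch (hypothetical uid); deleting a nonexistent uid is a no-op:

storage.delete_file("24e533e02ec3bc40c387f1a0e460e216")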

#expire_files(expiration_date) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 179

def expire_files(expiration_date)
  grid_infos = files_collection.find(uploadDate: {"$lte" => expiration_date}).to_a
  grid_info_ids = grid_infos.map { |info| info[:_id] }

  files_collection.delete_many(_id: {"$in" => grid_info_ids})
  chunks_collection.delete_many(files_id: {"$in" => grid_info_ids})
end
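
A sketch of expiring uploads older than a week, e.g. from a recurring cleanup job:

storage.expire_files(Time.now.utc - 7*24*60*60)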

#get_file(uid, info = {}, range: nil) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 122

def get_file(uid, info = {}, range: nil)
  grid_info = files_collection.find(filename: uid).first

  length = range ? range.size : grid_info[:length]

  filter = { files_id: grid_info[:_id] }

  if range
    chunk_start = range.begin / grid_info[:chunkSize]
    chunk_stop  = range.end   / grid_info[:chunkSize]

    filter[:n] = {"$gte" => chunk_start, "$lte" => chunk_stop}
  end

  # Query only the subset of chunks specified by the range parameter. We
  # cannot use Mongo::Grid::FSBucket#open_download_stream here because it
  # doesn't support changing the filter.
  chunks_view = chunks_collection.find(filter).sort(n: 1)

  # Create an Enumerator which will yield chunks of the requested file
  # content, allowing tus server to efficiently stream requested content
  # to the client.
  chunks = Enumerator.new do |yielder|
    chunks_view.each do |document|
      data = document[:data].data

      if document[:n] == chunk_start && document[:n] == chunk_stop
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = range.end   % grid_info[:chunkSize]
      elsif document[:n] == chunk_start
        byte_start = range.begin % grid_info[:chunkSize]
        byte_stop  = grid_info[:chunkSize] - 1
      elsif document[:n] == chunk_stop
        byte_start = 0
        byte_stop  = range.end % grid_info[:chunkSize]
      end

      # If we're on the first or last chunk, return a subset of the chunk
      # specified by the given range, otherwise return the full chunk.
      if byte_start && byte_stop
        yielder << data[byte_start..byte_stop]
      else
        yielder << data
      end
    end
  end

  # We return a response object that responds to #each, #length and #close,
  # which the tus server can return directly as the Rack response.
  Response.new(chunks: chunks, length: length, close: ->{chunks_view.close_query})
end
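
A sketch of streaming the first 100 bytes of an upload; since Response responds to #each, #length and #close, it can also be returned directly as a Rack response body (the uid and destination io are hypothetical):

response = storage.get_file("24e533e02ec3bc40c387f1a0e460e216", range: 0..99)
response.length                           # => 100
response.each { |chunk| io.write(chunk) }
response.close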

#patch_file(uid, input, info = {}) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 68

def patch_file(uid, input, info = {})
  grid_info      = files_collection.find(filename: uid).first
  current_length = grid_info[:length]
  chunk_size     = grid_info[:chunkSize]
  bytes_saved    = 0

  # If the last stored chunk is only partially filled, top it up first so
  # that every chunk except the final one stays exactly chunk_size bytes.
  bytes_saved += patch_last_chunk(input, grid_info) if current_length % chunk_size != 0

  chunks_enumerator = Enumerator.new do |yielder|
    while (data = input.read(chunk_size))
      yielder << data
    end
  end

  chunks_in_batch = (BATCH_SIZE.to_f / chunk_size).ceil
  chunks_offset   = chunks_collection.count(files_id: grid_info[:_id]) - 1 # index of the last stored chunk

  chunks_enumerator.each_slice(chunks_in_batch) do |chunks|
    grid_chunks = chunks.map do |data|
      Mongo::Grid::File::Chunk.new(
        data: BSON::Binary.new(data),
        files_id: grid_info[:_id],
        n: chunks_offset += 1,
      )
    end

    chunks_collection.insert_many(grid_chunks)

    # Compute the batch's byte size once; it's used both for updating the
    # file document and for the running total returned to the caller.
    batch_length = chunks.map(&:bytesize).inject(0, :+)

    # Update the total length and refresh the upload date on each update,
    # which are used in #get_file, #concatenate and #expire_files.
    files_collection.find(filename: uid).update_one(
      "$inc" => { length: batch_length },
      "$set" => { uploadDate: Time.now.utc },
    )
    bytes_saved += batch_length

    chunks.each(&:clear) # deallocate strings
  end

  bytes_saved
end
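
A sketch of appending data to an existing upload; input only needs to respond to #read(length), so a StringIO stands in here (the uid is hypothetical and assumed to have been created with #create_file):

require "stringio"

bytes_saved = storage.patch_file("24e533e02ec3bc40c387f1a0e460e216", StringIO.new("hello world"))
# => 11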

#read_info(uid) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 110

def read_info(uid)
  grid_info = files_collection.find(filename: uid).first or raise Tus::NotFound

  grid_info[:metadata]
end

#update_info(uid, info) ⇒ Object



# File 'lib/tus/storage/gridfs.rb', line 116

def update_info(uid, info)
  files_collection.update_one({filename: uid}, {"$set" => {metadata: info}})
end