Module: IntelligentUtils

Defined in:
lib/filestack/utils/utils.rb

Instance Method Summary

  • #bad_state(state) ⇒ Boolean
  • #change_offset(working_offset, state) ⇒ Integer
  • #chunk_job(job, state, apikey, filename, filesize, start_response, storage) ⇒ Array
  • #create_intelligent_generator(jobs) ⇒ Fiber
  • #create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, storage) ⇒ Array
  • #get_generator_batch(generator) ⇒ Array
  • #run_intelligent_upload_flow(jobs, filepath, io, state, storage) ⇒ Array
  • #run_intelligent_uploads(part, filepath, io, state, storage) ⇒ IntelligentState
  • #upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage) ⇒ Typhoeus::Response

Instance Method Details

#bad_state(state) ⇒ Boolean

Check whether the state is in an error state and has not yet reached the maximum number of retries

Parameters:

  • state (IntelligentState)

    An IntelligentState object

Returns:

  • (Boolean)


# File 'lib/filestack/utils/utils.rb', line 179

def bad_state(state)
  !state.ok && state.alive?
end
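
For context, this predicate drives the retry loop in run_intelligent_upload_flow. A minimal sketch, where retry_part stands in for run_intelligent_uploads and state is an IntelligentState:

state = retry_part.call(part)
while bad_state(state)  # the part failed, but retries remain
  state.add_retry
  state = retry_part.call(part)
end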

#change_offset(working_offset, state) ⇒ Integer

Return the current working offset if the state has not tried it yet. Otherwise, step the state down to its next offset and return it

Parameters:

  • working_offset (Integer)

    The current offset

  • state (IntelligentState)

    An IntelligentState object

Returns:

  • (Integer)


# File 'lib/filestack/utils/utils.rb', line 191

def change_offset(working_offset, state)
  if state.offset > working_offset
    working_offset
  else
    state.offset = state.next_offset
  end
end
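
A hypothetical sketch of both branches; the byte values are illustrative, and the actual offset ladder lives inside IntelligentState:

state.offset = 524_288
change_offset(262_144, state)  #=> 262_144 (state has not tried the smaller working offset yet)

state.offset = 262_144
change_offset(262_144, state)  #=> state.next_offset (already tried; step down)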

#chunk_job(job, state, apikey, filename, filesize, start_response, storage) ⇒ Array

Chunk a specific job into offsets

Parameters:

  • job (Dict)

    Dictionary with all job options

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filename (String)

    Name of incoming file

  • filesize (Int)

    Size of incoming file

  • start_response (Typhoeus::Response)

    Response body from multipart_start

  • storage (String)

    Storage destination for the upload

Returns:

  • (Array)


# File 'lib/filestack/utils/utils.rb', line 281

def chunk_job(job, state, apikey, filename, filesize, start_response, storage)
  offset = 0
  seek_point = job[:seek_point]
  chunk_list = []

  while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
    chunk_list.push(
      seek_point: seek_point,
      filename: filename,
      apikey: apikey,
      part: job[:part],
      size: job[:size],
      uri: start_response['uri'],
      region: start_response['region'],
      upload_id: start_response['upload_id'],
      location_url: start_response['location_url'],
      store: { location: storage },
      offset: offset
    )
    offset += state.offset
  end
  chunk_list
end
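
An illustrative sketch of the resulting shape, assuming (hypothetically) an 8 MiB DEFAULT_CHUNK_SIZE and a 1 MiB state.offset; a full part then yields eight chunk hashes whose :offset values step by state.offset:

chunks = chunk_job(job, state, apikey, filename, filesize, start_response, 's3')
chunks.map { |c| c[:offset] }  #=> [0, 1_048_576, 2_097_152, ..., 7_340_032]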

#create_intelligent_generator(jobs) ⇒ Fiber

Creates a generator of part jobs

Parameters:

  • jobs (Array)

    A list of file parts

Returns:

  • (Fiber)


# File 'lib/filestack/utils/utils.rb', line 238

def create_intelligent_generator(jobs)
  jobs_gen = jobs.lazy.each
  Fiber.new do
    # yield every job except the last...
    (jobs.length - 1).times do
      Fiber.yield jobs_gen.next
    end
    # ...and return the last one, so the Fiber dies once jobs are exhausted
    jobs_gen.next
  end
end
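
A minimal sketch of consuming the Fiber directly (get_generator_batch below does this four jobs at a time):

gen = create_intelligent_generator(jobs)
while gen.alive?
  job = gen.resume
  # ... process job
end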

#create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, storage) ⇒ Array

Loop through the jobs and attach each job's chunk list

Parameters:

  • jobs (Array)

    A list of file parts

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filename (String)

    Name of incoming file

  • filepath (String)

    Local path to the file

  • filesize (Int)

    Size of incoming file

  • start_response (Typhoeus::Response)

    Response body from multipart_start

  • storage (String)

    Storage destination for the upload

Returns:

  • (Array)


# File 'lib/filestack/utils/utils.rb', line 260

def create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, storage)
  jobs.each { |job|
    # pass the arguments in chunk_job's order (no filepath; storage last)
    job[:chunks] = chunk_job(
      job, state, apikey, filename, filesize, start_response, storage
    )
  }
  jobs
end
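
A minimal sketch of the result, using the corrected signature above (argument values assumed to come from the multipart-start step):

jobs = create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, 's3')
jobs.first[:chunks]  #=> array of chunk hashes built by chunk_job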

#get_generator_batch(generator) ⇒ Array

Pulls the next batch of up to four jobs from a living Fiber

Parameters:

  • generator (Fiber)

    A living Fiber object

Returns:

  • (Array)


# File 'lib/filestack/utils/utils.rb', line 165

def get_generator_batch(generator)
  batch = []
  # pull up to four jobs; stop early once the Fiber is exhausted
  4.times do
    batch.push(generator.resume) if generator.alive?
  end
  batch
end
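
A short sketch of the batching loop this enables (mirroring run_intelligent_upload_flow below):

gen = create_intelligent_generator(jobs)
while gen.alive?
  batch = get_generator_batch(gen)  # up to four jobs per batch
  # ... upload the batch in parallel
end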

#run_intelligent_upload_flow(jobs, filepath, io, state, storage) ⇒ Array

Runs the intelligent upload flow, from start to finish

Parameters:

  • jobs (Array)

    A list of file parts

  • filepath (String)

    Local path to the file

  • io (IO)

    IO object read from when no filepath is given

  • state (IntelligentState)

    An IntelligentState object

  • storage (String)

    Storage destination for the upload

Returns:

  • (Array)


# File 'lib/filestack/utils/utils.rb', line 205

def run_intelligent_upload_flow(jobs, filepath, io, state, storage)
  bar = ProgressBar.new(jobs.length)
  generator = create_intelligent_generator(jobs)
  working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
  while generator.alive?
    batch = get_generator_batch(generator)
    # run parts
    Parallel.map(batch, in_threads: 4) do |part|
      state = run_intelligent_uploads(part, filepath, io, state, storage)
      # condition: a chunk has failed but we have not reached the maximum retries
      while bad_state(state)
        # condition: timeout to S3, requiring offset size to be changed
        if state.error_type == 'S3_NETWORK'
          sleep(5)
          state.offset = working_offset = change_offset(working_offset, state)
        # condition: timeout to backend, requiring only backoff
        elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
          sleep(state.backoff)
        end
        state.add_retry
        state = run_intelligent_uploads(part, filepath, io, state, storage)
      end
      raise "Upload has failed. Please try again later." unless state.ok
      bar.increment!
    end
  end
end
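
A hypothetical end-to-end sketch, assuming jobs came from the multipart-start step, IntelligentState.new takes no arguments, and 's3' is the storage location:

state = IntelligentState.new
run_intelligent_upload_flow(jobs, filepath, nil, state, 's3')  # io is nil when a filepath is given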

#run_intelligent_uploads(part, filepath, io, state, storage) ⇒ IntelligentState

Send a job’s chunks in parallel and commit

Parameters:

  • part (Dict)

    A dictionary representing the information for a single part

  • filepath (String)

    Local path to the file

  • io (IO)

    IO object read from when no filepath is given

  • state (IntelligentState)

    An IntelligentState object

  • storage (String)

    Storage destination for the upload

Returns:

  • (IntelligentState)

# File 'lib/filestack/utils/utils.rb', line 312

def run_intelligent_uploads(part, filepath, io, state, storage)
  failed = false
  chunks = chunk_job(
    part, state, part[:apikey], part[:filename], part[:filesize], part[:start_response], storage
  )
  Parallel.map(chunks, in_threads: 3) do |chunk|
    begin
      upload_chunk_intelligently(chunk, state, part[:apikey], filepath, io, part[:options], storage)
    rescue StandardError => e
      state.error_type = e.message
      failed = true
      raise Parallel::Kill # raise (not merely reference) the exception to stop the remaining workers
    end
  end

  if failed
    state.ok = false
    return state
  else
    state.ok = true
  end

  commit_params = {
    apikey: part[:apikey],
    uri: part[:uri],
    region: part[:region],
    upload_id: part[:upload_id],
    size: part[:filesize],
    part: part[:part],
    location_url: part[:start_response]['location_url'],
    store: { location: storage }
  }

  response = Typhoeus.post(FilestackConfig.multipart_commit_url(commit_params[:location_url]),
                           body: commit_params.to_json,
                           headers: FilestackConfig::HEADERS)

  if response.code == 200
    state.reset
  else
    state.ok = false
  end
  state
end
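
A brief sketch of a single part's lifecycle, with retries handled by the flow above (storage value assumed):

state = run_intelligent_uploads(part, filepath, io, state, 's3')
state.ok  #=> true once every chunk uploaded and the commit returned 200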

#upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage) ⇒ Typhoeus::Response

Upload a single chunk

Parameters:

  • job (Dict)

    Dictionary with all job options

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filepath (String)

    Local path to the file

  • io (IO)

    IO object read from when no filepath is given

  • options (Hash)

    User-defined options for multipart uploads

  • storage (String)

    Storage destination for the upload

Returns:

  • (Typhoeus::Response)


# File 'lib/filestack/utils/utils.rb', line 368

def upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage)
  file = filepath ? File.open(filepath) : io
  file.seek(job[:seek_point] + job[:offset])

  chunk = file.read(state.offset)
  md5 = Digest::MD5.new
  md5 << chunk
  data = {
    apikey: apikey,
    part: job[:part],
    size: chunk.length,
    md5: md5.base64digest,
    uri: job[:uri],
    region: job[:region],
    upload_id: job[:upload_id],
    store: { location: storage },
    offset: job[:offset],
    fii: true
  }

  data.merge!(options) if options

  # POST to the Filestack multipart upload endpoint; only a failure of the
  # request itself counts as a network error
  begin
    fs_response = Typhoeus.post(FilestackConfig.multipart_upload_url(job[:location_url]),
                                body: data.to_json,
                                headers: FilestackConfig::HEADERS)
  rescue StandardError
    raise 'BACKEND_NETWORK'
  end

  unless fs_response.code == 200
    if [400, 403, 404].include? fs_response.code
      raise 'FAILURE'
    else
      raise 'BACKEND_SERVER'
    end
  end

  fs_response = JSON.parse(fs_response.body)

  # PUT the chunk to S3, with the same split between network and server errors
  begin
    amazon_response = Typhoeus.put(
      fs_response['url'], headers: fs_response['headers'], body: chunk
    )
  rescue StandardError
    raise 'S3_NETWORK'
  end

  unless amazon_response.code == 200
    if [400, 403, 404].include? amazon_response.code
      raise 'FAILURE'
    else
      raise 'S3_SERVER'
    end
  end
  amazon_response
end
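
For context, the raised error strings are what run_intelligent_uploads stores on the state to drive the retry policy above. A minimal sketch (storage value assumed):

begin
  upload_chunk_intelligently(job, state, apikey, filepath, io, options, 's3')
rescue StandardError => e
  state.error_type = e.message  # 'FAILURE', 'BACKEND_SERVER', 'BACKEND_NETWORK', 'S3_SERVER', or 'S3_NETWORK'
end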