Class: Aliyun::OSS::Multipart::Upload

Inherits:
Transaction show all
Includes:
Common::Logging
Defined in:
lib/aliyun/oss/upload.rb

Overview

A multipart upload transaction

Constant Summary collapse

PART_SIZE =
10 * 1024 * 1024
READ_SIZE =
16 * 1024
NUM_THREAD =
10

Constants included from Common::Logging

Common::Logging::MAX_NUM_LOG, Common::Logging::ROTATE_SIZE

Instance Method Summary collapse

Methods included from Common::Logging

#logger, set_log_file, set_log_level

Methods inherited from Common::Struct::Base

#to_s

Methods included from Common::Struct::Base::AttrHelper

#attrs

Constructor Details

#initialize(protocol, opts) ⇒ Upload

Returns a new instance of Upload.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/aliyun/oss/upload.rb', line 17

def initialize(protocol, opts)
  args = opts.dup
  @protocol = protocol
  @progress = args.delete(:progress)
  @file = args.delete(:file)
  @cpt_file = args.delete(:cpt_file)
  super(args)

  @file_meta = {}
  @num_threads = options[:threads] || NUM_THREAD
  @all_mutex = Mutex.new
  @parts = []
  @todo_mutex = Mutex.new
  @todo_parts = []
end

Instance Method Details

#checkpointObject

Checkpoint structures:

Examples:

states = {
  :id => 'upload_id',
  :file => 'file',
  :file_meta => {
    :mtime => Time.now,
    :md5 => 1024
  },
  :parts => [
    {:number => 1, :range => [0, 100], :done => false},
    {:number => 2, :range => [100, 200], :done => true}
  ],
  :md5 => 'states_md5'
}


84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/aliyun/oss/upload.rb', line 84

def checkpoint
  logger.debug("Begin make checkpoint, disable_cpt: "\
               "#{options[:disable_cpt] == true}")

  ensure_file_not_changed

  parts = sync_get_all_parts
  states = {
    :id => id,
    :file => @file,
    :file_meta => @file_meta,
    :parts => parts
  }

  # report progress
  if @progress
    done = parts.count { |p| p[:done] }
    @progress.call(done.to_f / parts.size) if done > 0
  end

  write_checkpoint(states, @cpt_file) unless options[:disable_cpt]

  logger.debug("Done make checkpoint, states: #{states}")
end

#runObject

Run the upload transaction, which includes 3 stages:

  • 1a. initiate(new upload) and divide parts

  • 1b. rebuild states(resumed upload)

    1. upload each unfinished part

    1. commit the multipart upload transaction



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/aliyun/oss/upload.rb', line 38

def run
  logger.info("Begin upload, file: #{@file}, "\
              "checkpoint file: #{@cpt_file}, "\
              "threads: #{@num_threads}")

  # Rebuild transaction states from checkpoint file
  # Or initiate new transaction states
  rebuild

  # Divide the file to upload into parts to upload separately
  divide_parts if @parts.empty?

  # Upload each part
  @todo_parts = @parts.reject { |p| p[:done] }

  (1..@num_threads).map {
    Thread.new {
      loop {
        p = sync_get_todo_part
        break unless p
        upload_part(p)
      }
    }
  }.map(&:join)

  # Commit the multipart upload transaction
  commit

  logger.info("Done upload, file: #{@file}")
end