Class: LogStash::Outputs::Qingstor::Uploader::MultipartUploader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/qingstor/multipart_uploader.rb

Constant Summary collapse

MINIMAL_PART_SIZE =

According to the QingStor API documentation, the minimal part size is 4 MB and the maximal part size is 1 GB. Here the minimal size is never used, and the maximal part size is limited to 50 MB, because overly large part sizes would consume Java heap space and raise exceptions.

4 * 1024 * 1024
MAXIMAL_PART_SIZE =
50 * 1024 * 1024

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket, logger, object, upload_headers) ⇒ MultipartUploader

Returns a new instance of MultipartUploader.



20
21
22
23
24
25
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 20

# Stores the collaborators used by the upload methods.
#
# @param bucket [Object] receiver of the multipart API calls
# @param logger [Object] receiver of the debug/info log calls
# @param object [Object] the file to upload; its key, path and size are read
# @param upload_headers [Hash] request parameters; upload methods add keys to it
def initialize(bucket, logger, object, upload_headers)
  @object = object
  @bucket = bucket
  @logger = logger
  @upload_headers = upload_headers
end

Instance Attribute Details

#bucketObject (readonly)

Returns the value of attribute bucket.



10
11
12
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 10

# Read-only accessor.
#
# @return [Object] the value of @bucket, as assigned in the constructor
def bucket
  @bucket
end

#loggerObject (readonly)

Returns the value of attribute logger.



10
11
12
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 10

# Read-only accessor.
#
# @return [Object] the value of @logger, as assigned in the constructor
def logger
  @logger
end

#upload_headersObject

Returns the value of attribute upload_headers.



11
12
13
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 11

# Accessor for the request parameters shared by all multipart calls.
# The upload methods of this class write keys such as 'upload_id',
# 'part_number' and 'object_parts' into the same object.
#
# @return [Object] the value of @upload_headers
def upload_headers
  @upload_headers
end

Instance Method Details

#calculate_segmentObject



70
71
72
73
74
75
76
77
78
79
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 70

# Derives the part size from the size of @object.
#
# The size is halved (rounding up) until it drops below MAXIMAL_PART_SIZE;
# a size that is already below the limit is returned unchanged.
#
# @return [Numeric] the part size in the same unit as @object.size
def calculate_segment
  size = @object.size
  size = (size / 2.0).ceil until size < MAXIMAL_PART_SIZE
  size
end

#complete_multipart_upload(last_part_number) ⇒ Object



63
64
65
66
67
68
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 63

# Asks the bucket to assemble parts 0..last_part_number into the final object.
#
# The part list is stored under 'object_parts' in @upload_headers before the
# request is sent.
#
# @param last_part_number [Integer] number of the final uploaded part
# @return whatever @logger.info returns
def complete_multipart_upload(last_part_number)
  @upload_headers['object_parts'] =
    0.upto(last_part_number).map { |number| { 'part_number' => number } }
  @bucket.complete_multipart_upload(@object.key, @upload_headers)
  @logger.info('multipart uploading completed', :file => @object.key)
end

#initiate_multipart_uploadObject



33
34
35
36
37
38
39
40
41
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 33

# Starts a multipart upload for @object.key and remembers its upload id.
#
# The id found in the bucket's response is stored under 'upload_id' in
# @upload_headers so that later requests refer to the same upload.
#
# @return the upload id taken from the response
# @raise [QingStor::Error] when the response has no :upload_id entry
def initiate_multipart_upload
  response = @bucket.initiate_multipart_upload(@object.key, @upload_headers)
  upload_id = response.fetch(:upload_id) do
    raise(QingStor::Error, :error_msg => response)
  end
  @upload_headers['upload_id'] = upload_id
  @logger.debug('initiated upload id', :file => @object.key,
                                       :upload_id => upload_id)
  upload_id
end

#resume_multipart_upload(upload_id) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 81

# Continues an interrupted multipart upload.
#
# The parts already stored for upload_id are listed; uploading restarts at
# the part number equal to the count of listed parts, using the size of the
# first listed part as segment size.
# NOTE(review): this assumes the listed parts are contiguous from part 0 and
# that the first part has the full segment size - confirm against the
# QingStor list-multipart behaviour.
#
# When no part has been uploaded yet, the upload starts over from part 0
# with a freshly calculated segment size instead of failing on the empty list.
#
# @param upload_id [String] id of the multipart upload to resume
# @return whatever upload_multipart returns
def resume_multipart_upload(upload_id)
  @logger.debug('resume multipart uploading', :file => @object.key,
                                              :upload_id => upload_id)
  @upload_headers['upload_id'] = upload_id
  res = @bucket.list_multipart(@object.key, @upload_headers)
  uploaded_parts = Array(res[:object_parts])

  # Interrupted before the first part was stored: nothing to derive the
  # segment size from, so begin again at part 0.
  return upload_multipart(calculate_segment) if uploaded_parts.empty?

  upload_multipart(uploaded_parts.count, uploaded_parts.first[:size])
end

#uploadObject



27
28
29
30
31
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 27

# Runs a complete multipart upload: initiate it, then send every part
# using the segment size from calculate_segment.
#
# @return whatever upload_multipart returns
def upload
  initiate_multipart_upload
  upload_multipart(calculate_segment)
end

#upload_multipart(part_number = 0, segment_size) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/logstash/outputs/qingstor/multipart_uploader.rb', line 43

# Uploads the file at @object.path part by part and then completes the
# multipart upload.
#
# For every part the keys 'body', 'content_length', 'content_md5' and
# 'part_number' of @upload_headers are overwritten before the request is
# sent through @bucket.
#
# @param part_number [Integer] number of the first part to send; 0 for a
#   fresh upload, otherwise the file is read from part_number * segment_size
# @param segment_size [Integer] number of bytes per part, must be positive
# @raise [ArgumentError] when segment_size is not positive
# @return whatever complete_multipart_upload returns
def upload_multipart(part_number = 0, segment_size)
  # IO#read(0) always returns "" (which is truthy), so a zero segment size
  # would upload empty parts forever.
  raise ArgumentError, 'segment_size must be positive' unless segment_size > 0

  # Binary mode: parts are raw byte ranges of the file.
  ::File.open(@object.path, 'rb') do |f|
    f.seek(part_number * segment_size) if part_number != 0
    while (data = f.read(segment_size))
      @upload_headers['body'] = data
      # The final part is usually shorter than segment_size, so report the
      # number of bytes actually read.
      @upload_headers['content_length'] = data.bytesize
      @upload_headers['content_md5'] = Digest::MD5.hexdigest(data)
      @upload_headers['part_number'] = part_number
      @logger.debug('multipart uploading: ',
                    :file => @object.key,
                    :part_number => part_number)
      @bucket.upload_multipart(@object.key, @upload_headers)
      part_number += 1
    end
  end

  # part_number now points one past the last part that was sent.
  complete_multipart_upload(part_number - 1)
end