Class: S3_Multi_Upload::Upload

Inherits:
Object
  • Object
show all
Defined in:
lib/s3_multi_upload.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Upload

Returns a new instance of Upload.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/s3_multi_upload.rb', line 10

# Sets up AWS credentials, resolves the destination bucket and object, and
# queues every chunk of the file for upload.
#
# @param options [Hash] read here for :access_key_id, :secret_access_key,
#   :file, :bucket, :create_bucket and :key
def initialize options
  credentials = {
    :access_key_id     => options[:access_key_id],
    :secret_access_key => options[:secret_access_key],
  }
  AWS.config credentials

  @options = options
  @file    = Pathname.new options[:file]
  @queue   = Queue.new
  @mutex   = Mutex.new
  @s3      = AWS::S3.new

  # The bucket is only created when :create_bucket is literally true;
  # any other value looks up the bucket by name instead.
  bucket_name = options[:bucket]
  @bucket = if options[:create_bucket].equal?(true)
    @s3.buckets.create bucket_name
  else
    @s3.buckets[bucket_name]
  end

  # The object key falls back to the file's basename when no :key is given.
  object_key = options[:key] || @file.basename
  @object    = @bucket.objects[object_key]

  enqueue
end

Instance Attribute Details

#bucket ⇒ Object

Returns the value of attribute bucket.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @bucket, the bucket handle that initialize either creates or looks
# up by options[:bucket].
def bucket
  @bucket
end

#file ⇒ Object

Returns the value of attribute file.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @file, the Pathname that initialize builds from options[:file].
def file
  @file
end

#mutex ⇒ Object

Returns the value of attribute mutex.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @mutex, the Mutex created in initialize; upload synchronizes on it
# when advancing the progress bar.
def mutex
  @mutex
end

#object ⇒ Object

Returns the value of attribute object.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @object, the bucket object that initialize looks up under
# options[:key], or under the file's basename when no key is given.
def object
  @object
end

#options ⇒ Object

Returns the value of attribute options.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @options, the options hash that was passed to initialize.
def options
  @options
end

#progress ⇒ Object

Returns the value of attribute progress.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @progress. process and upload call #finish and #inc on the result.
# NOTE(review): initialize does not assign @progress — presumably it is set
# (or this reader is overridden) elsewhere in the file; confirm.
def progress
  @progress
end

#queue ⇒ Object

Returns the value of attribute queue.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @queue, the Queue created in initialize; enqueue fills it with
# [offset, part_number] pairs and upload drains it.
def queue
  @queue
end

#s3 ⇒ Object

Returns the value of attribute s3.



8
9
10
# File 'lib/s3_multi_upload.rb', line 8

# Returns @s3, the AWS::S3 instance created in initialize.
def s3
  @s3
end

Instance Method Details

#chunk_size ⇒ Object



46
47
48
# File 'lib/s3_multi_upload.rb', line 46

# Size of one upload chunk, obtained by passing the first entry of
# options[:chunk_size] (splatted into value and unit) to normalize.
def chunk_size
  size_spec = options[:chunk_size].first
  normalize(*size_spec)
end

#enqueue ⇒ Object



50
51
52
53
54
# File 'lib/s3_multi_upload.rb', line 50

# Fills the work queue with one [byte_offset, part_number] pair per chunk of
# the file. Part numbers start at 1; the number of parts is the file size
# divided by the chunk size, rounded up.
def enqueue
  # chunk_size re-parses options on every call, so evaluate it once here
  # instead of once per queued part.
  part_size = chunk_size

  (file.size.to_f / part_size).ceil.times do |index|
    queue << [part_size * index, index + 1]
  end
end

#normalize(value, unit = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/s3_multi_upload.rb', line 33

# Converts a size given as a value plus an optional unit into bytes.
#
# @param value [#to_f] the numeric amount (e.g. 5 or "5")
# @param unit [String, Symbol, nil] case-insensitive unit name; nil means bytes
# @return [Float] the size in bytes
# @raise [ArgumentError] when the unit is not one of the recognized names
def normalize value, unit = nil
  # to_s first: the default nil unit would otherwise fail on nil.downcase
  # before the case expression is ever reached.
  multiplier = case unit.to_s.downcase.to_sym
  when :'', :b, :byte, :bytes
    1
  when :k, :kb, :kilobyte, :kilobytes
    1024
  when :m, :mb, :megabyte, :megabytes
    1024 ** 2
  when :g, :gb, :gigabyte, :gigabytes
    1024 ** 3
  else
    # An unknown unit used to yield nil, which only failed later and far
    # away from the actual mistake.
    raise ArgumentError, "unknown size unit: #{unit.inspect}"
  end

  value.to_f * multiplier
end

#process ⇒ Object



94
95
96
97
98
99
100
# File 'lib/s3_multi_upload.rb', line 94

# Announces the upload on standard output, runs it, and aborts the process
# with 'upload failed' when upload returns a falsy value. When
# options[:progress_bar] is set, progress is called before the upload and
# finished afterwards.
def process
  amount, unit_name = *options[:chunk_size].first
  show_bar = options[:progress_bar]

  puts "uploading #{file} to s3://#{options[:bucket]}/#{object.key} using #{options[:threads]} threads in chunks of #{amount} #{unit_name}"

  if show_bar
    progress
  end

  succeeded = upload
  abort 'upload failed' unless succeeded

  if show_bar
    progress.finish
  end
end

#upload ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/s3_multi_upload.rb', line 56

# Uploads every queued chunk as one part of a multipart upload, using
# options[:threads] worker threads.
#
# Each worker pops [offset, part_number] pairs off the queue until it is
# drained, reads that slice of the file and sends it with add_part. When
# options[:checksum] is set, a base64-encoded MD5 of the part data is sent as
# :content_md5; when options[:progress_bar] is set, the bar is advanced while
# holding the mutex.
#
# @return [Object] whatever object.multipart_upload returns
def upload
  # Loop-invariant: chunk_size re-parses options on each call.
  part_size = chunk_size

  object.multipart_upload do |multipart|
    workers = options[:threads].times.map do
      Thread.new do
        loop do
          begin
            # Non-blocking pop raises ThreadError once the queue is empty.
            # Rescue exactly that error: a blanket inline `rescue nil` would
            # also hide unrelated failures and leave the loop's exit to chance.
            offset, index = queue.deq(true)
          rescue ThreadError
            break
          end

          next if offset.nil?

          part = {
            :data        => file.read(part_size, offset),
            :part_number => index,
          }

          if options[:checksum]
            part[:content_md5] = Digest::MD5.base64digest(part[:data])
          end

          multipart.add_part part

          mutex.synchronize { progress.inc } if options[:progress_bar]
        end
      end
    end

    workers.each(&:join)
  end
end