Module: Sluice::Storage::S3

Defined in:
lib/sluice/storage/s3.rb

Defined Under Namespace

Classes: Location

Constant Summary

CONCURRENCY = 10   # Threads
RETRIES = 3        # Attempts
RETRY_WAIT = 10    # Seconds to wait between attempts

Class Method Details

.copy_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Copies files between S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from

to_location

S3Location to copy files to

match_regex

a regex string to match the files to copy

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3.rb', line 224

def copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:copy, s3, from_files_or_loc, match_regex, to_location, alter_filename_lambda, flatten)
end
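
A sketch of typical usage (bucket names hypothetical; this assumes, as in common Sluice usage, that Sluice::Storage::S3::Location is constructed from an s3:// URI with a trailing slash):

s3 = Sluice::Storage::S3.new_fog_s3_from('us-east-1', ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])
from = Sluice::Storage::S3::Location.new('s3://my-source-bucket/events/')
to   = Sluice::Storage::S3::Location.new('s3://my-target-bucket/archive/')

# Copy only gzipped files, preserving any sub-folder structure
Sluice::Storage::S3.copy_files(s3, from, to, '.+\.gz$')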

.copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Copies files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

In other words, the download and upload are not interleaved (which is inefficient because upload speeds are much lower than download speeds).

Parameters:

from_s3

A Fog::Storage s3 connection for accessing the from S3Location

to_s3

A Fog::Storage s3 connection for accessing the to S3Location

from_location

S3Location to copy files from

to_location

S3Location to copy files to

match_regex

a regex string to match the files to move

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3.rb', line 203

def copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)            

  puts "  copying inter-account #{describe_from(from_location)} to #{to_location}"
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
  end

end
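
A sketch of inter-account usage (names hypothetical), with one connection per account:

from_s3 = Sluice::Storage::S3.new_fog_s3_from('us-east-1', source_access_key_id, source_secret)
to_s3   = Sluice::Storage::S3.new_fog_s3_from('eu-west-1', target_access_key_id, target_secret)
from = Sluice::Storage::S3::Location.new('s3://account-a-bucket/logs/')
to   = Sluice::Storage::S3::Location.new('s3://account-b-bucket/logs/')

# Every file is staged through a local tmpdir, so ensure sufficient disk space
Sluice::Storage::S3.copy_files_inter(from_s3, to_s3, from, to)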

.delete_files(s3, from_files_or_loc, match_regex = '.+') ⇒ Object

Delete files from S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to delete files from

match_regex

a regex string to match the files to delete



# File 'lib/sluice/storage/s3.rb', line 177

def delete_files(s3, from_files_or_loc, match_regex='.+')

  puts "  deleting #{describe_from(from_files_or_loc)}"
  process_files(:delete, s3, from_files_or_loc, match_regex)
end

.describe_from(from_files_or_dir_or_loc) ⇒ Object

Provides string describing from_files_or_dir_or_loc for logging purposes.

Parameters:

from_files_or_dir_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

Returns a log-friendly string



# File 'lib/sluice/storage/s3.rb', line 342

def describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end

.download_file(s3, from_file, to_file) ⇒ Object

Download a single file to the exact path specified. Has no intelligence around filenaming. Makes sure to create the path as needed.

Parameters:

s3

A Fog::Storage s3 connection

from_file

A Fog::Storage::AWS::File to download

to_file

A local file path



# File 'lib/sluice/storage/s3.rb', line 321

def download_file(s3, from_file, to_file)

  FileUtils.mkdir_p(File.dirname(to_file))

  # TODO: deal with bug where Fog hangs indefinitely if network connection dies during download

  local_file = File.open(to_file, "w")
  local_file.write(from_file.body)
  local_file.close
end

.download_files(s3, from_files_or_loc, to_directory, match_regex = '.+') ⇒ Object

Download files from an S3 location to local storage, concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to download files from

to_directory

Local directory to copy files to

match_regex

a regex string to match the files to download



# File 'lib/sluice/storage/s3.rb', line 164

def download_files(s3, from_files_or_loc, to_directory, match_regex='.+')

  puts "  downloading #{describe_from(from_files_or_loc)} to #{to_directory}"
  process_files(:download, s3, from_files_or_loc, match_regex, to_directory)
end
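
Given an s3 connection from new_fog_s3_from, a usage sketch (names hypothetical):

from = Sluice::Storage::S3::Location.new('s3://my-bucket/raw-logs/')

# Download only .log files into the local directory
Sluice::Storage::S3.download_files(s3, from, '/tmp/raw-logs/', '.+\.log$')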

.get_basename(path) ⇒ Object

Returns the basename for the given path

Parameters:

path

S3 path in String form

Returns the basename, or nil if the path is to a folder



# File 'lib/sluice/storage/s3.rb', line 132

def get_basename(path)
  if is_folder?(path)
    nil
  else
    match = path.match('([^/]+)$')
    if match
      match[1]
    else
      nil
    end
  end
end
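
To illustrate the folder handling (results follow from the code above; paths hypothetical):

Sluice::Storage::S3.get_basename('events/2013/file.gz') # => "file.gz"
Sluice::Storage::S3.get_basename('events/2013/')        # => nil (trailing slash means folder)
Sluice::Storage::S3.get_basename('events_$folder$')     # => nil (EMR folder marker)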

.glob_files(dir, glob) ⇒ Object

A helper function to list all files recursively in a folder

Parameters:

dir

Directory to list files recursively

glob

a glob string (e.g. '**/*') to match the files to list

Returns array of files (no sub-directories)



# File 'lib/sluice/storage/s3.rb', line 576

def glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f|
    File.file?(f) # Drop sub-directories
  }
end

.is_empty?(s3, location) ⇒ Boolean

Determine if a bucket is empty

Parameters:

s3

A Fog::Storage s3 connection

location

The location to check

Returns:

  • (Boolean)


# File 'lib/sluice/storage/s3.rb', line 151

def is_empty?(s3, location)
  list_files(s3, location).length <= 1
end
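
A usage sketch (bucket name hypothetical):

location = Sluice::Storage::S3::Location.new('s3://my-bucket/processed/')
Sluice::Storage::S3.is_empty?(s3, location) # => true if at most one file is listed

Note the <= 1 threshold: a location still listing a single object is treated as empty, presumably to tolerate a leftover marker file.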

.is_file?(path) ⇒ Boolean

Whether the given path is a file or not

Parameters:

path

S3 path in String form

Returns boolean

Returns:

  • (Boolean)


# File 'lib/sluice/storage/s3.rb', line 120

def is_file?(path)
  !is_folder?(path)
end

.is_folder?(path) ⇒ Boolean

Whether the given path is a directory or not

Parameters:

path

S3 path in String form

Returns boolean

Returns:

  • (Boolean)


# File 'lib/sluice/storage/s3.rb', line 108

def is_folder?(path)
  (path.end_with?('_$folder$') || # EMR-created
    path.end_with?('/'))
end
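
Both folder conventions in action (paths hypothetical):

Sluice::Storage::S3.is_folder?('events/2013/')    # => true  (trailing slash)
Sluice::Storage::S3.is_folder?('events_$folder$') # => true  (EMR-created marker)
Sluice::Storage::S3.is_folder?('events/file.gz')  # => false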

.list_files(s3, location) ⇒ Object

Returns an array of all Fog::Storage::AWS::File objects

Parameters:

s3

A Fog::Storage s3 connection

location

The location to return files from

Returns an array of Fog::Storage::AWS::File objects



# File 'lib/sluice/storage/s3.rb', line 89

def list_files(s3, location)
  files_and_dirs = s3.directories.get(location.bucket, prefix: location.dir).files

  files = [] # Can't use a .select because of Ruby deep copy issues (array of non-POROs)
  files_and_dirs.each { |f|
    if is_file?(f.key)
      files << f.dup
    end
  }
  files
end
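
A usage sketch (bucket and prefix hypothetical; content_length is a standard Fog::Storage::AWS::File attribute):

location = Sluice::Storage::S3::Location.new('s3://my-bucket/enriched/')
Sluice::Storage::S3.list_files(s3, location).each do |file|
  puts "#{file.key} (#{file.content_length} bytes)"
end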

.move_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Moves files between S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to move files from

to_location

S3Location to move files to

match_regex

a regex string to match the files to move

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3.rb', line 270

def move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:move, s3, from_files_or_loc, match_regex, to_location, alter_filename_lambda, flatten)
end

.move_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Moves files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

  3. Concurrent deletion of all files from S3 source

In other words, the three operations are not interleaved (which is inefficient because upload speeds are much lower than download speeds)

Parameters:

from_s3

A Fog::Storage s3 connection for accessing the from S3Location

to_s3

A Fog::Storage s3 connection for accessing the to S3Location

from_location

S3Location to move files from

to_location

S3Location to move files to

match_regex

a regex string to match the files to move

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3.rb', line 248

def move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving inter-account #{describe_from(from_location)} to #{to_location}"
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
    delete_files(from_s3, from_location, '.+') # Delete all files we downloaded
  end

end

.name_file(filepath, new_filename, remove_path = nil, add_path = nil, flatten = false) ⇒ Object

A helper function to prepare destination filenames and paths. This is a bit weird - it needs to exist because of differences in the way that Amazon S3, Fog and Unix treat filepaths versus keys.

Parameters:

filepath

Path to file (including old filename)

new_filename

Replace the filename in the path with this

remove_path

If this is set, strip this from the front of the path

add_path

If this is set, add this to the front of the path

flatten

strips off any sub-folders below the from_location

TODO: this really needs unit tests



# File 'lib/sluice/storage/s3.rb', line 621

def name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

  # First, replace the filename in filepath with new one
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : dirname + '/' + new_filename

  # Nothing more to do
  return new_filepath if remove_path.nil? and add_path.nil? and not flatten

  shortened_filepath =  if flatten
                          # Let's revert to just the filename
                          new_filename
                        else
                          # If we have a 'remove_path', it must be found at
                          # the start of the path.
                          # If it's not, you're probably using name_file()
                          # wrong.
                          if !filepath.start_with?(remove_path)
                            raise StorageOperationError, "name_file failed. Filepath '#{filepath}' does not start with '#{remove_path}'"
                          end

                          # Okay, let's remove the filepath
                          new_filepath[remove_path.length()..-1]
                        end

  # Nothing more to do
  return shortened_filepath if add_path.nil?

  # Add the new filepath on to the start and return
  return add_path + shortened_filepath
end
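
Given the TODO above, a few worked examples may help (results follow from the code; paths hypothetical):

name_file('in/2013/f.gz', 'f.gz.bak')             # => "in/2013/f.gz.bak" (rename only)
name_file('in/2013/f.gz', 'f.gz', 'in/')          # => "2013/f.gz"        (prefix stripped)
name_file('in/2013/f.gz', 'f.gz', 'in/', 'out/')  # => "out/2013/f.gz"    (prefix swapped)
name_file('in/2013/f.gz', 'f.gz', nil, nil, true) # => "f.gz"             (flattened)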

.new_fog_s3_from(region, access_key_id, secret_access_key) ⇒ Object

Helper function to instantiate a new Fog::Storage for S3 based on our config options

Parameters:

region

Amazon S3 region we will be working with

access_key_id

AWS access key ID

secret_access_key

AWS secret access key



# File 'lib/sluice/storage/s3.rb', line 70

def new_fog_s3_from(region, access_key_id, secret_access_key)
  fog = Fog::Storage.new({
    :provider => 'AWS',
    :region => region,
    :aws_access_key_id => access_key_id,
    :aws_secret_access_key => secret_access_key
  })
  fog.sync_clock
  fog
end
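
A usage sketch, reading credentials from the environment for illustration:

s3 = Sluice::Storage::S3.new_fog_s3_from(
  'us-east-1',
  ENV['AWS_ACCESS_KEY_ID'],
  ENV['AWS_SECRET_ACCESS_KEY']
)

The sync_clock call aligns Fog's clock with AWS's, which guards against RequestTimeTooSkewed errors on machines with drifting clocks.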

.process_files(operation, s3, from_files_or_dir_or_loc, match_regex_or_glob = '.+', to_loc_or_dir = nil, alter_filename_lambda = false, flatten = false) ⇒ Object

Concurrent file operations between S3 locations. Supports:

  • Download

  • Upload

  • Copy

  • Delete

  • Move (= Copy + Delete)

Parameters:

operation

Operation to perform. :download, :upload, :copy, :delete, :move supported

s3

A Fog::Storage s3 connection

from_files_or_dir_or_loc

Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

match_regex_or_glob

a regex or glob string to match the files to process

to_loc_or_dir

S3Location or local directory to process files to

alter_filename_lambda

lambda to alter the written filename

flatten

strips off any sub-folders below the from_loc_or_dir



# File 'lib/sluice/storage/s3.rb', line 366

def process_files(operation, s3, from_files_or_dir_or_loc, match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=false, flatten=false)

  # Validate that the file operation makes sense
  case operation
  when :copy, :move, :download, :upload
    if to_loc_or_dir.nil?
      raise StorageOperationError "File operation %s requires the to_loc_or_dir to be set" % operation
    end
  when :delete
    unless to_loc_or_dir.nil?
      raise StorageOperationError "File operation %s does not support the to_loc_or_dir argument" % operation
    end
    if alter_filename_lambda.class == Proc
      raise StorageOperationError "File operation %s does not support the alter_filename_lambda argument" % operation
    end
  else
    raise StorageOperationError "File operation %s is unsupported. Try :download, :upload, :copy, :delete or :move" % operation
  end

  # If we have an array of files, no additional globbing required
  if from_files_or_dir_or_loc.is_a?(Array)
    files_to_process = from_files_or_dir_or_loc # Could be filepaths or Fog::Storage::AWS::File's
    globbed = true
  # Otherwise if it's an upload, we can glob now
  elsif operation == :upload
    files_to_process = glob_files(from_files_or_dir_or_loc, match_regex_or_glob)
    globbed = true
  # Otherwise we'll do threaded globbing later...
  else
    files_to_process = []
    from_loc = from_files_or_dir_or_loc # Alias
    globbed = false
  end

  threads = []
  mutex = Mutex.new
  complete = false
  marker_opts = {}

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute s3 operations
  for i in (0...CONCURRENCY)

    # Each thread pops a file off the files_to_process array, and moves it.
    # We loop until there are no more files
    threads << Thread.new do
      loop do
        file = false
        filepath = false
        from_bucket = false
        from_path = false
        match = false

        # Critical section:
        # only allow one thread to modify the array at any time
        mutex.synchronize do

          # No need to do further globbing 
          if globbed
            if files_to_process.size == 0
              complete = true
              next
            end

            file = files_to_process.pop
            # Support raw filenames and also Fog::Storage::AWS::File's
            if (file.is_a?(Fog::Storage::AWS::File))
              from_bucket = file.directory.key # Bucket
              from_path = Sluice::Storage.trail_slash(File.dirname(file.key))
              filepath = file.key
            else
              from_bucket = nil # Not used
              if from_files_or_dir_or_loc.is_a?(Array)
                from_path = Sluice::Storage.trail_slash(File.dirname(file))
              else
                from_path = from_files_or_dir_or_loc # The root dir
              end
              filepath = file
            end

            match = true # Match is implicit in the glob
          else

            while !complete && !match do
              if files_to_process.size == 0
                # S3 batches 1000 files per request.
                # We load up our array with the files to move
                files_to_process = s3.directories.get(from_loc.bucket, :prefix => from_loc.dir).files.all(marker_opts)
                # If we don't have any files after the S3 request, we're complete
                if files_to_process.size == 0
                  complete = true
                  next
                else
                  marker_opts['marker'] = files_to_process.last.key

                  # By reversing the array we can use pop and get FIFO behaviour
                  # instead of the performance penalty incurred by unshift
                  files_to_process = files_to_process.reverse
                end
              end

              file = files_to_process.pop
              from_bucket = from_loc.bucket
              from_path = from_loc.dir_as_path
              filepath = file.key

              match = if match_regex_or_glob.is_a? NegativeRegex
                        !filepath.match(match_regex_or_glob.regex)
                      else
                        filepath.match(match_regex_or_glob)
                      end
            end
          end
        end

        break unless match
        break if is_folder?(filepath)

        # Name file
        basename = get_basename(filepath)
        if alter_filename_lambda.class == Proc
          filename = alter_filename_lambda.call(basename)
        else
          filename = basename
        end

        # What are we doing? Let's determine source and target
        # Note that target excludes bucket name where relevant
        case operation
        when :upload
          source = "#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "    UPLOAD #{source} +-> #{to_loc_or_dir.bucket}/#{target}"                
        when :download
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir, flatten)
          puts "    DOWNLOAD #{source} +-> #{target}"
        when :move
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "    MOVE #{source} -> #{to_loc_or_dir.bucket}/#{target}"
        when :copy
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "    COPY #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :delete
          source = "#{from_bucket}/#{filepath}"
          # No target
          puts "    DELETE x #{source}" 
        end

        # Upload is a standalone operation vs move/copy/delete
        if operation == :upload
          retry_x(
            Sluice::Storage::S3,
            [:upload_file, s3, filepath, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +/> #{target}",
            "Problem uploading #{filepath}. Retrying.")
        end                

        # Download is a standalone operation vs move/copy/delete
        if operation == :download
          retry_x(
            Sluice::Storage::S3,
            [:download_file, s3, file, target],
            RETRIES,
            "      +/> #{target}",
            "Problem downloading #{filepath}. Retrying.")
        end

        # A move or copy starts with a copy file
        if [:move, :copy].include? operation
          retry_x(
            file,
            [:copy, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +-> #{to_loc_or_dir.bucket}/#{target}",
            "Problem copying #{filepath}. Retrying.")
        end

        # A move or delete ends with a delete
        if [:move, :delete].include? operation
          retry_x(
            file,
            [:destroy],
            RETRIES,
            "      x #{source}",
            "Problem destroying #{filepath}. Retrying.")
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread|  aThread.join }

end
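
process_files is the engine behind the copy/move/delete/download/upload wrappers above, and is normally invoked through them. For reference, an explicit :move equivalent to move_files would look like this (locations hypothetical):

Sluice::Storage::S3.process_files(:move, s3, from_location, '.+\.gz$', to_location)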

.retry_x(object, send_args, retries, attempt_msg, failure_msg) ⇒ Object

A helper function that attempts an operation, retrying up to retries times on failure

Parameters:

object

Object to send the method to

send_args

Array containing the method name and arguments, passed via object.send

retries

Number of retries to attempt

attempt_msg

Message to puts once the operation succeeds

failure_msg

Message to puts on each failure



# File 'lib/sluice/storage/s3.rb', line 592

def retry_x(object, send_args, retries, attempt_msg, failure_msg)
  i = 0
  begin
    object.send(*send_args)
    puts attempt_msg
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(RETRY_WAIT)  # Give us a bit of time before retrying
    i += 1
    retry
  end        
end
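
For example, the upload path in process_files retries like so (a sketch mirroring the call above; paths hypothetical):

Sluice::Storage::S3.retry_x(
  Sluice::Storage::S3,                                      # Object receiving the send
  [:upload_file, s3, '/tmp/f.gz', 'my-bucket', 'out/f.gz'], # Method name plus arguments
  Sluice::Storage::S3::RETRIES,                             # Retry up to 3 times...
  "      +/> out/f.gz",                                     # ...printing this on success
  "Problem uploading /tmp/f.gz. Retrying.")                 # ...and this on each failure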

.upload_file(s3, from_file, to_bucket, to_file) ⇒ Object

Upload a single file to the exact location specified. Has no intelligence around filenaming.

Parameters:

s3

A Fog::Storage s3 connection

from_file

A local file path

to_bucket

Name of the S3 bucket to upload to

to_file

The file path to upload to



# File 'lib/sluice/storage/s3.rb', line 299

def upload_file(s3, from_file, to_bucket, to_file)

  local_file = File.open(from_file)

  dir = s3.directories.new(:key => to_bucket) # No request made
  file = dir.files.create(
    :key    => to_file,
    :body   => local_file
  )

  local_file.close
end

.upload_files(s3, from_files_or_dir, to_location, match_glob = '*') ⇒ Object

Uploads files to S3 locations concurrently

Parameters:

s3

A Fog::Storage s3 connection

from_files_or_dir

Local array of files or local directory to upload files from

to_location

S3Location to upload files to

match_glob

a filesystem glob to match the files to upload



# File 'lib/sluice/storage/s3.rb', line 284

def upload_files(s3, from_files_or_dir, to_location, match_glob='*')

  puts "  uploading #{describe_from(from_files_or_dir)} to #{to_location}"
  process_files(:upload, s3, from_files_or_dir, match_glob, to_location)
end
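
A usage sketch (names hypothetical). Note that the default glob '*' is not recursive; use '**/*' to pick up sub-folders, as the inter-account methods above do:

to = Sluice::Storage::S3::Location.new('s3://my-bucket/shredded/')
Sluice::Storage::S3.upload_files(s3, '/tmp/shredded/', to, '**/*.gz')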