Module: Sluice::Storage::S3

Defined in:
lib/sluice/storage/s3/s3.rb,
lib/sluice/storage/s3/location.rb,
lib/sluice/storage/s3/manifest.rb,
lib/sluice/storage/s3/contracts.rb

Defined Under Namespace

Classes: Location, Manifest, ManifestScope

Constant Summary

# Constants
CONCURRENCY = 10     # Threads
RETRIES = 3          # Attempts
RETRY_WAIT = 10      # Seconds
TIMEOUT_WAIT = 1800  # Seconds

# Aliases for Contracts
FogStorage = Fog::Storage::AWS::Real
FogFile = Fog::Storage::AWS::File

Class Method Details

.copy_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Copies files between S3 locations concurrently

Parameters:

  s3                    - A Fog::Storage s3 connection
  from_files_or_loc     - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from
  to_location           - S3Location to copy files to
  match_regex           - A regex string to match the files to copy
  alter_filename_lambda - A lambda to alter the written filename
  flatten               - Strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3/s3.rb', line 206

def copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:copy, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
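
For illustration, a minimal usage sketch (the bucket names and regex are hypothetical, and the s3 connection comes from new_fog_s3_from below; Location is assumed to accept an s3:// URI):

in_loc  = Sluice::Storage::S3::Location.new('s3://my-in-bucket/events/')
out_loc = Sluice::Storage::S3::Location.new('s3://my-out-bucket/archive/')
Sluice::Storage::S3.copy_files(s3, in_loc, out_loc, '.+\.tsv$')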

.copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Copies files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

In other words, the download and upload are not interleaved (which is inefficient because upload speeds are much lower than download speeds).

Parameters:

  from_s3               - A Fog::Storage s3 connection for accessing the from S3Location
  to_s3                 - A Fog::Storage s3 connection for accessing the to S3Location
  from_location         - S3Location to copy files from
  to_location           - S3Location to copy files to
  match_regex           - A regex string to match the files to copy
  alter_filename_lambda - A lambda to alter the written filename
  flatten               - Strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3/s3.rb', line 183

def copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)            

  puts "  copying inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
  end

  processed
end
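
A sketch of inter-account usage, assuming one clock-synced connection per account (regions, credentials and locations are illustrative):

from_s3 = Sluice::Storage::S3.new_fog_s3_from('us-east-1', src_key, src_secret)
to_s3   = Sluice::Storage::S3.new_fog_s3_from('eu-west-1', dst_key, dst_secret)
Sluice::Storage::S3.copy_files_inter(from_s3, to_s3, from_loc, to_loc, '.+\.gz$')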

.copy_files_manifest(s3, manifest, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Copies files between S3 locations maintaining a manifest to avoid copying a file which was copied previously.

Useful in scenarios such as:

  1. You would like to do a move but only have read permission on the source bucket

  2. You would like to do a move but some other process needs to use the files after you

Parameters:

  s3                    - A Fog::Storage s3 connection
  manifest              - A Sluice::Storage::S3::Manifest object
  from_files_or_loc     - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from
  to_location           - S3Location to copy files to
  match_regex           - A regex string to match the files to copy
  alter_filename_lambda - A lambda to alter the written filename
  flatten               - Strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3/s3.rb', line 229

def copy_files_manifest(s3, manifest, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  copying with manifest #{describe_from(from_files_or_loc)} to #{to_location}"
  ignore = manifest.get_entries(s3) # Files to leave untouched
  processed = process_files(:copy, s3, from_files_or_loc, ignore, match_regex, to_location, alter_filename_lambda, flatten)
  manifest.add_entries(s3, processed)

  processed
end
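
A hedged sketch of manifest-driven copying. The Manifest constructor arguments shown here (a Location to hold the manifest, plus a scope) are an assumption; check the Manifest and ManifestScope classes for the exact signature:

manifest = Sluice::Storage::S3::Manifest.new(out_loc, :filename) # constructor args are an assumption
newly_copied = Sluice::Storage::S3.copy_files_manifest(s3, manifest, in_loc, out_loc)
puts "copied #{newly_copied.length} new file(s)" # previously-copied files were skipped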

.delete_files(s3, from_files_or_loc, match_regex = '.+') ⇒ Object

Deletes files from S3 locations concurrently.

Parameters:

  s3                - A Fog::Storage s3 connection
  from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to delete files from
  match_regex       - A regex string to match the files to delete



# File 'lib/sluice/storage/s3/s3.rb', line 157

def delete_files(s3, from_files_or_loc, match_regex='.+')

  puts "  deleting #{describe_from(from_files_or_loc)}"
  process_files(:delete, s3, from_files_or_loc, [], match_regex)
end
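
A usage sketch (the location is hypothetical; as the process_files code below shows, a NegativeRegex can also be passed to delete everything that does not match):

tmp_loc = Sluice::Storage::S3::Location.new('s3://my-bucket/tmp/')
Sluice::Storage::S3.delete_files(s3, tmp_loc, '.+\.log$')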

.describe_from(from_files_or_dir_or_loc) ⇒ Object

Provides a string describing from_files_or_dir_or_loc for logging purposes.

Parameters:

  from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

Returns a log-friendly string.



# File 'lib/sluice/storage/s3/s3.rb', line 353

def describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end

.download_file(s3, from_file, to_file) ⇒ Object

Downloads a single file to the exact path specified. Has no intelligence around filenaming, but makes sure to create the path as needed.

Parameters:

  s3        - A Fog::Storage s3 connection
  from_file - A Fog::Storage::AWS::File to download
  to_file   - A local file path



# File 'lib/sluice/storage/s3/s3.rb', line 332

def download_file(s3, from_file, to_file)

  FileUtils.mkdir_p(File.dirname(to_file))

  # TODO: deal with bug where Fog hangs indefinitely if network connection dies during download

  local_file = File.open(to_file, "w")
  local_file.write(from_file.body)
  local_file.close
end

.download_files(s3, from_files_or_loc, to_directory, match_regex = '.+') ⇒ Object

Downloads files from an S3 location to local storage, concurrently.

Parameters:

  s3                - A Fog::Storage s3 connection
  from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to download files from
  to_directory      - Local directory to copy files to
  match_regex       - A regex string to match the files to download



# File 'lib/sluice/storage/s3/s3.rb', line 144

def download_files(s3, from_files_or_loc, to_directory, match_regex='.+')

  puts "  downloading #{describe_from(from_files_or_loc)} to #{to_directory}"
  process_files(:download, s3, from_files_or_loc, [], match_regex, to_directory)
end
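
A usage sketch (paths are hypothetical; the trailing slash on to_directory matters because target paths are built by prepending it in name_file below):

files = Sluice::Storage::S3.download_files(s3, in_loc, '/tmp/events/', '.+\.gz$')
puts "downloaded #{files.length} file(s)"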

.get_basename(path) ⇒ Object



Returns the filename (basename) of the given path, or nil if the path is a folder or has no filename.

# File 'lib/sluice/storage/s3/s3.rb', line 111

def get_basename(path)
  if is_folder?(path)
    nil
  else
    match = path.match('([^/]+)$')
    if match
      match[1]
    else
      nil
    end
  end
end
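
Behaviour sketch, traced from the code above (paths are illustrative):

Sluice::Storage::S3.get_basename('events/2013/part-0001')  # => "part-0001"
Sluice::Storage::S3.get_basename('events/2013/')           # => nil (folder)
Sluice::Storage::S3.get_basename('events/2013_$folder$')   # => nil (EMR folder marker)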

.glob_files(dir, glob) ⇒ Object

A helper function to list all files recursively in a folder

Parameters:

  dir  - Directory in which to list files recursively
  glob - A filesystem glob to match the files to list

Returns array of files (no sub-directories).



# File 'lib/sluice/storage/s3/s3.rb', line 622

def glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f|
    File.file?(f) # Drop sub-directories
  }
end

.is_empty?(s3, location) ⇒ Boolean

Checks whether the given S3 location contains no files.

Returns:

  • (Boolean)

# File 'lib/sluice/storage/s3/s3.rb', line 131

def is_empty?(s3, location)
  list_files(s3, location).length == 0
end

.is_file?(path) ⇒ Boolean

Checks whether the given path is a file rather than a folder.

Returns:

  • (Boolean)

# File 'lib/sluice/storage/s3/s3.rb', line 98

def is_file?(path)
  !is_folder?(path)
end

.is_folder?(path) ⇒ Boolean

Checks whether the given path represents a folder, i.e. an EMR-created _$folder$ placeholder or a path ending in a slash.

Returns:

  • (Boolean)

# File 'lib/sluice/storage/s3/s3.rb', line 85

def is_folder?(path)
  (path.end_with?('_$folder$') || # EMR-created
    path.end_with?('/'))
end
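
Both folder conventions are recognised:

Sluice::Storage::S3.is_folder?('events/2013/')          # => true  (trailing slash)
Sluice::Storage::S3.is_folder?('events/2013_$folder$')  # => true  (EMR-created marker)
Sluice::Storage::S3.is_folder?('events/part-0001')      # => false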

.list_files(s3, location) ⇒ Object



Lists all files (excluding folder placeholders) at the given S3 location.

# File 'lib/sluice/storage/s3/s3.rb', line 65

def list_files(s3, location)
  files_and_dirs = s3.directories.get(location.bucket, prefix: location.dir_as_path).files

  files = [] # Can't use a .select because of Ruby deep copy issues (array of non-POROs)
  files_and_dirs.each { |f|
    if is_file?(f.key)
      files << f.dup
    end
  }
  files
end

.move_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Moves files between S3 locations concurrently

Parameters:

  s3                    - A Fog::Storage s3 connection
  from_files_or_loc     - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to move files from
  to_location           - S3Location to move files to
  match_regex           - A regex string to match the files to move
  alter_filename_lambda - A lambda to alter the written filename
  flatten               - Strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3/s3.rb', line 281

def move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:move, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
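
A sketch showing the rename hook (locations are hypothetical; the lambda is assumed to receive the current basename and return the new one):

add_suffix = lambda { |basename| "#{basename}.processed" }
Sluice::Storage::S3.move_files(s3, in_loc, done_loc, '.+', add_suffix)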

.move_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object

Moves files between S3 locations in two different accounts

Implementation is as follows:

  1. Concurrent download of all files from S3 source to local tmpdir

  2. Concurrent upload of all files from local tmpdir to S3 target

  3. Concurrent deletion of all files from S3 source

In other words, the three operations are not interleaved (which is inefficient because upload speeds are much lower than download speeds)

Parameters:

  from_s3               - A Fog::Storage s3 connection for accessing the from S3Location
  to_s3                 - A Fog::Storage s3 connection for accessing the to S3Location
  from_location         - S3Location to move files from
  to_location           - S3Location to move files to
  match_regex           - A regex string to match the files to move
  alter_filename_lambda - A lambda to alter the written filename
  flatten               - Strips off any sub-folders below the from_location



# File 'lib/sluice/storage/s3/s3.rb', line 257

def move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)

  puts "  moving inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
    delete_files(from_s3, from_location, '.+') # Delete all files we downloaded
  end

  processed
end

.name_file(filepath, new_filename, remove_path = nil, add_path = nil, flatten = false) ⇒ Object

A helper function to prepare destination filenames and paths. This is a bit weird: it needs to exist because of differences in the way that Amazon S3, Fog and Unix treat filepaths versus keys.

Parameters:

  filepath     - Path to file (including old filename)
  new_filename - Replace the filename in the path with this
  remove_path  - If this is set, strip this from the front of the path
  add_path     - If this is set, add this to the front of the path
  flatten      - Strips off any sub-folders below the from_location

TODO: this badly needs unit tests



# File 'lib/sluice/storage/s3/s3.rb', line 669

def name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

  # First, replace the filename in filepath with new one
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : dirname + '/' + new_filename

  # Nothing more to do
  return new_filepath if remove_path.nil? and add_path.nil? and not flatten

  shortened_filepath =  if flatten
                          # Let's revert to just the filename
                          new_filename
                        else
                          # If we have a 'remove_path', it must be found at
                          # the start of the path.
                          # If it's not, you're probably using name_file()
                          # wrong.
                          if !filepath.start_with?(remove_path)
                            raise StorageOperationError, "name_file failed. Filepath '#{filepath}' does not start with '#{remove_path}'"
                          end

                          # Okay, let's strip remove_path off the front
                          new_filepath[remove_path.length()..-1]
                        end

  # Nothing more to do
  return shortened_filepath if add_path.nil?

  # Add the new filepath on to the start and return
  return add_path + shortened_filepath
end
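
A few worked examples, traced through the implementation above (paths are illustrative):

name_file('in/2013/part-01', 'part-01', 'in/', 'out/')      # => "out/2013/part-01"
name_file('in/2013/part-01', 'part-01', nil, 'out/', true)  # => "out/part-01" (flatten)
name_file('in/2013/part-01', 'renamed-01')                  # => "in/2013/renamed-01"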

.new_fog_s3_from(region, access_key_id, secret_access_key) ⇒ Object



Creates a new Fog::Storage connection to S3 in the given region using the supplied AWS credentials, and syncs the clock against S3 before returning it.

# File 'lib/sluice/storage/s3/s3.rb', line 45

def new_fog_s3_from(region, access_key_id, secret_access_key)
  fog = Fog::Storage.new({
    :provider => 'AWS',
    :region => region,
    :aws_access_key_id => access_key_id,
    :aws_secret_access_key => secret_access_key
  })
  fog.sync_clock
  fog
end
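
A connection sketch used by the examples on this page (the region and environment variable names are illustrative):

s3 = Sluice::Storage::S3.new_fog_s3_from('us-east-1',
       ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])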

.process_files(operation, s3, from_files_or_dir_or_loc, ignore = [], match_regex_or_glob = '.+', to_loc_or_dir = nil, alter_filename_lambda = false, flatten = false) ⇒ Object

Concurrent file operations between S3 locations. Supports:

  • Download

  • Upload

  • Copy

  • Delete

  • Move (= Copy + Delete)

Parameters:

  operation                - Operation to perform: :download, :upload, :copy, :delete or :move
  s3                       - A Fog::Storage s3 connection
  from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from
  ignore                   - Array of filenames to ignore (used by the manifest code)
  match_regex_or_glob      - A regex or glob string to match the files to process
  to_loc_or_dir            - S3Location or local directory to process files to
  alter_filename_lambda    - A lambda to alter the written filename
  flatten                  - Strips off any sub-folders below the from_loc_or_dir



# File 'lib/sluice/storage/s3/s3.rb', line 378

def process_files(operation, s3, from_files_or_dir_or_loc, ignore=[], match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=false, flatten=false)

  # Validate that the file operation makes sense
  case operation
  when :copy, :move, :download, :upload
    if to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s requires the to_loc_or_dir to be set" % operation
    end
  when :delete
    unless to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s does not support the to_loc_or_dir argument" % operation
    end
    if alter_filename_lambda.class == Proc
      raise StorageOperationError, "File operation %s does not support the alter_filename_lambda argument" % operation
    end
  else
    raise StorageOperationError, "File operation %s is unsupported. Try :download, :upload, :copy, :delete or :move" % operation
  end

  # If we have an array of files, no additional globbing required
  if from_files_or_dir_or_loc.is_a?(Array)
    files_to_process = from_files_or_dir_or_loc # Could be filepaths or Fog::Storage::AWS::File's
    globbed = true
  # Otherwise if it's an upload, we can glob now
  elsif operation == :upload
    files_to_process = glob_files(from_files_or_dir_or_loc, match_regex_or_glob)
    globbed = true
  # Otherwise we'll do threaded globbing later...
  else
    files_to_process = []
    from_loc = from_files_or_dir_or_loc # Alias
    globbed = false
  end

  threads = []
  mutex = Mutex.new
  complete = false
  marker_opts = {}
  processed_files = [] # For manifest updating, determining if any files were moved etc

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute s3 operations
  for i in (0...CONCURRENCY)

    # Each thread pops a file off the files_to_process array, and moves it.
    # We loop until there are no more files
    threads << Thread.new(i) do |thread_idx|

      loop do
        file = false
        filepath = false
        from_bucket = false
        from_path = false
        match = false

        # First critical section:
        # only allow one thread to modify the array at any time
        mutex.synchronize do

          # No need to do further globbing 
          if globbed
            if files_to_process.size == 0
              complete = true
              next
            end

            file = files_to_process.pop
            # Support raw filenames and also Fog::Storage::AWS::File's
            if (file.is_a?(Fog::Storage::AWS::File))
              from_bucket = file.directory.key # Bucket
              from_path = Sluice::Storage.trail_slash(File.dirname(file.key))
              filepath = file.key
            else
              from_bucket = nil # Not used
              if from_files_or_dir_or_loc.is_a?(Array)
                from_path = Sluice::Storage.trail_slash(File.dirname(file))
              else
                from_path = from_files_or_dir_or_loc # The root dir
              end
              filepath = file
            end

            match = true # Match is implicit in the glob
          else

            while !complete && !match do
              if files_to_process.size == 0
                # S3 batches 1000 files per request.
                # We load up our array with the files to move
                files_to_process = s3.directories.get(from_loc.bucket, :prefix => from_loc.dir).files.all(marker_opts).to_a
                # If we don't have any files after the S3 request, we're complete
                if files_to_process.size == 0
                  complete = true
                  next
                else
                  marker_opts['marker'] = files_to_process.last.key

                  # By reversing the array we can use pop and get FIFO behaviour
                  # instead of the performance penalty incurred by unshift
                  files_to_process = files_to_process.reverse
                end
              end

              file = files_to_process.pop
              from_bucket = from_loc.bucket
              from_path = from_loc.dir_as_path
              filepath = file.key

              # TODO: clean up following https://github.com/snowplow/sluice/issues/25
              match = if match_regex_or_glob.is_a? NegativeRegex
                        !filepath.match(match_regex_or_glob.regex)
                      else
                        filepath.match(match_regex_or_glob)
                      end

            end
          end
        end
        # End of mutex.synchronize

        # Kill this thread's loop (and thus this thread) if we are complete
        break if complete

        # Skip processing for a folder or file which doesn't match our regexp or glob
        next if is_folder?(filepath) or not match

        # Name file
        basename = get_basename(filepath)
        next if ignore.include?(basename) # Don't process if in our leave list

        filename = rename_file(filepath, basename, alter_filename_lambda)

        # What are we doing? Let's determine source and target
        # Note that target excludes bucket name where relevant
        case operation
        when :upload
          source = "#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    UPLOAD #{source} +-> #{to_loc_or_dir.bucket}/#{target}"                
        when :download
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir, flatten)
          puts "(t#{thread_idx})    DOWNLOAD #{source} +-> #{target}"
        when :move
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    MOVE #{source} -> #{to_loc_or_dir.bucket}/#{target}"
        when :copy
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx})    COPY #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :delete
          source = "#{from_bucket}/#{filepath}"
          # No target
          puts "(t#{thread_idx})    DELETE x #{source}" 
        end

        # Upload is a standalone operation vs move/copy/delete
        if operation == :upload
          retry_x(
            Sluice::Storage::S3,
            [:upload_file, s3, filepath, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +/> #{target}",
            "Problem uploading #{filepath}. Retrying.")
        end                

        # Download is a standalone operation vs move/copy/delete
        if operation == :download
          retry_x(
            Sluice::Storage::S3,
            [:download_file, s3, file, target],
            RETRIES,
            "      +/> #{target}",
            "Problem downloading #{filepath}. Retrying.")
        end

        # A move or copy starts with a copy file
        if [:move, :copy].include? operation
          retry_x(
            file,
            [:copy, to_loc_or_dir.bucket, target],
            RETRIES,
            "      +-> #{to_loc_or_dir.bucket}/#{target}",
            "Problem copying #{filepath}. Retrying.")
        end

        # A move or delete ends with a delete
        if [:move, :delete].include? operation
          retry_x(
            file,
            [:destroy],
            RETRIES,
            "      x #{source}",
            "Problem destroying #{filepath}. Retrying.")
        end

        # Second critical section: we need to update
        # processed_files in a thread-safe way
        mutex.synchronize do
          processed_files << filepath
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread|  aThread.join }

  processed_files # Return the processed files
end

.retry_x(object, send_args, retries, attempt_msg, failure_msg) ⇒ Object

A helper function that attempts to run a function up to retries times before giving up.

Parameters:

  object      - Object to send our function to
  send_args   - Function plus arguments, passed to Object#send
  retries     - Number of retries to attempt
  attempt_msg - Message to puts once an attempt succeeds
  failure_msg - Message to puts on each failure



# File 'lib/sluice/storage/s3/s3.rb', line 638

def retry_x(object, send_args, retries, attempt_msg, failure_msg)
  i = 0
  begin
    Timeout::timeout(TIMEOUT_WAIT) do # In case our operation times out
      object.send(*send_args)
      puts attempt_msg
    end
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(RETRY_WAIT)  # Give us a bit of time before retrying
    i += 1
    retry
  end
end
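
For reference, this mirrors how process_files above invokes retry_x for a copy (the bucket and key are illustrative):

retry_x(
  file,                                    # a Fog::Storage::AWS::File
  [:copy, 'target-bucket', 'target/key'],  # i.e. file.copy('target-bucket', 'target/key')
  RETRIES,
  "      +-> target-bucket/target/key",
  "Problem copying target/key. Retrying.")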

.upload_file(s3, from_file, to_bucket, to_file) ⇒ Object

Uploads a single file to the exact location specified. Has no intelligence around filenaming.

Parameters:

  s3        - A Fog::Storage s3 connection
  from_file - A local file path
  to_bucket - The bucket (Fog::Directory) to upload to
  to_file   - The file path to upload to



# File 'lib/sluice/storage/s3/s3.rb', line 310

def upload_file(s3, from_file, to_bucket, to_file)

  local_file = File.open(from_file)

  dir = s3.directories.new(:key => to_bucket) # No request made
  file = dir.files.create(
    :key    => to_file,
    :body   => local_file
  )

  local_file.close
end

.upload_files(s3, from_files_or_dir, to_location, match_glob = '*') ⇒ Object

Uploads files to S3 locations concurrently

Parameters:

  s3                - A Fog::Storage s3 connection
  from_files_or_dir - Local array of files or local directory to upload files from
  to_location       - S3Location to upload files to
  match_glob        - A filesystem glob to match the files to upload



# File 'lib/sluice/storage/s3/s3.rb', line 295

def upload_files(s3, from_files_or_dir, to_location, match_glob='*')

  puts "  uploading #{describe_from(from_files_or_dir)} to #{to_location}"
  process_files(:upload, s3, from_files_or_dir, [], match_glob, to_location)
end
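
A usage sketch (paths are hypothetical; note that uploads match with a filesystem glob rather than a regex):

Sluice::Storage::S3.upload_files(s3, '/tmp/events/', out_loc, '**/*.tsv')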