Module: Sluice::Storage::S3
Defined in: lib/sluice/storage/s3.rb
Defined Under Namespace
Classes: Location
Constant Summary
- CONCURRENCY = 10 (threads)
- RETRIES = 3 (attempts)
- RETRY_WAIT = 10 (seconds to wait between retry attempts)
Class Method Summary
- .copy_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
  Copies files between S3 locations concurrently.
- .copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
  Copies files between S3 locations in two different accounts.
- .delete_files(s3, from_files_or_loc, match_regex = '.+') ⇒ Object
  Deletes files from S3 locations concurrently.
- .describe_from(from_files_or_dir_or_loc) ⇒ Object
  Provides a string describing from_files_or_dir_or_loc for logging purposes.
- .download_file(s3, from_file, to_file) ⇒ Object
  Downloads a single file to the exact path specified. Has no intelligence around filenaming.
- .download_files(s3, from_files_or_loc, to_directory, match_regex = '.+') ⇒ Object
  Downloads files from an S3 location to local storage, concurrently.
- .get_basename(path) ⇒ Object
  Returns the basename for the given path.
- .glob_files(dir, glob) ⇒ Object
  A helper function to list all files recursively in a folder.
- .is_empty?(s3, location) ⇒ Boolean
  Determines whether an S3 location is empty.
- .is_file?(path) ⇒ Boolean
  Whether the given path is a file or not.
- .is_folder?(path) ⇒ Boolean
  Whether the given path is a directory or not.
- .list_files(s3, location) ⇒ Object
  Returns an array of all Fog::Storage::AWS::File objects.
- .move_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
  Moves files between S3 locations concurrently.
- .move_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
  Moves files between S3 locations in two different accounts.
- .name_file(filepath, new_filename, remove_path = nil, add_path = nil, flatten = false) ⇒ Object
  A helper function to prepare destination filenames and paths.
- .new_fog_s3_from(region, access_key_id, secret_access_key) ⇒ Object
  Helper function to instantiate a new Fog::Storage for S3 based on our config options.
- .process_files(operation, s3, from_files_or_dir_or_loc, match_regex_or_glob = '.+', to_loc_or_dir = nil, alter_filename_lambda = false, flatten = false) ⇒ Object
  Concurrent file operations between S3 locations.
- .retry_x(object, send_args, retries, attempt_msg, failure_msg) ⇒ Object
  A helper function that attempts to call a method on an object up to retries times.
- .upload_file(s3, from_file, to_bucket, to_file) ⇒ Object
  Uploads a single file to the exact location specified. Has no intelligence around filenaming.
- .upload_files(s3, from_files_or_dir, to_location, match_glob = '*') ⇒ Object
  Uploads files to S3 locations concurrently.
Class Method Details
.copy_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
Copies files between S3 locations concurrently
Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from
to_location - S3Location to copy files to
match_regex - a regex string to match the files to copy
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File 'lib/sluice/storage/s3.rb', line 224
def copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)
  puts " copying #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:copy, s3, from_files_or_loc, match_regex, to_location, alter_filename_lambda, flatten)
end
.copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
Copies files between S3 locations in two different accounts
Implementation is as follows:
- Concurrent download of all files from S3 source to local tmpdir
- Concurrent upload of all files from local tmpdir to S3 target
In other words, the download and upload are not interleaved (which is inefficient because upload speeds are much lower than download speeds)
Parameters:
from_s3 - A Fog::Storage s3 connection for accessing the from S3Location
to_s3 - A Fog::Storage s3 connection for accessing the to S3Location
from_location - S3Location to copy files from
to_location - S3Location to copy files to
match_regex - a regex string to match the files to copy
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File 'lib/sluice/storage/s3.rb', line 203
def copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)
  puts " copying inter-account #{describe_from(from_location)} to #{to_location}"
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
  end
end
.delete_files(s3, from_files_or_loc, match_regex = '.+') ⇒ Object
Delete files from S3 locations concurrently
Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to delete files from
match_regex - a regex string to match the files to delete
# File 'lib/sluice/storage/s3.rb', line 177
def delete_files(s3, from_files_or_loc, match_regex='.+')
  puts " deleting #{describe_from(from_files_or_loc)}"
  process_files(:delete, s3, from_files_or_loc, match_regex)
end
.describe_from(from_files_or_dir_or_loc) ⇒ Object
Provides string describing from_files_or_dir_or_loc for logging purposes.
Parameters:
from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from
Returns a log-friendly string
# File 'lib/sluice/storage/s3.rb', line 342
def describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end
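The branching is simple enough to demonstrate standalone. A minimal sketch, re-implementing the same logic outside the module (the method name is reused here purely for illustration):

```ruby
# Standalone re-implementation of the describe_from logic, for illustration:
# an Array is summarised by its length, anything else by its string form.
def describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end

describe_from(['a.tsv', 'b.tsv']) # => "2 file(s)"
describe_from('my-bucket/logs/')  # => "files from my-bucket/logs/"
```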
.download_file(s3, from_file, to_file) ⇒ Object
Downloads a single file to the exact path specified. Has no intelligence around filenaming. Makes sure to create the path as needed.
Parameters:
s3 - A Fog::Storage s3 connection
from_file - A Fog::Storage::AWS::File to download
to_file - A local file path
# File 'lib/sluice/storage/s3.rb', line 321
def download_file(s3, from_file, to_file)
  FileUtils.mkdir_p(File.dirname(to_file))
  # TODO: deal with bug where Fog hangs indefinitely if network connection dies during download
  local_file = File.open(to_file, "w")
  local_file.write(from_file.body)
  local_file.close
end
.download_files(s3, from_files_or_loc, to_directory, match_regex = '.+') ⇒ Object
Download files from an S3 location to local storage, concurrently
Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to download files from
to_directory - Local directory to copy files to
match_regex - a regex string to match the files to download
# File 'lib/sluice/storage/s3.rb', line 164
def download_files(s3, from_files_or_loc, to_directory, match_regex='.+')
  puts " downloading #{describe_from(from_files_or_loc)} to #{to_directory}"
  process_files(:download, s3, from_files_or_loc, match_regex, to_directory)
end
.get_basename(path) ⇒ Object
Returns the basename for the given path
Parameters:
path - S3 path in String form
Returns the basename, or nil if the path is to a folder
# File 'lib/sluice/storage/s3.rb', line 132
def get_basename(path)
  if is_folder?(path)
    nil
  else
    match = path.match('([^/]+)$')
    if match
      match[1]
    else
      nil
    end
  end
end
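Since this is pure string manipulation, the behaviour can be sketched standalone. In this sketch the folder check is inlined rather than calling is_folder?, and the name s3_basename is illustrative:

```ruby
# Standalone sketch of the basename logic: folder paths (trailing '/',
# or EMR's '_$folder$' marker) have no basename; otherwise the final
# path segment is extracted with the same '([^/]+)$' regex.
def s3_basename(path)
  return nil if path.end_with?('/') || path.end_with?('_$folder$')
  match = path.match('([^/]+)$')
  match ? match[1] : nil
end

s3_basename('logs/2013/events.tsv') # => "events.tsv"
s3_basename('logs/2013/')           # => nil
```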
.glob_files(dir, glob) ⇒ Object
A helper function to list all files recursively in a folder
Parameters:
dir - Directory to list files recursively
glob - a filesystem glob to match the files to list
Returns array of files (no sub-directories)
# File 'lib/sluice/storage/s3.rb', line 576
def glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f|
    File.file?(f) # Drop sub-directories
  }
end
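A quick way to see the sub-directory filtering in action: the same logic re-implemented standalone and exercised against a throwaway temp directory. Note the '**/*' glob (the one copy_files_inter uses) also matches the subdirectory itself, which File.file? then drops:

```ruby
require 'tmpdir'
require 'fileutils'

# glob_files re-implemented standalone: glob under dir, keep regular files only
def glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f| File.file?(f) }
end

names = Dir.mktmpdir do |dir|
  FileUtils.mkdir_p(File.join(dir, 'sub'))
  File.write(File.join(dir, 'a.txt'), '')
  File.write(File.join(dir, 'sub', 'b.txt'), '')
  glob_files(dir, '**/*').map { |f| File.basename(f) }.sort
end
names # => ["a.txt", "b.txt"]
```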
.is_empty?(s3, location) ⇒ Boolean
Determines whether an S3 location is empty (a location holding at most one file is treated as empty).
Parameters:
s3 - A Fog::Storage s3 connection
location - The location to check
# File 'lib/sluice/storage/s3.rb', line 151
def is_empty?(s3, location)
  list_files(s3, location).length <= 1
end
.is_file?(path) ⇒ Boolean
Whether the given path is a file or not
Parameters:
path - S3 path in String form
Returns boolean
# File 'lib/sluice/storage/s3.rb', line 120
def is_file?(path)
  !is_folder?(path)
end
.is_folder?(path) ⇒ Boolean
Whether the given path is a directory or not
Parameters:
path - S3 path in String form
Returns boolean
# File 'lib/sluice/storage/s3.rb', line 108
def is_folder?(path)
  (path.end_with?('_$folder$') || # EMR-created
   path.end_with?('/'))
end
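The two folder conventions (a trailing slash, and the '_$folder$' marker objects Elastic MapReduce creates) can be checked standalone. A minimal sketch of both predicates; the names s3_folder? and s3_file? are illustrative:

```ruby
# Standalone sketch of the path-type checks: an S3 key is treated as a
# folder if it ends with '/' or with the '_$folder$' marker EMR creates.
def s3_folder?(path)
  path.end_with?('_$folder$') || path.end_with?('/')
end

def s3_file?(path)
  !s3_folder?(path)
end

s3_folder?('logs/2013/')          # => true
s3_folder?('logs/2013_$folder$')  # => true
s3_file?('logs/2013/events.tsv')  # => true
```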
.list_files(s3, location) ⇒ Object
Return an array of all Fog::Storage::AWS::File’s
Parameters:
s3 - A Fog::Storage s3 connection
location - The location to return files from
Returns array of Fog::Storage::AWS::File objects
# File 'lib/sluice/storage/s3.rb', line 89
def list_files(s3, location)
  files_and_dirs = s3.directories.get(location.bucket, prefix: location.dir).files
  files = []
  # Can't use a .select because of Ruby deep copy issues (array of non-POROs)
  files_and_dirs.each { |f|
    if is_file?(f.key)
      files << f.dup
    end
  }
  files
end
.move_files(s3, from_files_or_loc, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
Moves files between S3 locations concurrently
Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to move files from
to_location - S3Location to move files to
match_regex - a regex string to match the files to move
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File 'lib/sluice/storage/s3.rb', line 270
def move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)
  puts " moving #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:move, s3, from_files_or_loc, match_regex, to_location, alter_filename_lambda, flatten)
end
.move_files_inter(from_s3, to_s3, from_location, to_location, match_regex = '.+', alter_filename_lambda = false, flatten = false) ⇒ Object
Moves files between S3 locations in two different accounts
Implementation is as follows:
- Concurrent download of all files from S3 source to local tmpdir
- Concurrent upload of all files from local tmpdir to S3 target
- Concurrent deletion of all files from S3 source
In other words, the three operations are not interleaved (which is inefficient because upload speeds are much lower than download speeds)
Parameters:
from_s3 - A Fog::Storage s3 connection for accessing the from S3Location
to_s3 - A Fog::Storage s3 connection for accessing the to S3Location
from_location - S3Location to move files from
to_location - S3Location to move files to
match_regex - a regex string to match the files to move
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File 'lib/sluice/storage/s3.rb', line 248
def move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=false, flatten=false)
  puts " moving inter-account #{describe_from(from_location)} to #{to_location}"
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
    delete_files(from_s3, from_location, '.+') # Delete all files we downloaded
  end
end
.name_file(filepath, new_filename, remove_path = nil, add_path = nil, flatten = false) ⇒ Object
A helper function to prepare destination filenames and paths. This is a bit weird: it needs to exist because of differences in the way that Amazon S3, Fog and Unix treat filepaths versus keys.
Parameters:
filepath - Path to file (including old filename)
new_filename - Replace the filename in the path with this
remove_path - If this is set, strip this from the front of the path
add_path - If this is set, add this to the front of the path
flatten - strips off any sub-folders below the from_location
TODO: this really needs unit tests
# File 'lib/sluice/storage/s3.rb', line 621
def name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

  # First, replace the filename in filepath with new one
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : dirname + '/' + new_filename

  # Nothing more to do
  return new_filepath if remove_path.nil? and add_path.nil? and not flatten

  shortened_filepath = if flatten
                         # Let's revert to just the filename
                         new_filename
                       else
                         # If we have a 'remove_path', it must be found at
                         # the start of the path. If it's not, you're
                         # probably using name_file() wrong.
                         if !filepath.start_with?(remove_path)
                           raise StorageOperationError, "name_file failed. Filepath '#{filepath}' does not start with '#{remove_path}'"
                         end

                         # Okay, let's remove the filepath
                         new_filepath[remove_path.length()..-1]
                       end

  # Nothing more to do
  return shortened_filepath if add_path.nil?

  # Add the new filepath on to the start and return
  add_path + shortened_filepath
end
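Because the method is pure string manipulation, its behaviour is easy to show with a condensed re-implementation. This sketch elides the start_with? sanity check on remove_path; the name rename_in_path and the example paths are illustrative only:

```ruby
# Condensed sketch of the name_file logic (without the remove_path sanity
# check): swap the filename, optionally strip a prefix or flatten the
# sub-folders, then optionally prepend a new prefix.
def rename_in_path(filepath, new_filename, remove_path = nil, add_path = nil, flatten = false)
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : "#{dirname}/#{new_filename}"
  return new_filepath if remove_path.nil? && add_path.nil? && !flatten

  shortened = if flatten
                new_filename
              else
                remove_path.nil? ? new_filepath : new_filepath[remove_path.length..-1]
              end
  add_path.nil? ? shortened : add_path + shortened
end

rename_in_path('in/2013/run-1.tsv', 'run-1.tsv.gz', 'in/', 'out/')
# => "out/2013/run-1.tsv.gz"
rename_in_path('in/2013/run-1.tsv', 'run-1.tsv.gz', nil, 'out/', true)
# => "out/run-1.tsv.gz"
```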
.new_fog_s3_from(region, access_key_id, secret_access_key) ⇒ Object
Helper function to instantiate a new Fog::Storage for S3 based on our config options
Parameters:
region - Amazon S3 region we will be working with
access_key_id - AWS access key ID
secret_access_key - AWS secret access key
# File 'lib/sluice/storage/s3.rb', line 70
def new_fog_s3_from(region, access_key_id, secret_access_key)
  fog = Fog::Storage.new({
    :provider => 'AWS',
    :region => region,
    :aws_access_key_id => access_key_id,
    :aws_secret_access_key => secret_access_key
  })
  fog.sync_clock
  fog
end
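Typical usage is a connection-configuration sketch rather than runnable code: it needs the sluice and fog gems plus valid AWS credentials, and the region, environment-variable names, bucket names, and paths below are all placeholders:

```ruby
require 'sluice'

# Placeholders: substitute your own region and credentials
s3 = Sluice::Storage::S3.new_fog_s3_from(
  'us-east-1',
  ENV['AWS_ACCESS_KEY_ID'],
  ENV['AWS_SECRET_ACCESS_KEY']
)

# The connection is then passed to the file operations, together with
# Location objects built from s3:// URIs (illustrative buckets/paths)
from = Sluice::Storage::S3::Location.new('s3://my-bucket/in/')
to   = Sluice::Storage::S3::Location.new('s3://my-bucket/archive/')
Sluice::Storage::S3.move_files(s3, from, to, '.+\.tsv')
```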
.process_files(operation, s3, from_files_or_dir_or_loc, match_regex_or_glob = '.+', to_loc_or_dir = nil, alter_filename_lambda = false, flatten = false) ⇒ Object
Concurrent file operations between S3 locations. Supports:
- Download
- Upload
- Copy
- Delete
- Move (= Copy + Delete)
Parameters:
operation - Operation to perform. :download, :upload, :copy, :delete, :move supported
s3 - A Fog::Storage s3 connection
from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from
match_regex_or_glob - a regex or glob string to match the files to process
to_loc_or_dir - S3Location or local directory to process files to
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_loc_or_dir
# File 'lib/sluice/storage/s3.rb', line 366
def process_files(operation, s3, from_files_or_dir_or_loc, match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=false, flatten=false)

  # Validate that the file operation makes sense
  case operation
  when :copy, :move, :download, :upload
    if to_loc_or_dir.nil?
      raise StorageOperationError "File operation %s requires the to_loc_or_dir to be set" % operation
    end
  when :delete
    unless to_loc_or_dir.nil?
      raise StorageOperationError "File operation %s does not support the to_loc_or_dir argument" % operation
    end
    if alter_filename_lambda.class == Proc
      raise StorageOperationError "File operation %s does not support the alter_filename_lambda argument" % operation
    end
  else
    raise StorageOperationError "File operation %s is unsupported. Try :download, :upload, :copy, :delete or :move" % operation
  end

  # If we have an array of files, no additional globbing required
  if from_files_or_dir_or_loc.is_a?(Array)
    files_to_process = from_files_or_dir_or_loc # Could be filepaths or Fog::Storage::AWS::File's
    globbed = true
  # Otherwise if it's an upload, we can glob now
  elsif operation == :upload
    files_to_process = glob_files(from_files_or_dir_or_loc, match_regex_or_glob)
    globbed = true
  # Otherwise we'll do threaded globbing later...
  else
    files_to_process = []
    from_loc = from_files_or_dir_or_loc # Alias
    globbed = false
  end

  threads = []
  mutex = Mutex.new
  complete = false
  marker_opts = {}

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute s3 operations
  for i in (0...CONCURRENCY)

    # Each thread pops a file off the files_to_process array, and moves it.
    # We loop until there are no more files
    threads << Thread.new do
      loop do
        file = false
        filepath = false
        from_bucket = false
        from_path = false
        match = false

        # Critical section:
        # only allow one thread to modify the array at any time
        mutex.synchronize do

          # No need to do further globbing
          if globbed
            if files_to_process.size == 0
              complete = true
              next
            end

            file = files_to_process.pop

            # Support raw filenames and also Fog::Storage::AWS::File's
            if (file.is_a?(Fog::Storage::AWS::File))
              from_bucket = file.directory.key # Bucket
              from_path = Sluice::Storage.trail_slash(File.dirname(file.key))
              filepath = file.key
            else
              from_bucket = nil # Not used
              if from_files_or_dir_or_loc.is_a?(Array)
                from_path = Sluice::Storage.trail_slash(File.dirname(file))
              else
                from_path = from_files_or_dir_or_loc # The root dir
              end
              filepath = file
            end

            match = true # Match is implicit in the glob
          else
            while !complete && !match do
              if files_to_process.size == 0
                # S3 batches 1000 files per request.
                # We load up our array with the files to move
                files_to_process = s3.directories.get(from_loc.bucket, :prefix => from_loc.dir).files.all(marker_opts)
                # If we don't have any files after the S3 request, we're complete
                if files_to_process.size == 0
                  complete = true
                  next
                else
                  marker_opts['marker'] = files_to_process.last.key
                  # By reversing the array we can use pop and get FIFO behaviour
                  # instead of the performance penalty incurred by unshift
                  files_to_process = files_to_process.reverse
                end
              end

              file = files_to_process.pop
              from_bucket = from_loc.bucket
              from_path = from_loc.dir_as_path
              filepath = file.key

              match = if match_regex_or_glob.is_a? NegativeRegex
                        !filepath.match(match_regex_or_glob.regex)
                      else
                        filepath.match(match_regex_or_glob)
                      end
            end
          end
        end

        break unless match
        break if is_folder?(filepath)

        # Name file
        basename = get_basename(filepath)
        if alter_filename_lambda.class == Proc
          filename = alter_filename_lambda.call(basename)
        else
          filename = basename
        end

        # What are we doing? Let's determine source and target
        # Note that target excludes bucket name where relevant
        case operation
        when :upload
          source = "#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts " UPLOAD #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :download
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir, flatten)
          puts " DOWNLOAD #{source} +-> #{target}"
        when :move
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts " MOVE #{source} -> #{to_loc_or_dir.bucket}/#{target}"
        when :copy
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts " COPY #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :delete
          source = "#{from_bucket}/#{filepath}"
          # No target
          puts " DELETE x #{source}"
        end

        # Upload is a standalone operation vs move/copy/delete
        if operation == :upload
          retry_x(
            Sluice::Storage::S3,
            [:upload_file, s3, filepath, to_loc_or_dir.bucket, target],
            RETRIES,
            " +/> #{target}",
            "Problem uploading #{filepath}. Retrying.")
        end

        # Download is a standalone operation vs move/copy/delete
        if operation == :download
          retry_x(
            Sluice::Storage::S3,
            [:download_file, s3, file, target],
            RETRIES,
            " +/> #{target}",
            "Problem downloading #{filepath}. Retrying.")
        end

        # A move or copy starts with a copy file
        if [:move, :copy].include? operation
          retry_x(
            file,
            [:copy, to_loc_or_dir.bucket, target],
            RETRIES,
            " +-> #{to_loc_or_dir.bucket}/#{target}",
            "Problem copying #{filepath}. Retrying.")
        end

        # A move or delete ends with a delete
        if [:move, :delete].include? operation
          retry_x(
            file,
            [:destroy],
            RETRIES,
            " x #{source}",
            "Problem destroying #{filepath}. Retrying.")
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread| aThread.join }
end
.retry_x(object, send_args, retries, attempt_msg, failure_msg) ⇒ Object
A helper function to attempt to run a function retries times
Parameters:
object - Object to call the method on
send_args - Array of arguments to pass to send: the method name, followed by its arguments
retries - Number of retries to attempt
attempt_msg - Message to puts on each attempt
failure_msg - Message to puts on each failure
# File 'lib/sluice/storage/s3.rb', line 592
def retry_x(object, send_args, retries, attempt_msg, failure_msg)
  i = 0
  begin
    object.send(*send_args)
    puts attempt_msg
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(RETRY_WAIT) # Give us a bit of time before retrying
    i += 1
    retry
  end
end
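The retry pattern is independent of S3 and can be exercised standalone. In this sketch the wait is injectable so the example runs instantly, and the call's result is returned for convenience (the real method always sleeps RETRY_WAIT); FlakyCall is a made-up stand-in for a flaky S3 call:

```ruby
# Standalone sketch of the retry_x pattern: invoke object.send(*send_args),
# retrying up to `retries` times on any exception, pausing `wait` seconds
# between attempts.
def retry_x(object, send_args, retries, attempt_msg, failure_msg, wait = 0)
  i = 0
  begin
    result = object.send(*send_args)
    puts attempt_msg
    result
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(wait)
    i += 1
    retry
  end
end

# A stand-in for a flaky S3 call: fails twice, then succeeds
class FlakyCall
  def initialize; @calls = 0; end
  def run
    @calls += 1
    raise 'transient error' if @calls < 3
    :ok
  end
end

retry_x(FlakyCall.new, [:run], 3, 'succeeded', 'failed, retrying') # => :ok
```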
.upload_file(s3, from_file, to_bucket, to_file) ⇒ Object
Uploads a single file to the exact location specified. Has no intelligence around filenaming.
Parameters:
s3 - A Fog::Storage s3 connection
from_file - A local file path
to_bucket - The Fog::Directory to upload to
to_file - The file path to upload to
# File 'lib/sluice/storage/s3.rb', line 299
def upload_file(s3, from_file, to_bucket, to_file)
  local_file = File.open(from_file)

  dir = s3.directories.new(:key => to_bucket) # No request made
  file = dir.files.create(
    :key => to_file,
    :body => local_file
  )

  local_file.close
end
.upload_files(s3, from_files_or_dir, to_location, match_glob = '*') ⇒ Object
Uploads files to S3 locations concurrently
Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_dir - Local array of files or local directory to upload files from
to_location - S3Location to upload files to
match_glob - a filesystem glob to match the files to upload
# File 'lib/sluice/storage/s3.rb', line 284
def upload_files(s3, from_files_or_dir, to_location, match_glob='*')
  puts " uploading #{describe_from(from_files_or_dir)} to #{to_location}"
  process_files(:upload, s3, from_files_or_dir, match_glob, to_location)
end