Class: IOStreams::Paths::S3

Inherits:
IOStreams::Path show all
Defined in:
lib/io_streams/paths/s3.rb

Constant Summary

S3_COPY_OBJECT_SIZE_LIMIT =

Largest file size supported by the S3 copy object API.

5 * 1024 * 1024 * 1024

Instance Attribute Summary

Attributes inherited from IOStreams::Path

#path

Attributes inherited from Stream

#io_stream

Instance Method Summary

Methods inherited from IOStreams::Path

#<=>, #==, #absolute?, #children, #compressed?, #delete_all, #directory, #encrypted?, #inspect, #join, #realpath

Methods inherited from Stream

#basename, #dirname, #each, #extension, #extname, #file_name, #file_name=, #format, #format=, #format_options, #format_options=, #option, #option_or_stream, #pipeline, #read, #reader, #setting, #stream, #write, #writer

Constructor Details

#initialize(url, client: nil, access_key_id: nil, secret_access_key: nil, **args) ⇒ S3

Arguments:

url: [String]

Must begin with the `s3://` prefix, followed by the bucket name, followed by the key.
Examples:
  s3://my-bucket-name/file_name.txt
  s3://my-bucket-name/some_path/file_name.csv

access_key_id: [String]

AWS Access Key Id to use to access this bucket.

secret_access_key: [String]

AWS Secret Access Key to use to access this bucket.

Writer specific options:

Parameters:

  • args (Hash)

    A customizable set of options. Any remaining keyword arguments are retained and merged into the calls made to the underlying AWS S3 client (for example get_object, put_object, and copy_object).
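
Example (a sketch, with a hypothetical bucket name and credentials read from the environment):

  path = IOStreams::Paths::S3.new(
    "s3://my-bucket/some_path/file_name.csv",
    access_key_id:     ENV["AWS_ACCESS_KEY_ID"],
    secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"]
  )

A pre-built Aws::S3::Client can also be passed via the client: argument, in which case it is used as-is.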



# File 'lib/io_streams/paths/s3.rb', line 136

def initialize(url, client: nil, access_key_id: nil, secret_access_key: nil, **args)
  Utils.load_soft_dependency("aws-sdk-s3", "AWS S3") unless defined?(::Aws::S3::Client)

  uri = Utils::URI.new(url)
  raise "Invalid URI. Required Format: 's3://<bucket_name>/<key>'" unless uri.scheme == "s3"

  @bucket_name = uri.hostname
  key          = uri.path.sub(%r{\A/}, "")

  if client && !client.is_a?(Hash)
    @client = client
  else
    @client_options                     = client.is_a?(Hash) ? client.dup : {}
    @client_options[:access_key_id]     = access_key_id if access_key_id
    @client_options[:secret_access_key] = secret_access_key if secret_access_key
  end

  @options = args
  @options.merge!(uri.query.transform_keys(&:to_sym)) if uri.query

  super(key)
end

Instance Attribute Details

#bucket_name ⇒ Object (readonly)

Returns the value of attribute bucket_name.



# File 'lib/io_streams/paths/s3.rb', line 6

def bucket_name
  @bucket_name
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/io_streams/paths/s3.rb', line 6

def options
  @options
end

Instance Method Details

#client ⇒ Object

Lazy load the S3 client since it takes two seconds to create itself!



# File 'lib/io_streams/paths/s3.rb', line 320

def client
  @client ||= ::Aws::S3::Client.new(@client_options)
end

#copy_from(source_path, convert: true, **args) ⇒ Object

Make S3 perform direct copies within S3 itself.



# File 'lib/io_streams/paths/s3.rb', line 206

def copy_from(source_path, convert: true, **args)
  return super(source_path, convert: true, **args) if convert

  source = IOStreams.new(source_path)
  if !source.is_a?(self.class) || (source.size.to_i >= S3_COPY_OBJECT_SIZE_LIMIT)
    return super(source, convert: convert, **args)
  end

  source_name = ::File.join(source.bucket_name, source.path)
  client.copy_object(options.merge(bucket: bucket_name, key: path, copy_source: source_name))
end

#copy_to(target_path, convert: true, **args) ⇒ Object

Make S3 perform direct copies within S3 itself.



# File 'lib/io_streams/paths/s3.rb', line 194

def copy_to(target_path, convert: true, **args)
  return super(target_path, convert: convert, **args) if convert || (size.to_i >= S3_COPY_OBJECT_SIZE_LIMIT)

  target = IOStreams.new(target_path)
  return super(target, convert: convert, **args) unless target.is_a?(self.class)

  source_name = ::File.join(bucket_name, path)
  client.copy_object(options.merge(bucket: target.bucket_name, key: target.path, copy_source: source_name))
  target
end
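
For example, a direct server-side copy between two hypothetical buckets, assuming both are reachable with the same credentials:

  source = IOStreams::Paths::S3.new("s3://source-bucket/data.csv")
  # convert: false keeps the copy inside S3; objects of 5 GB or more fall back to streaming.
  source.copy_to("s3://target-bucket/data.csv", convert: false)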

#delete ⇒ Object



# File 'lib/io_streams/paths/s3.rb', line 168

def delete
  client.delete_object(bucket: bucket_name, key: path)
  self
rescue Aws::S3::Errors::NotFound
  self
end

#each_child(pattern = "*", case_sensitive: false, directories: false, hidden: false) ⇒ Object

Notes:

  • Currently all S3 lookups are recursive from the pattern's prefix, regardless of whether the pattern includes `**`.



# File 'lib/io_streams/paths/s3.rb', line 286

def each_child(pattern = "*", case_sensitive: false, directories: false, hidden: false)
  matcher = Matcher.new(self, pattern, case_sensitive: case_sensitive, hidden: hidden)

  # When the pattern includes an exact file name without any pattern characters
  if matcher.pattern.nil?
    yield(matcher.path) if matcher.path.exist?
    return
  end

  prefix = Utils::URI.new(matcher.path.to_s).path.sub(%r{\A/}, "")
  token  = nil
  loop do
    # Fetches up to 1,000 entries at a time
    resp = client.list_objects_v2(bucket: bucket_name, prefix: prefix, continuation_token: token)
    resp.contents.each do |object|
      next if !directories && object.key.end_with?("/")

      file_name = ::File.join("s3://", resp.name, object.key)
      next unless matcher.match?(file_name)

      yield(self.class.new(file_name), object.to_h)
    end
    token = resp.next_continuation_token
    break if token.nil?
  end
  nil
end
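
A sketch listing matching objects under a hypothetical prefix:

  path = IOStreams::Paths::S3.new("s3://my-bucket/reports")
  # Yields each matching child path; results are fetched in pages of up to 1,000 keys.
  path.each_child("**/*.csv") do |child, _attributes|
    puts child.to_s
  end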

#exist? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/io_streams/paths/s3.rb', line 175

def exist?
  client.head_object(bucket: bucket_name, key: path)
  true
rescue Aws::S3::Errors::NotFound
  false
end

#mkdir ⇒ Object



# File 'lib/io_streams/paths/s3.rb', line 223

def mkdir
  self
end

#mkpath ⇒ Object

S3 logically creates paths when a key is set.



# File 'lib/io_streams/paths/s3.rb', line 219

def mkpath
  self
end

#move_to(target_path) ⇒ Object

Moves this file to the `target_path` by copying it to the new name and then deleting the current file.

Notes:

  • Can copy across buckets.

  • No stream conversions are applied.



# File 'lib/io_streams/paths/s3.rb', line 187

def move_to(target_path)
  target = copy_to(target_path, convert: false)
  delete
  target
end
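
For example, moving an object to a new key within the same hypothetical bucket:

  path   = IOStreams::Paths::S3.new("s3://my-bucket/incoming/file.csv")
  # Copies to the new key, then deletes the original; no stream conversions are applied.
  target = path.move_to("s3://my-bucket/processed/file.csv")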

#partial_files_visible? ⇒ Boolean

On S3 only files that are completely saved are visible.

Returns:

  • (Boolean)


# File 'lib/io_streams/paths/s3.rb', line 315

def partial_files_visible?
  false
end

#read_file(file_name) ⇒ Object

Shortcut method for when the caller already has a local file name and no other streams are applied: downloads the S3 object directly into the local file.



# File 'lib/io_streams/paths/s3.rb', line 246

def read_file(file_name)
  ::File.open(file_name, "wb") do |file|
    client.get_object(options.merge(response_target: file, bucket: bucket_name, key: path))
  end
end
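
A sketch downloading an object directly to a hypothetical local file:

  path = IOStreams::Paths::S3.new("s3://my-bucket/data.csv")
  # Writes the raw object straight to the local file, bypassing the streams pipeline.
  path.read_file("/tmp/data.csv")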

#relative? ⇒ Boolean

Does not support relative file names, since S3 has no concept of a current working directory.

Returns:

  • (Boolean)


# File 'lib/io_streams/paths/s3.rb', line 164

def relative?
  false
end

#size ⇒ Object



# File 'lib/io_streams/paths/s3.rb', line 227

def size
  client.head_object(bucket: bucket_name, key: path).content_length
rescue Aws::S3::Errors::NotFound
  nil
end

#stream_reader(&block) ⇒ Object

Read from an AWS S3 file.



# File 'lib/io_streams/paths/s3.rb', line 236

def stream_reader(&block)
  # Since S3 download only supports a push stream, write it to a tempfile first.
  Utils.temp_file_name("iostreams_s3") do |file_name|
    read_file(file_name)

    ::File.open(file_name, "rb") { |io| builder.reader(io, &block) }
  end
end
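
Callers normally read through #reader (inherited from Stream), which applies any registered streams, such as decompression. A sketch with a hypothetical gzipped key:

  path = IOStreams::Paths::S3.new("s3://my-bucket/data.csv.gz")
  # The object is downloaded to a tempfile first, then streamed through the reader pipeline.
  path.reader { |io| puts io.read }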

#stream_writer(&block) ⇒ Object

Write to AWS S3.

Raises [MultipartUploadError] if an object is being uploaded in parts and the upload cannot be completed; the upload is aborted and this error is raised. The raised error has a `#errors` method that returns the failures that caused the upload to be aborted.


# File 'lib/io_streams/paths/s3.rb', line 259

def stream_writer(&block)
  # Since S3 upload only supports a pull stream, write it to a tempfile first.
  Utils.temp_file_name("iostreams_s3") do |file_name|
    result = ::File.open(file_name, "wb") { |io| builder.writer(io, &block) }

    # Upload file only once all data has been written to it
    write_file(file_name)
    result
  end
end
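
Callers normally write through #writer (inherited from Stream). A sketch with a hypothetical key:

  path = IOStreams::Paths::S3.new("s3://my-bucket/output.csv")
  # Data is buffered in a local tempfile and uploaded only after the block completes.
  path.writer { |io| io << "name,login\njack,jhall\n" }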

#to_s ⇒ Object



# File 'lib/io_streams/paths/s3.rb', line 159

def to_s
  ::File.join("s3://", bucket_name, path)
end

#write_file(file_name) ⇒ Object

Shortcut method for when the caller already has a local file name and no other streams are applied: uploads the local file directly to S3.



# File 'lib/io_streams/paths/s3.rb', line 271

def write_file(file_name)
  if ::File.size(file_name) > 5 * 1024 * 1024
    # Use multipart file upload
    s3  = Aws::S3::Resource.new(client: client)
    obj = s3.bucket(bucket_name).object(path)
    obj.upload_file(file_name, options)
  else
    ::File.open(file_name, "rb") do |file|
      client.put_object(options.merge(bucket: bucket_name, key: path, body: file))
    end
  end
end
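
A sketch uploading a hypothetical local file:

  path = IOStreams::Paths::S3.new("s3://my-bucket/upload.bin")
  # Files larger than 5 MB are uploaded via the S3 multipart upload API.
  path.write_file("/tmp/local_file.bin")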