Class: Etna::Filesystem::Metis

Inherits:
Etna::Filesystem show all
Defined in:
lib/etna/filesystem.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Etna::Filesystem

#mv, #rm_rf, #stat, #tmpdir

Constructor Details

#initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid) ⇒ Metis

Returns a new instance of Metis.



276
277
278
279
280
281
282
# File 'lib/etna/filesystem.rb', line 276

# Binds this filesystem to one Metis project/bucket.
#
# @param metis_client client object used for every Metis request made here
# @param project_name name of the Metis project
# @param bucket_name name of the bucket inside that project
# @param root prefix joined in front of every path by metis_path_of
# @param uuid identifier handed to upload workflows as metis_uid;
#   a random UUID is generated when omitted
def initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid)
  @metis_client, @project_name, @bucket_name = metis_client, project_name, bucket_name
  @root = root
  @metis_uid = uuid
end

Instance Attribute Details

#bucket_name ⇒ Object (readonly)

Returns the value of attribute bucket_name.



274
275
276
# File 'lib/etna/filesystem.rb', line 274

# Read-only accessor for the bucket name that was passed to the constructor.
def bucket_name
  @bucket_name
end

#project_name ⇒ Object (readonly)

Returns the value of attribute project_name.



274
275
276
# File 'lib/etna/filesystem.rb', line 274

# Read-only accessor for the project name that was passed to the constructor.
def project_name
  @project_name
end

Instance Method Details

#create_download_workflow ⇒ Object



347
348
349
# File 'lib/etna/filesystem.rb', line 347

# Builds a new MetisDownloadWorkflow for this filesystem's client, project
# and bucket. The workflow is always created with max_attempts: 3.
def create_download_workflow
  workflow_options = {
    metis_client: @metis_client,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisDownloadWorkflow.new(**workflow_options)
end

#create_upload_workflow ⇒ Object



289
290
291
# File 'lib/etna/filesystem.rb', line 289

# Builds a new MetisUploadWorkflow for this filesystem's client, project and
# bucket, tagged with this instance's metis_uid. The workflow is always
# created with max_attempts: 3.
def create_upload_workflow
  workflow_options = {
    metis_client: @metis_client,
    metis_uid: @metis_uid,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisUploadWorkflow.new(**workflow_options)
end

#do_streaming_download(wp, metis_file) ⇒ Object



351
352
353
# File 'lib/etna/filesystem.rb', line 351

# Streams the given Metis file into the writable IO using a freshly created
# download workflow.
#
# @param wp writable IO that receives the downloaded data
# @param metis_file file entry to download (as found in a folder listing)
# @return whatever the workflow's do_download returns
def do_streaming_download(wp, metis_file)
  workflow = create_download_workflow
  workflow.do_download(wp, metis_file)
end

#do_streaming_upload(rp, dest, size_hint) ⇒ Object



333
334
335
336
337
338
339
# File 'lib/etna/filesystem.rb', line 333

# Uploads everything readable from `rp` to `dest` on Metis.
#
# @param rp readable IO providing the bytes to upload
# @param dest destination path; translated with metis_path_of before upload
# @param size_hint forwarded unchanged to StreamingIOUpload
# @return whatever the workflow's do_upload returns
def do_streaming_upload(rp, dest, size_hint)
  upload_source = Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(
    readable_io: rp,
    size_hint: size_hint
  )
  workflow = create_upload_workflow
  workflow.do_upload(upload_source, metis_path_of(dest))
end

#exist?(src) ⇒ Boolean

Returns:

  • (Boolean)


382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# File 'lib/etna/filesystem.rb', line 382

# Reports whether a file or folder named like the last component of `src`
# appears in the listing of its parent folder.
#
# A listing failure with status 404, or whose message matches
# /Invalid folder/, means "does not exist"; any other Etna::Error propagates.
#
# @param src path to look for
# @return [Boolean]
# @raise [Etna::Error] when listing the parent folder fails for another reason
def exist?(src)
  listing =
    begin
      list_metis_directory(::File.dirname(src))
    rescue Etna::Error => e
      return false if e.status == 404 || e.message =~ /Invalid folder/

      raise e
    end

  wanted = ::File.basename(src)
  return true if listing.files.all.any? { |file| file.file_name == wanted }

  listing.folders.all.any? { |folder| folder.folder_name == wanted }
end

#list_metis_directory(path) ⇒ Object



364
365
366
# File 'lib/etna/filesystem.rb', line 364

# Asks Metis for the contents of the folder at `path`.
#
# @param path folder path; translated with metis_path_of before the request
# @return the response of the client's list_folder call
def list_metis_directory(path)
  request = Etna::Clients::Metis::ListFolderRequest.new(
    project_name: @project_name,
    bucket_name: @bucket_name,
    folder_path: metis_path_of(path)
  )
  @metis_client.list_folder(request)
end

#ls(dir) ⇒ Object



377
378
379
380
# File 'lib/etna/filesystem.rb', line 377

# Lists the entries directly inside the Metis folder `dir`.
#
# @param dir path of the folder to list
# @return [Array<Array>] one [:file, name] pair per file followed by one
#   [:folder, name] pair per sub-folder
def ls(dir)
  # The folder itself is listed (not its parent): the names returned are the
  # entries that live inside `dir`.
  listing = list_metis_directory(dir)

  # Entries are reached through `.all`, the same accessor exist? and
  # with_readable use on folder listings.
  files = listing.files.all.map { |file| [:file, file.file_name] }
  folders = listing.folders.all.map { |folder| [:folder, folder.folder_name] }
  files + folders
end

#metis_path_of(path) ⇒ Object



284
285
286
287
# File 'lib/etna/filesystem.rb', line 284

# Converts a filesystem path into the form sent to Metis: the path is joined
# onto @root and a single leading "/" (if any) is dropped.
#
# @param path path relative to this filesystem's root
# @return [String] the joined path without a leading slash
def metis_path_of(path)
  full_path = ::File.join(@root, path)
  return full_path unless full_path.start_with?('/')

  full_path[1..-1]
end

#mkdir_p(dir) ⇒ Object



368
369
370
371
372
373
374
375
# File 'lib/etna/filesystem.rb', line 368

# Sends a create-folder request to Metis for `dir`.
#
# @param dir folder path; translated with metis_path_of before the request
# @return the response of the client's create_folder call
# NOTE(review): whether missing parent folders are created as well depends on
# the Metis service, not on this method — confirm against the Metis API.
def mkdir_p(dir)
  @metis_client.create_folder(
    Etna::Clients::Metis::CreateFolderRequest.new(
      project_name: @project_name,
      bucket_name: @bucket_name,
      folder_path: metis_path_of(dir)
    )
  )
end

#with_hot_pipe(opts, receiver, *args, &block) ⇒ Object



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/etna/filesystem.rb', line 293

# Connects the caller's block to a background task through an IO pipe.
#
# A pipe is opened, `receiver` is invoked (via send on self) on a
# single-thread executor with one end of the pipe plus `args`, and the other
# end is yielded to the caller's block. After the block returns, the method
# waits on the background future.
#
# @param opts mode string; when it includes 'w' the block receives the write
#   end and `receiver` the read end, otherwise the roles are swapped
# @param receiver name of the method on self to run in the background
# @param args extra arguments passed to `receiver` after the pipe end
# @yield [io] the pipe end the caller should write to ('w') or read from
# NOTE(review): `future.wait!` is expected to re-raise a failure of the
# background task (concurrent-ruby Promises semantics) — confirm.
# NOTE(review): in the 'w' branch the write end is only closed by the outer
# ensure, which runs after `future.wait!`. If `receiver` reads until EOF it
# would not see EOF unless the caller's block closes the write end itself —
# confirm how the upload side decides it is done.
def with_hot_pipe(opts, receiver, *args, &block)
  rp, wp = IO.pipe
  begin
    executor = Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
    begin
      if opts.include?('w')
        # Write mode: the background task consumes the read end.
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, rp, *args)
        rescue => e
          # Log here as well as re-raising, so the failure is recorded by the
          # application logger in addition to being carried by the future.
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          rp.close
        end

        yield wp
      else
        # Read mode: the background task produces into the write end and
        # closes it when it finishes (successfully or not).
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, wp, *args)
        rescue => e
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          wp.close
        end

        yield rp
      end

      future.wait!
    ensure
      # Always stop the executor; force-kill it if it has not terminated
      # within the timeout of 5 passed to wait_for_termination (presumably seconds).
      executor.shutdown
      executor.kill unless executor.wait_for_termination(5)
    end
  ensure
    # Both pipe ends are closed on every exit path.
    rp.close
    wp.close
  end
end

#with_readable(src, opts = 'r', &block) ⇒ Object



355
356
357
358
359
360
361
362
# File 'lib/etna/filesystem.rb', line 355

# Yields a readable IO streaming the contents of the Metis file at `src`.
#
# The file is looked up by name in the listing of its parent folder, then
# downloaded through with_hot_pipe using do_streaming_download.
#
# @param src path of the file to read
# @param opts mode string forwarded to with_hot_pipe
# @yield [rp] readable end of the pipe
# @raise [RuntimeError] when no file with that name is in the parent folder
def with_readable(src, opts = 'r', &block)
  parent_listing = list_metis_directory(::File.dirname(src))
  wanted = ::File.basename(src)
  metis_file = parent_listing.files.all.find { |candidate| candidate.file_name == wanted }

  if metis_file.nil?
    raise "Metis file at #{@project_name}/#{@bucket_name}/#{@root}/#{src} not found.  No such file"
  end

  with_hot_pipe(opts, :do_streaming_download, metis_file) { |rp| yield rp }
end

#with_writeable(dest, opts = 'w', size_hint: nil, &block) ⇒ Object



341
342
343
344
345
# File 'lib/etna/filesystem.rb', line 341

# Yields a writable IO whose contents are uploaded to `dest` on Metis.
#
# The upload runs through with_hot_pipe using do_streaming_upload.
#
# @param dest destination path of the upload
# @param opts mode string forwarded to with_hot_pipe
# @param size_hint forwarded to do_streaming_upload
# @yield [wp] writable end of the pipe
def with_writeable(dest, opts = 'w', size_hint: nil, &block)
  with_hot_pipe(opts, :do_streaming_upload, dest, size_hint) { |wp| yield wp }
end