Class: Etna::Filesystem::Metis

Inherits:
Etna::Filesystem show all
Defined in:
lib/etna/filesystem.rb

Instance Method Summary collapse

Methods inherited from Etna::Filesystem

#mv, #rm_rf, #stat, #tmpdir

Constructor Details

#initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid) ⇒ Metis

Returns a new instance of Metis.



254
255
256
257
258
259
260
# File 'lib/etna/filesystem.rb', line 254

# Stores the collaborators and settings this filesystem works with.
#
# @param metis_client the client object later used for folder listing,
#   folder creation and the upload/download workflows
# @param project_name the project passed along with every request
# @param bucket_name the bucket passed along with every request
# @param root path prefix that is joined in front of every path handed to
#   this filesystem (defaults to '/')
# @param uuid identifier kept as @metis_uid and handed to upload workflows
#   (defaults to a freshly generated SecureRandom.uuid)
def initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid)
  @metis_client, @project_name, @bucket_name = metis_client, project_name, bucket_name
  @root, @metis_uid = root, uuid
end

Instance Method Details

#create_download_workflow ⇒ Object



325
326
327
# File 'lib/etna/filesystem.rb', line 325

# Builds a new download workflow bound to this filesystem's client, project
# and bucket, configured with max_attempts: 3.
#
# @return [Etna::Clients::Metis::MetisDownloadWorkflow]
def create_download_workflow
  workflow_options = {
    metis_client: @metis_client,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisDownloadWorkflow.new(**workflow_options)
end

#create_upload_workflow ⇒ Object



267
268
269
# File 'lib/etna/filesystem.rb', line 267

# Builds a new upload workflow bound to this filesystem's client, uid,
# project and bucket, configured with max_attempts: 3.
#
# @return [Etna::Clients::Metis::MetisUploadWorkflow]
def create_upload_workflow
  workflow_options = {
    metis_client: @metis_client,
    metis_uid: @metis_uid,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisUploadWorkflow.new(**workflow_options)
end

#do_streaming_download(wp, metis_file) ⇒ Object



329
330
331
# File 'lib/etna/filesystem.rb', line 329

# Runs a download of +metis_file+ into +wp+ using a freshly created
# download workflow.
#
# @param wp the IO handed to the workflow as the download destination
# @param metis_file the file record handed to the workflow
# @return whatever the workflow's do_download returns
def do_streaming_download(wp, metis_file)
  workflow = create_download_workflow
  workflow.do_download(wp, metis_file)
end

#do_streaming_upload(rp, dest, size_hint) ⇒ Object



311
312
313
314
315
316
317
# File 'lib/etna/filesystem.rb', line 311

# Uploads everything readable from +rp+ to +dest+ using a freshly created
# upload workflow.
#
# @param rp the IO wrapped as the upload's readable_io
# @param dest destination path; translated with metis_path_of before upload
# @param size_hint value forwarded to the streaming upload as size_hint
# @return whatever the workflow's do_upload returns
def do_streaming_upload(rp, dest, size_hint)
  upload = Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(readable_io: rp, size_hint: size_hint)
  workflow = create_upload_workflow
  target_path = metis_path_of(dest)
  workflow.do_upload(upload, target_path)
end

#exist?(src) ⇒ Boolean

Returns:

  • (Boolean)


360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# File 'lib/etna/filesystem.rb', line 360

# Reports whether +src+ names a file or folder in its parent directory.
#
# The parent directory of +src+ is listed and its entries are compared
# against the basename of +src+. A listing that fails with an Etna::Error
# whose status is 404, or whose message contains "Invalid folder", is
# treated as "does not exist"; any other Etna::Error is re-raised.
#
# @param src path to look for
# @return [Boolean]
# @raise [Etna::Error] when the listing fails for any other reason
def exist?(src)
  entry_name = ::File.basename(src)

  listing =
    begin
      list_metis_directory(::File.dirname(src))
    rescue Etna::Error => e
      # A missing parent folder means the entry cannot exist either.
      return false if e.status == 404 || e.message =~ /Invalid folder/

      raise e
    end

  return true if listing.files.all.any? { |file| file.file_name == entry_name }

  listing.folders.all.any? { |folder| folder.folder_name == entry_name }
end

#list_metis_directory(path) ⇒ Object



342
343
344
# File 'lib/etna/filesystem.rb', line 342

# Asks the metis client for the listing of the folder at +path+.
#
# @param path folder path; translated with metis_path_of before the request
# @return the response of @metis_client.list_folder
def list_metis_directory(path)
  request = Etna::Clients::Metis::ListFolderRequest.new(
    project_name: @project_name,
    bucket_name: @bucket_name,
    folder_path: metis_path_of(path)
  )
  @metis_client.list_folder(request)
end

#ls(dir) ⇒ Object



355
356
357
358
# File 'lib/etna/filesystem.rb', line 355

# Lists the entries directly inside +dir+.
#
# The listing is requested for +dir+ itself (not its parent directory), and
# the file/folder collections are read through +all+, the same accessor
# exist? and with_readable use on this response.
#
# @param dir directory whose contents should be listed
# @return [Array<Array>] one [:file, name] pair per file followed by one
#   [:folder, name] pair per folder
def ls(dir)
  response = list_metis_directory(dir)
  files = response.files.all.map { |file| [:file, file.file_name] }
  folders = response.folders.all.map { |folder| [:folder, folder.folder_name] }
  files + folders
end

#metis_path_of(path) ⇒ Object



262
263
264
265
# File 'lib/etna/filesystem.rb', line 262

# Converts +path+ into the form sent to metis: @root is joined in front of
# it and a single leading "/" is removed from the result.
#
# @param path [String] path relative to this filesystem's root
# @return [String] the joined path without its leading slash
def metis_path_of(path)
  ::File.join(@root, path).delete_prefix('/')
end

#mkdir_p(dir) ⇒ Object



346
347
348
349
350
351
352
353
# File 'lib/etna/filesystem.rb', line 346

# Sends a create-folder request for +dir+ to the metis client.
#
# @param dir folder path; translated with metis_path_of before the request
# @return the response of @metis_client.create_folder
def mkdir_p(dir)
  @metis_client.create_folder(
    Etna::Clients::Metis::CreateFolderRequest.new(
      project_name: @project_name,
      bucket_name: @bucket_name,
      folder_path: metis_path_of(dir)
    )
  )
end

#with_hot_pipe(opts, receiver, *args, &block) ⇒ Object



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/etna/filesystem.rb', line 271

# Connects a background worker and the caller's block through an IO.pipe.
#
# When +opts+ includes 'w', +receiver+ is invoked on self with the read end
# of the pipe (followed by +args+) while the block is yielded the write end.
# Otherwise the roles are swapped: the worker gets the write end and the
# block the read end. The worker runs on a dedicated
# Concurrent::SingleThreadExecutor; errors it raises are logged through
# Etna::Application.instance.logger and re-raised inside the future.
#
# In write mode the write end is closed as soon as the block returns, before
# waiting on the worker, so a receiver that reads until EOF can finish.
#
# @param opts mode string (anything responding to include?); only checked
#   for containing 'w'
# @param receiver [Symbol] name of the method run in the background
# @param args extra arguments passed to +receiver+ after its pipe end
# @yield [io] the caller's end of the pipe
# @return the result of future.wait!
# @raise whatever future.wait! raises — presumably the worker's error;
#   confirm against the concurrent-ruby documentation
def with_hot_pipe(opts, receiver, *args, &block)
  rp, wp = IO.pipe
  begin
    writing = opts.include?('w')
    # The worker reads what the caller writes, or writes what the caller reads.
    worker_io, caller_io = writing ? [rp, wp] : [wp, rp]

    executor = Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
    begin
      future = Concurrent::Promises.future_on(executor) do
        self.send(receiver, worker_io, *args)
      rescue => e
        Etna::Application.instance.logger.log_error(e)
        raise e
      ensure
        # Closing the worker's end lets the other side observe EOF / a closed pipe.
        worker_io.close
      end

      yield caller_io

      # Signal EOF to the worker; without this a receiver that reads until
      # EOF would block forever and future.wait! would never return.
      wp.close if writing

      future.wait!
    ensure
      executor.shutdown
      executor.kill unless executor.wait_for_termination(5)
    end
  ensure
    # Both calls are safe even if an end was already closed above.
    rp.close
    wp.close
  end
end

#with_readable(src, opts = 'r', &block) ⇒ Object



333
334
335
336
337
338
339
340
# File 'lib/etna/filesystem.rb', line 333

# Yields a readable IO streaming the contents of the metis file at +src+.
#
# The file is looked up by basename in the listing of its parent directory;
# the download itself runs through with_hot_pipe and do_streaming_download.
#
# @param src path of the file to read
# @param opts mode string forwarded to with_hot_pipe (defaults to 'r')
# @yield [io] the read end of the pipe
# @return whatever with_hot_pipe returns
# @raise [RuntimeError] when no file with that name is in the listing
def with_readable(src, opts = 'r', &block)
  wanted_name = ::File.basename(src)
  listing = list_metis_directory(::File.dirname(src))
  metis_file = listing.files.all.find { |candidate| candidate.file_name == wanted_name }

  if metis_file.nil?
    raise "Metis file at #{@project_name}/#{@bucket_name}/#{@root}/#{src} not found.  No such file"
  end

  with_hot_pipe(opts, :do_streaming_download, metis_file) { |io| yield io }
end

#with_writeable(dest, opts = 'w', size_hint: nil, &block) ⇒ Object



319
320
321
322
323
# File 'lib/etna/filesystem.rb', line 319

# Yields a writable IO whose contents are uploaded to +dest+.
#
# The upload runs through with_hot_pipe and do_streaming_upload.
#
# @param dest destination path of the upload
# @param opts mode string forwarded to with_hot_pipe (defaults to 'w')
# @param size_hint value forwarded to do_streaming_upload (defaults to nil)
# @yield [io] the write end of the pipe
# @return whatever with_hot_pipe returns
def with_writeable(dest, opts = 'w', size_hint: nil, &block)
  with_hot_pipe(opts, :do_streaming_upload, dest, size_hint) { |io| yield io }
end