Class: Etna::Filesystem::Metis

Inherits:
Etna::Filesystem show all
Defined in:
lib/etna/filesystem.rb

Instance Method Summary collapse

Methods inherited from Etna::Filesystem

#mv, #rm_rf, #stat, #tmpdir

Constructor Details

#initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid) ⇒ Metis



255
256
257
258
259
260
261
# File 'lib/etna/filesystem.rb', line 255

def initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid)
  @metis_client = metis_client
  @project_name = project_name
  @bucket_name = bucket_name
  @root = root
  @metis_uid = uuid
end

Instance Method Details

#create_download_workflowObject



326
327
328
# File 'lib/etna/filesystem.rb', line 326

def create_download_workflow
  Etna::Clients::Metis::MetisDownloadWorkflow.new(metis_client: @metis_client, project_name: @project_name, bucket_name: @bucket_name, max_attempts: 3)
end

#create_upload_workflowObject



268
269
270
# File 'lib/etna/filesystem.rb', line 268

def create_upload_workflow
  Etna::Clients::Metis::MetisUploadWorkflow.new(metis_client: @metis_client, metis_uid: @metis_uid, project_name: @project_name, bucket_name: @bucket_name, max_attempts: 3)
end

#do_streaming_download(wp, metis_file) ⇒ Object



330
331
332
# File 'lib/etna/filesystem.rb', line 330

def do_streaming_download(wp, metis_file)
  create_download_workflow.do_download(wp, metis_file)
end

#do_streaming_upload(rp, dest, size_hint) ⇒ Object



312
313
314
315
316
317
318
# File 'lib/etna/filesystem.rb', line 312

def do_streaming_upload(rp, dest, size_hint)
  streaming_upload = Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(readable_io: rp, size_hint: size_hint)
  create_upload_workflow.do_upload(
      streaming_upload,
      metis_path_of(dest)
  )
end

#exist?(src) ⇒ Boolean



361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/etna/filesystem.rb', line 361

def exist?(src)
  begin
    response = list_metis_directory(::File.dirname(src))
  rescue Etna::Error => e
    if e.status == 404
      return false
    elsif e.message =~ /Invalid folder/
      return false
    end

    raise e
  end

  response.files.all.any? { |f| f.file_name == ::File.basename(src) } ||
      response.folders.all.any? { |f| f.folder_name == ::File.basename(src) }
end

#list_metis_directory(path) ⇒ Object



343
344
345
# File 'lib/etna/filesystem.rb', line 343

def list_metis_directory(path)
  @metis_client.list_folder(Etna::Clients::Metis::ListFolderRequest.new(project_name: @project_name, bucket_name: @bucket_name, folder_path: metis_path_of(path)))
end

#ls(dir) ⇒ Object



356
357
358
359
# File 'lib/etna/filesystem.rb', line 356

def ls(dir)
  response = list_metis_directory(::File.dirname(dir))
  response.files.map { |f| [:file, f.file_name] } + response.folders.map { |f| [:folder, f.folder_name] }
end

#metis_path_of(path) ⇒ Object



263
264
265
266
# File 'lib/etna/filesystem.rb', line 263

def metis_path_of(path)
  joined = ::File.join(@root, path)
  joined[0] == "/" ? joined.slice(1..-1) : joined
end

#mkdir_p(dir) ⇒ Object



347
348
349
350
351
352
353
354
# File 'lib/etna/filesystem.rb', line 347

def mkdir_p(dir)
  create_folder_request = Etna::Clients::Metis::CreateFolderRequest.new(
      project_name: @project_name,
      bucket_name: @bucket_name,
      folder_path: metis_path_of(dir),
  )
  @metis_client.create_folder(create_folder_request)
end

#with_hot_pipe(opts, receiver, *args, &block) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/etna/filesystem.rb', line 272

def with_hot_pipe(opts, receiver, *args, &block)
  rp, wp = IO.pipe
  begin
    executor = Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
    begin
      if opts.include?('w')
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, rp, *args)
        rescue => e
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          rp.close
        end

        yield wp
      else
        future = Concurrent::Promises.future_on(executor) do
          self.send(receiver, wp, *args)
        rescue => e
          Etna::Application.instance.logger.log_error(e)
          raise e
        ensure
          wp.close
        end

        yield rp
      end

      future.wait!
    ensure
      executor.shutdown
      executor.kill unless executor.wait_for_termination(5)
    end
  ensure
    rp.close
    wp.close
  end
end

#with_readable(src, opts = 'r', &block) ⇒ Object



334
335
336
337
338
339
340
341
# File 'lib/etna/filesystem.rb', line 334

def with_readable(src, opts = 'r', &block)
  metis_file = list_metis_directory(::File.dirname(src)).files.all.find { |f| f.file_name == ::File.basename(src) }
  raise "Metis file at #{@project_name}/#{@bucket_name}/#{@root}/#{src} not found.  No such file" if metis_file.nil?

  self.with_hot_pipe(opts, :do_streaming_download, metis_file) do |rp|
    yield rp
  end
end

#with_writeable(dest, opts = 'w', size_hint: nil, &block) ⇒ Object



320
321
322
323
324
# File 'lib/etna/filesystem.rb', line 320

def with_writeable(dest, opts = 'w', size_hint: nil, &block)
  self.with_hot_pipe(opts, :do_streaming_upload, dest, size_hint) do |wp|
    yield wp
  end
end