Class: Etna::Filesystem::Metis

Inherits:
Etna::Filesystem show all
Defined in:
lib/etna/filesystem.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Etna::Filesystem

#mv, #rm_rf, #stat, #tmpdir

Constructor Details

#initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid) ⇒ Metis

Returns a new instance of Metis.



256
257
258
259
260
261
262
# File 'lib/etna/filesystem.rb', line 256

# Captures everything needed to address one Metis bucket as a filesystem.
#
# @param metis_client client object that every Metis request in this class is sent through
# @param project_name project name forwarded with each request
# @param bucket_name bucket name forwarded with each request
# @param root prefix joined in front of every path by #metis_path_of (defaults to '/')
# @param uuid stored as the metis uid handed to upload workflows (defaults to a fresh UUID)
def initialize(metis_client:, project_name:, bucket_name:, root: '/', uuid: SecureRandom.uuid)
  @metis_uid = uuid
  @root = root
  @project_name, @bucket_name = project_name, bucket_name
  @metis_client = metis_client
end

Instance Attribute Details

#bucket_name ⇒ Object (readonly)

Returns the value of attribute bucket_name.



254
255
256
# File 'lib/etna/filesystem.rb', line 254

# Reader for the bucket name supplied to the constructor.
#
# @return the value stored in @bucket_name
def bucket_name
  @bucket_name
end

#project_name ⇒ Object (readonly)

Returns the value of attribute project_name.



254
255
256
# File 'lib/etna/filesystem.rb', line 254

# Reader for the project name supplied to the constructor.
#
# @return the value stored in @project_name
def project_name
  @project_name
end

Instance Method Details

#create_download_workflow ⇒ Object



327
328
329
# File 'lib/etna/filesystem.rb', line 327

# Builds a new download workflow bound to this filesystem's client, project
# and bucket. The workflow is always constructed with +max_attempts: 3+.
#
# @return [Etna::Clients::Metis::MetisDownloadWorkflow]
def create_download_workflow
  workflow_options = {
    metis_client: @metis_client,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisDownloadWorkflow.new(**workflow_options)
end

#create_upload_workflow ⇒ Object



269
270
271
# File 'lib/etna/filesystem.rb', line 269

# Builds a new upload workflow bound to this filesystem's client, metis uid,
# project and bucket. The workflow is always constructed with +max_attempts: 3+.
#
# @return [Etna::Clients::Metis::MetisUploadWorkflow]
def create_upload_workflow
  workflow_options = {
    metis_client: @metis_client,
    metis_uid: @metis_uid,
    project_name: @project_name,
    bucket_name: @bucket_name,
    max_attempts: 3
  }
  Etna::Clients::Metis::MetisUploadWorkflow.new(**workflow_options)
end

#do_streaming_download(wp, metis_file) ⇒ Object



331
332
333
# File 'lib/etna/filesystem.rb', line 331

# Streams +metis_file+ into +wp+ using a freshly created download workflow.
#
# @param wp IO-like object the workflow writes the download into
# @param metis_file file record handed straight to the workflow
# @return whatever the workflow's +do_download+ returns
def do_streaming_download(wp, metis_file)
  workflow = create_download_workflow
  workflow.do_download(wp, metis_file)
end

#do_streaming_upload(rp, dest, size_hint) ⇒ Object



313
314
315
316
317
318
319
# File 'lib/etna/filesystem.rb', line 313

# Uploads everything readable from +rp+ to +dest+ through a new upload workflow.
#
# @param rp readable IO wrapped in a StreamingIOUpload
# @param dest destination path; resolved against the root via #metis_path_of
# @param size_hint forwarded unchanged to StreamingIOUpload
# @return whatever the workflow's +do_upload+ returns
def do_streaming_upload(rp, dest, size_hint)
  source = Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(readable_io: rp, size_hint: size_hint)
  workflow = create_upload_workflow
  workflow.do_upload(source, metis_path_of(dest))
end

#exist?(src) ⇒ Boolean

Returns:

  • (Boolean)


362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/etna/filesystem.rb', line 362

# Reports whether a file or folder named like the last component of +src+
# appears in the listing of its parent directory.
#
# An Etna::Error raised by the listing counts as "does not exist" when its
# status is 404 or its message mentions "Invalid folder"; any other
# Etna::Error is re-raised.
#
# @param src [String] path to look for
# @return [Boolean]
def exist?(src)
  listing =
    begin
      list_metis_directory(::File.dirname(src))
    rescue Etna::Error => e
      return false if e.status == 404 || e.message =~ /Invalid folder/

      raise
    end

  wanted = ::File.basename(src)
  return true if listing.files.all.any? { |file| file.file_name == wanted }

  listing.folders.all.any? { |folder| folder.folder_name == wanted }
end

#list_metis_directory(path) ⇒ Object



344
345
346
# File 'lib/etna/filesystem.rb', line 344

# Asks the Metis client for the listing of +path+ in this project and bucket.
#
# @param path path to list; resolved against the root via #metis_path_of
# @return whatever the client's +list_folder+ returns
def list_metis_directory(path)
  request = Etna::Clients::Metis::ListFolderRequest.new(
    project_name: @project_name,
    bucket_name: @bucket_name,
    folder_path: metis_path_of(path)
  )
  @metis_client.list_folder(request)
end

#ls(dir) ⇒ Object



357
358
359
360
# File 'lib/etna/filesystem.rb', line 357

# Lists the entries directly inside +dir+.
#
# Lists +dir+ itself rather than its parent, and reads entries through
# +.all+, the same accessor #exist? and #with_readable use on a listing.
#
# @param dir [String] directory to list
# @return [Array<Array>] +[:file, name]+ pairs followed by +[:folder, name]+ pairs
def ls(dir)
  listing = list_metis_directory(dir)
  files = listing.files.all.map { |file| [:file, file.file_name] }
  folders = listing.folders.all.map { |folder| [:folder, folder.folder_name] }
  files + folders
end

#metis_path_of(path) ⇒ Object



264
265
266
267
# File 'lib/etna/filesystem.rb', line 264

# Resolves +path+ against the configured root and drops a single leading
# slash from the joined result, if there is one.
#
# @param path [String] path relative to the root
# @return [String] joined path without its first leading '/'
def metis_path_of(path)
  ::File.join(@root, path).delete_prefix('/')
end

#mkdir_p(dir) ⇒ Object



348
349
350
351
352
353
354
355
# File 'lib/etna/filesystem.rb', line 348

# Sends a create-folder request for +dir+ in this project and bucket.
#
# @param dir folder path; resolved against the root via #metis_path_of
# @return whatever the client's +create_folder+ returns
def mkdir_p(dir)
  @metis_client.create_folder(
    Etna::Clients::Metis::CreateFolderRequest.new(
      project_name: @project_name,
      bucket_name: @bucket_name,
      folder_path: metis_path_of(dir)
    )
  )
end

#with_hot_pipe(opts, receiver, *args, &block) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/etna/filesystem.rb', line 273

# Connects the caller's block to a background worker through an IO pipe.
#
# The worker is +receiver+, invoked on this object via +send+ on a dedicated
# single-thread executor. When +opts+ includes 'w' the worker gets the read
# end and the block is yielded the write end; otherwise the worker gets the
# write end and the block is yielded the read end.
#
# In write mode the write end is closed as soon as the block returns, so a
# worker that reads until EOF can finish before we wait on it. Without that
# close the wait below would never return for such a worker.
#
# @param opts [String] mode string; only the presence of 'w' is inspected
# @param receiver [Symbol] name of the method run on the worker thread
# @param args extra arguments passed to +receiver+ after the pipe end
# @yield [io] the pipe end not given to the worker
# @return the result of +future.wait!+
# @raise whatever the worker raised; it is logged first, then surfaced by +wait!+
def with_hot_pipe(opts, receiver, *args, &block)
  rp, wp = IO.pipe
  begin
    executor = Concurrent::SingleThreadExecutor.new(fallback_policy: :abort)
    begin
      writing = opts.include?('w')
      worker_io, caller_io = writing ? [rp, wp] : [wp, rp]

      future = Concurrent::Promises.future_on(executor) do
        send(receiver, worker_io, *args)
      rescue => e
        Etna::Application.instance.logger.log_error(e)
        raise e
      ensure
        # Closing the worker's end tells the other side it is done.
        worker_io.close
      end

      yield caller_io

      # Signal EOF to the reading worker. Closing again in the outer ensure is harmless.
      # NOTE(review): read mode is left unchanged; a block that stops reading
      # early may leave the worker blocked on a full pipe. Confirm with callers.
      wp.close if writing

      future.wait!
    ensure
      executor.shutdown
      executor.kill unless executor.wait_for_termination(5)
    end
  ensure
    rp.close
    wp.close
  end
end

#with_readable(src, opts = 'r', &block) ⇒ Object



335
336
337
338
339
340
341
342
# File 'lib/etna/filesystem.rb', line 335

# Yields a readable IO streaming the contents of the Metis file at +src+.
#
# The file is looked up by name in the listing of its parent directory; the
# download then runs through #with_hot_pipe with :do_streaming_download.
#
# @param src [String] path of the file to read
# @param opts [String] mode string forwarded to #with_hot_pipe
# @yield [io] the IO yielded by #with_hot_pipe
# @raise [RuntimeError] when no file with that name is in the parent listing
def with_readable(src, opts = 'r', &block)
  siblings = list_metis_directory(::File.dirname(src)).files.all
  wanted = ::File.basename(src)
  metis_file = siblings.find { |candidate| candidate.file_name == wanted }

  if metis_file.nil?
    raise "Metis file at #{@project_name}/#{@bucket_name}/#{@root}/#{src} not found.  No such file"
  end

  with_hot_pipe(opts, :do_streaming_download, metis_file) { |readable| yield readable }
end

#with_writeable(dest, opts = 'w', size_hint: nil, &block) ⇒ Object



321
322
323
324
325
# File 'lib/etna/filesystem.rb', line 321

# Yields a writable IO; what is written to it is uploaded to +dest+ by
# :do_streaming_upload, run through #with_hot_pipe.
#
# @param dest destination path of the upload
# @param opts [String] mode string forwarded to #with_hot_pipe
# @param size_hint forwarded to the upload as its size hint
# @yield [io] the IO yielded by #with_hot_pipe
# @return whatever #with_hot_pipe returns
def with_writeable(dest, opts = 'w', size_hint: nil, &block)
  with_hot_pipe(opts, :do_streaming_upload, dest, size_hint) { |writable| yield writable }
end