Class: Cnvrg::Storage

Inherits:
Object
  • Object
show all
Defined in:
lib/cnvrg/storage.rb

Instance Method Summary collapse

Constructor Details

#initialize(dataset: nil, project: nil, root_path: nil) ⇒ Storage

Returns a new instance of Storage.



3
4
5
6
7
# File 'lib/cnvrg/storage.rb', line 3

# Builds a storage helper bound to a dataset or a project.
#
# @param dataset [Object, nil] element to operate on; wins over +project+ when both are given
# @param project [Object, nil] element used when +dataset+ is nil/false
# @param root_path [String, nil] kept as-is in @root_path
# @raise [ArgumentError] when neither +dataset+ nor +project+ is supplied
def initialize(dataset: nil, project: nil, root_path: nil)
  @element = dataset || project
  # Fail early with a readable message instead of a NoMethodError on nil below.
  raise ArgumentError, 'either dataset: or project: must be provided' unless @element

  @root_path = root_path
  # The storage client is obtained from the element itself.
  @client = @element.get_storage_client
end

Instance Method Details

#clone(commit: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/cnvrg/storage.rb', line 32

# Clones the bound element's files by delegating to #storage_action.
#
# Note: the class inherits from Object, so this overrides Object#clone with a
# different signature.
#
# @param commit [Object, nil] forwarded to the element's get_clone_chunk
# @return [Object] whatever #storage_action returns
def clone(commit: nil)
  # The progress total is taken from the element's 'commit_size' stat.
  @stats = @element.get_stats
  progress_opts = { size: @stats['commit_size'], title: "Clone Progress" }

  # Fetches one page of entries; the argument carries :limit and :offset.
  fetch_chunk = proc do |page|
    @element.get_clone_chunk(commit: commit, chunk_size: page[:limit], offset: page[:offset])
  end

  # Hands a single (storage, local) pair to the storage client.
  download = proc { |remote, target| @client.safe_download(remote, target) }

  storage_action(files_generator: fetch_chunk, action: download, progress: progress_opts)
end

#do_parallel ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/cnvrg/storage.rb', line 105

# Yields every entry taken from the @files queue, using Parallel worker
# threads (thread count comes from #get_chunks_size).
#
# Parallel polls the lambda for the next item; returning Parallel::Stop ends
# the iteration. While the queue is empty but the producer has not finished,
# the lambda waits one second and hands back a private idle marker that is
# skipped instead of being yielded.
#
# @yield [file] one queue entry at a time
# @return [Object] whatever Parallel.each returns
def do_parallel
  # Private marker for "nothing to do yet". The previous code compared against
  # the return value of sleep(1), which is the rounded number of seconds
  # actually slept and is therefore not guaranteed to be 1.
  idle = Object.new

  next_file = lambda do
    # Read the flag BEFORE checking the queue: if the producer pushes its last
    # entries and sets @finished between the two checks, testing emptiness
    # first would stop with those entries still queued.
    finished = @finished
    if !@files.empty?
      @files.pop
    elsif finished
      Parallel::Stop
    else
      sleep(1)
      idle
    end
  end

  Parallel.each(next_file, in_threads: get_chunks_size) do |file|
    next if file.equal?(idle)

    yield(file)
  end
end

#download_file_thread(file) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/cnvrg/storage.rb', line 114

# Transfers one file entry, retrying up to six times.
#
# The transfer itself is delegated to the caller's block. Previously the
# download call was commented out, so the retry loop did nothing, the rescue
# was unreachable, and a block passed by the caller was never invoked.
#
# NOTE(review): the yielded arguments mirror the commented-out call this
# replaces, `@client.download(storage_path, "#{@root_path}/#{local_path}")`
# -- confirm that this is the intended contract for the block.
#
# @param file [Hash] entry read through its 'name' and 'path' keys
# @yield [storage_path, local_path] performs the actual transfer
# @return [nil]
def download_file_thread(file)
  return if file.blank?
  # Without a block there is nothing to run (same no-op as before).
  return unless block_given?

  storage_path = file['path']
  local_path = "#{@root_path}/#{file['name']}"

  6.times do
    begin
      yield(storage_path, local_path)
      break
    rescue => e
      # log_error only formats a message; keep it so the failure is not lost.
      (@storage_errors ||= []) << log_error(action: "download #{file['name']}", error: e.message)
    end
  end
  nil
end

#file_gen_thread(file_gen) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/cnvrg/storage.rb', line 87

# Producer loop: pulls pages of entries from +file_gen+ and pushes each entry
# onto the @files queue until a blank page is returned.
#
# @param file_gen [#call] called with +limit:+ and +offset:+; returns a
#   collection of entries, or a blank value when exhausted
# @return [true]
def file_gen_thread(file_gen)
  offset = 0
  chunk_size = get_chunks_size
  while true
    files = file_gen.call(limit: chunk_size, offset: offset)
    break if files.blank?

    files.each { |f| @files.push(f) }
    offset += files.size
  end
  true
ensure
  # Always flag completion, even when the generator raises: #do_parallel keeps
  # polling until @finished is set, so leaving it false would wait forever.
  @finished = true
end

#file_gen_upload_thread(files_generator) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/cnvrg/storage.rb', line 61

# Producer loop for uploads: pushes entries onto the @files queue and then
# flags completion.
#
# The previous version never invoked the generator, iterated before checking
# for a blank value (so nil raised NoMethodError), and looped forever on any
# non-empty collection, pushing the same entries again and again.
#
# NOTE(review): a callable generator is assumed to take no arguments and to
# return a blank value once exhausted -- confirm against the eventual caller.
#
# @param files_generator [#call, #each, nil] a callable returning batches, or
#   a single ready-made batch
# @return [true]
def file_gen_upload_thread(files_generator)
  callable = files_generator.respond_to?(:call)
  loop do
    files = callable ? files_generator.call : files_generator
    break if files.blank?

    files.each { |f| @files.push(f) }
    # A plain collection is one batch; only callables are polled repeatedly.
    break unless callable
  end
  true
ensure
  # Consumers poll until this is set, so set it even on failure.
  @finished = true
end

#get_chunks_size ⇒ Object



14
15
16
# File 'lib/cnvrg/storage.rb', line 14

# Chunk size read from the CNVRG_STORAGE_CHUNK_SIZE environment variable.
#
# Falls back to 10 when the variable is unset, empty, non-numeric, zero or
# negative; previously those values produced a size of 0 (or less), which is
# then used both as the page limit and as the worker-thread count.
#
# @return [Integer] a positive chunk size
def get_chunks_size
  # nil.to_i and "".to_i are both 0, so one check covers every invalid case.
  configured = ENV['CNVRG_STORAGE_CHUNK_SIZE'].to_i
  configured > 0 ? configured : 10
end

#handle_errors ⇒ Object



99
100
101
102
103
# File 'lib/cnvrg/storage.rb', line 99

# Dumps the collected storage errors as YAML into ".cnvrg/errors.yml" under
# the element's working directory. Does nothing when no errors were collected.
#
# @return [Integer, nil] result of the write, or nil when nothing was written
def handle_errors
  return unless @storage_errors.present?

  errors_file = @element.working_dir + "/.cnvrg/errors.yml"
  # "w+" truncates any file left over from an earlier run.
  File.open(errors_file, "w+") do |out|
    out.write @storage_errors.to_yaml
  end
end

#init_progress_bar(size: nil, title: "Download Progress") ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/cnvrg/storage.rb', line 18

# Creates the progress bar and stores it in @progressbar.
#
# @param size [Integer, nil] passed as the bar's :total
# @param title [String] passed as the bar's :title
# @return [Object] the object returned by ProgressBar.create
def init_progress_bar(size: nil, title: "Download Progress")
  bar_options = {
    title: title,
    progress_mark: '=',
    format: "%b>>%i| %p%% %t",
    starting_at: 0,
    total: size,
    autofinish: true
  }
  @progressbar = ProgressBar.create(**bar_options)
end

#log_error(action: nil, error: '') ⇒ Object



9
10
11
# File 'lib/cnvrg/storage.rb', line 9

# Formats an error line as "[<current time>] (<action>) <error>".
# Only builds and returns the string; nothing is written anywhere.
#
# @param action [String, nil] label for the failed step; 'default' when nil/false
# @param error [String] error text
# @return [String] the formatted line
def log_error(action: nil, error: '')
  timestamp = Time.now
  label = action || 'default'
  "[#{timestamp}] (#{label}) #{error}"
end

#make_progress(size: 1) ⇒ Object



27
28
29
# File 'lib/cnvrg/storage.rb', line 27

# Advances the progress bar stored in @progressbar.
#
# @param size [Numeric] amount added to the bar's current progress
# @return [Numeric] the new progress value
def make_progress(size: 1)
  advanced = @progressbar.progress + size
  @progressbar.progress = advanced
end

#storage_action(files_generator: nil, action: nil, progress: {size: 0, title: ''}) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/cnvrg/storage.rb', line 70

# Runs a storage action over every entry produced by +files_generator+.
#
# A background thread fills the @files queue through #file_gen_thread while
# #do_parallel consumes it; each consumed entry is handed to
# #download_file_thread and then counted on the progress bar. Afterwards the
# producer thread is joined and #handle_errors is called.
#
# Entries produced by the generator should have {path (encrypted), name, size}.
#
# @param files_generator [#call] forwarded to #file_gen_thread
# @param action [#call, nil] invoked with the two values yielded by
#   #download_file_thread, in the same order
# @param progress [Hash] keyword options for #init_progress_bar (:size, :title)
# @return [Object] whatever #handle_errors returns
def storage_action(files_generator: nil, action: nil, progress: {size: 0, title: ''})
  # Splat the hash: #init_progress_bar only takes keywords, and Ruby 3 no
  # longer converts a positional Hash into keyword arguments.
  init_progress_bar(**progress)
  @storage_errors = []
  @finished = false
  @files = Queue.new

  producer = Thread.new { file_gen_thread(files_generator) }
  do_parallel do |file|
    # Argument order matches the proc built in #clone: |storage, local|.
    download_file_thread(file) do |storage_path, local_path|
      action.call(storage_path, local_path) if action
    end
    make_progress(size: file['size'])
  end
  producer.join
  handle_errors
end

#upload_files(files_generator, progress: {size: 0, title: ''}) ⇒ Object



46
47
48
# File 'lib/cnvrg/storage.rb', line 46

# Placeholder: the body is empty, so this currently does nothing.
#
# @param commit [Object, nil] accepted but not used
# @return [nil]
def upload_files(commit: nil)
  nil
end