Class: Bio::BaseSpace::MultipartUpload

Inherits:
Object
  • Object
show all
Defined in:
lib/basespace/model/multipart_upload.rb

Overview

Multipart file upload class.

TODO: This file has not been ported yet, because the multipartFileUpload class is only mentioned in the comment section of the BaseSpaceAPI file.

Instance Method Summary collapse

Constructor Details

#initialize(api, a_id, local_file, file_object, cpu_count, part_size, temp_dir, verbose) ⇒ MultipartUpload

Returns a new instance of MultipartUpload.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/basespace/model/multipart_upload.rb', line 136

# Stores the upload configuration on the instance and then calls +setup+.
#
# @param api          stored in @api
# @param a_id         stored in @analysis_id
# @param local_file   stored in @local_file
# @param file_object  stored in @remote_file
# @param cpu_count    stored in @cpu_count
# @param part_size    stored in @part_size
# @param temp_dir     stored in @temp_dir
# @param verbose      stored in @verbose
# @return whatever +setup+ returns
def initialize(api, a_id, local_file, file_object, cpu_count, part_size, temp_dir, verbose)
  # What to upload and where it goes.
  @api, @analysis_id = api, a_id
  @local_file, @remote_file = local_file, file_object

  # Caller-supplied tuning and diagnostics settings.
  @cpu_count, @part_size = cpu_count, part_size
  @temp_dir, @verbose = temp_dir, verbose

  # Session bookkeeping; a start time of -1 means no start has been recorded.
  @status = 'Initialized'
  @start_time = -1
  #@repeat_count  = 0             # number of chunks we uploaded multiple times

  setup
end

Instance Method Details

#__cleanUp__ ⇒ Object



207
208
209
# File 'lib/basespace/model/multipart_upload.rb', line 207

# Increments the first slot of the collection returned by +stats+ by one.
#
# NOTE(review): +stats+ is not defined in the visible part of the class --
# confirm where it comes from.
#
# @return the incremented value
def __cleanUp__
  counters = self.stats
  counters[0] = counters[0] + 1
end

#finalize ⇒ Object



239
240
241
242
243
244
245
246
247
# File 'lib/basespace/model/multipart_upload.rb', line 239

# Marks a running transfer as completed.
#
# The transfer can only be finalized once no consumer is alive any more and
# while the session status is still 'Running'.
#
# @return [String] the new status, 'Completed'
# @raise [RuntimeError] if consumers are still running, or if the status is
#   anything other than 'Running'
def finalize
  # Compare against zero explicitly: 0 is truthy in Ruby, so a bare
  # `if getRunningThreadCount` would always raise.
  raise 'Cannot finalize a transfer with running threads.' if getRunningThreadCount > 0
  raise 'To finalize the status of the transfer must be "Running."' unless @status == 'Running'

  # code here for
  @status = 'Completed'
end

#getFileResponse ⇒ Object



268
269
270
# File 'lib/basespace/model/multipart_upload.rb', line 268

# Returns the remote file object that was handed to the constructor
# (kept in @remote_file).
#
# @return the object passed as +file_object+ to +initialize+
def getFileResponse
  @remote_file
end

#getProgressRatio ⇒ Object



295
296
297
298
299
# File 'lib/basespace/model/multipart_upload.rb', line 295

# Returns the completed fraction of the upload as a short string.
#
# Reads @tasks (anything responding to +size+), @consumers and @total_task.
# NOTE(review): assumes +setup+ populates these instance variables -- confirm.
#
# @return [String] the ratio, truncated to at most six characters
def getProgressRatio
  # The consumer count is subtracted from the queue length; presumably this
  # discounts the one nil sentinel queued per consumer -- TODO confirm.
  pending = (@tasks.size - @consumers.size).to_f
  ((@total_task - pending) / @total_task).to_s[0..5]
end

#getRunningThreadCount ⇒ Object



272
273
274
# File 'lib/basespace/model/multipart_upload.rb', line 272

# Counts the consumers that report themselves as alive.
#
# NOTE(review): assumes every element of @consumers responds to +is_alive+
# (the only liveness call shown for consumers) -- confirm against Consumer.
#
# @return [Integer] number of consumers whose +is_alive+ is truthy
def getRunningThreadCount
  @consumers.count { |consumer| consumer.is_alive }
end

#getRunningTime ⇒ Object



282
283
284
285
286
287
288
# File 'lib/basespace/model/multipart_upload.rb', line 282

# Returns the number of seconds elapsed since the upload was started.
#
# @start_time is initialised to -1, which means no start has been recorded.
#
# @return [Integer, Float] 0 when no start time is recorded, otherwise the
#   elapsed wall-clock seconds
def getRunningTime
  return 0 if @start_time == -1

  # to_f on both sides accepts either a Time or an epoch-seconds number.
  Time.now.to_f - @start_time.to_f
end

#getStatus ⇒ Object



264
265
266
# File 'lib/basespace/model/multipart_upload.rb', line 264

# Returns the current session status string held in @status
# (set to 'Initialized' by the constructor).
#
# @return [String] the current status
def getStatus
  @status
end

#getTotalTransfered ⇒ Object

Returns the total amount of data transferred, in Gb.



291
292
293
# File 'lib/basespace/model/multipart_upload.rb', line 291

# Returns the total amount of data transferred so far.
#
# Computed as (tasks no longer in the queue) * part size / 1000.
# Reads @total_task, @tasks (anything responding to +size+) and @part_size.
# NOTE(review): presumably @part_size is in Mb so the result is in Gb, and
# +setup+ populates @tasks and @total_task -- confirm.
#
# @return [Float] transferred amount
def getTotalTransfered
  completed_parts = @total_task - @tasks.size
  (completed_parts * @part_size).to_f / 1000.0
end

#getTransRate ⇒ Object



276
277
278
279
280
# File 'lib/basespace/model/multipart_upload.rb', line 276

# Returns the average transfer rate as a display string ending in ' mb/s'.
#
# The rate is (tasks completed * size of file parts) / running time.
# Reads @total_task, @tasks (anything responding to +size+) and @part_size.
# NOTE(review): assumes +setup+ populates @tasks and @total_task -- confirm.
#
# @return [String] the rate truncated to at most six characters, plus ' mb/s'
def getTransRate
  elapsed = getRunningTime
  transferred = (@total_task - @tasks.size) * @part_size
  # getRunningTime is 0 before the upload starts; report a zero rate rather
  # than dividing by zero.
  rate = elapsed > 0 ? transferred / elapsed.to_f : 0.0
  "#{rate.to_s[0..5]} mb/s"
end

#haltUpload ⇒ Object



259
260
261
262
# File 'lib/basespace/model/multipart_upload.rb', line 259

# Terminates every consumer and marks the session as 'Terminated'.
#
# NOTE(review): assumes every element of @consumers responds to +terminate+
# and that +setup+ populates @consumers -- confirm.
#
# @return [String] the new status, 'Terminated'
def haltUpload
  @consumers.each(&:terminate)
  @status = 'Terminated'
end

#hasFinished ⇒ Object



249
250
251
252
# File 'lib/basespace/model/multipart_upload.rb', line 249

# Tells whether the upload has run and no consumer is alive any more.
#
# @return [Boolean] false while the session is still 'Initialized' or while
#   at least one consumer is running; true otherwise
def hasFinished
  # Must be a real false: 0 is truthy in Ruby, so returning 0 here would
  # make callers treat a never-started session as finished.
  return false if @status == 'Initialized'

  getRunningThreadCount <= 0
end

#pauseUpload ⇒ Object



254
255
256
257
# File 'lib/basespace/model/multipart_upload.rb', line 254

# Signals the pause event and marks the session as 'Paused'.
#
# NOTE(review): assumes @pause_event responds to +set+ and is populated by
# +setup+ -- confirm which event type the port uses.
#
# @return [String] the new status, 'Paused'
def pauseUpload
  @pause_event.set
  @status = 'Paused'
end

#run ⇒ Object



166
167
168
169
170
# File 'lib/basespace/model/multipart_upload.rb', line 166

# Loops for as long as the session is paused or +__check_queue__+ returns a
# truthy value, sleeping between iterations.
#
# NOTE(review): neither +__check_queue__+ nor +wait+ is defined in the visible
# part of this class, and +time.sleep+ is Python's API (Ruby's equivalent is
# Kernel#sleep). This method looks unported -- confirm before relying on it.
def run
  # Keep polling while paused, or while the queue check asks to continue.
  while @status == 'Paused' or __check_queue__
    time.sleep(self.wait)
  end
end

#setup ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/basespace/model/multipart_upload.rb', line 172

# Prepares the upload: works out how many parts the local file splits into,
# queues one upload task per part, creates the consumers and queues one nil
# per consumer.
#
# NOTE(review): this method is still largely Python. +os.path.getsize+,
# +int+, +math.ceil+, +str+ and +multiprocessing.*+ have no definition in the
# visible Ruby source, and +uploadTask(...)+ / +Consumer(...)+ are written as
# method calls rather than +.new+ constructors. The +self.x = ...+ assignments
# need writer methods that are not visible either, while the constructor
# stores its state in @instance variables (@local_file, @part_size, ...).
# Confirm and port before use.
def setup
  # Determine the total file size and the number of parts to upload.
  totalSize = os.path.getsize(self.localFile)
  fileCount = int(math.ceil(totalSize/(self.partSize*1024.0*1000)))
  
  if self.verbose 
    print "TotalSize " + str(totalSize)
    print "Using split size " + str(self.partSize) +"Mb"
    print "Filecount " + str(fileCount)
    print "CPUs " + str(self.cpuCount)
  end
  
  # Establish communication queues
  self.tasks = multiprocessing.JoinableQueue()
  self.completedPool = multiprocessing.Queue()
  # NOTE(review): [1..fileCount] is an Array holding a single Range, so this
  # block runs once with i bound to the Range; (1..fileCount).each was
  # presumably intended -- confirm.
  [1..fileCount].each { |i|         # set up the task queue
    t = uploadTask(self.api,self.remoteFile.Id, i, fileCount, self.localFile, 0)
    self.tasks.put(t)
  }
  self.totalTask  = self.tasks.qsize()
  
  # create consumers
  self.pauseEvent = multiprocessing.Event()
  self.haltEvent = multiprocessing.Event()
  if self.verbose
    print 'Creating %d consumers' % self.cpuCount
    print "queue size " + str(self.tasks.qsize())
  end
  # NOTE Original code -- note the strange indent. Variables i and c not used. Buggy code?
  # self.consumers = [ Consumer(self.tasks, self.completedPool,self.pauseEvent,self.haltEvent) for i in xrange(self.cpuCount) ]
  #   for c in self.consumers: self.tasks.put(None)   # add poison pill
  # NOTE(review): [0..self.cpuCount] is likewise a one-element Array, so only
  # one consumer would be created; the Python original built cpuCount of them.
  self.consumers = [0..self.cpuCount].map { |i| Consumer(self.tasks, self.completedPool, self.pauseEvent, self.haltEvent) }
  # Queue one nil per consumer (the "poison pill" of the original code).
  self.consumers.each { |c| self.tasks.put(nil) }
end

#startUpload(returnOnFinish = 0, testInterval = 5) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/basespace/model/multipart_upload.rb', line 211

# Starts a fresh upload session or resumes a paused one.
#
# From 'Initialized' the start time is recorded and every consumer is
# started; from 'Paused' the pause event is cleared. The status then becomes
# 'Running'.
#
# NOTE(review): assumes @consumers elements respond to +start+ and
# @pause_event responds to +clear+, both populated by +setup+ -- confirm.
#
# @param returnOnFinish when set (anything other than 0, nil or false) the
#   call blocks until the upload has finished and then finalizes it
# @param testInterval [Numeric] seconds to sleep between completion checks
# @return [Integer] always 1
# @raise [RuntimeError] if the session is already 'Terminated' or 'Completed'
def startUpload(returnOnFinish = 0, testInterval = 5)
  if %w[Terminated Completed].include?(@status)
    raise "Cannot resume a #{@status} multi-part upload session."
  end

  case @status
  when 'Initialized'
    # Stored as epoch seconds so it stays numeric like the -1 default.
    @start_time = Time.now.to_f
    @consumers.each(&:start)
  when 'Paused'
    @pause_event.clear
  end
  @status = 'Running'

  # 0 is truthy in Ruby, so the default of 0 has to be tested explicitly to
  # keep its "do not wait" meaning.
  wait_for_completion = returnOnFinish && returnOnFinish != 0
  return 1 unless wait_for_completion

  polls = 0
  until hasFinished
    # Skip the report on the very first pass, before any interval elapsed.
    puts "#{polls}: #{self}" if @verbose && polls > 0
    sleep(testInterval)
    polls += 1
  end
  finalize
  1
end

#to_s ⇒ Object



151
152
153
154
155
156
157
158
159
160
# File 'lib/basespace/model/multipart_upload.rb', line 151

# Builds a one-line, human-readable summary of the session: status, live
# consumer count, running time, queue size, progress ratio, average transfer
# rate and amount of data transferred.
#
# Reads @status and @tasks (anything responding to +size+).
# NOTE(review): assumes +setup+ populates @tasks -- confirm.
#
# @return [String] the summary line
def to_s
  # Running time and transferred amount are cut to at most five characters.
  "MPU -  Stat: #{@status}, LiveThread: #{getRunningThreadCount}" \
    ", RunTime: #{getRunningTime.to_s[0..4]}s" \
    ", Q-size #{@tasks.size}" \
    ", Completed #{getProgressRatio}" \
    ", AVG TransferRate #{getTransRate}" \
    ", Data transfered #{getTotalTransfered.to_s[0..4]}Gb"
end

#to_str ⇒ Object



162
163
164
# File 'lib/basespace/model/multipart_upload.rb', line 162

# Implicit string conversion: hands back this object's +inspect+ output.
#
# @return [String] the result of +inspect+
def to_str
  inspect
end