Class: Seqtrim

Inherits:
Object
  • Object
show all
Defined in:
lib/seqtrimnext/classes/seqtrim.rb

Overview

SEQTRIM_VERSION_REVISION = 27; SEQTRIM_VERSION_STAGE = 'b'; $SEQTRIM_VERSION = "2.0.0#{SEQTRIM_VERSION_STAGE}#{SEQTRIM_VERSION_REVISION}"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Seqtrim

Returns a new instance of Seqtrim.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File 'lib/seqtrimnext/classes/seqtrim.rb', line 157

# Runs a complete SeqTrimNext job, or only its workers.
#
# options:: Hash of command line options. Keys read here: :template (params
#           file path), :server_ip, :port, :workers (a worker count or an
#           array of worker host names), :only_workers, :chunk_size, :json,
#           :use_checkpoint, :fastq (array of paths, '-' meaning STDIN),
#           :format, :fasta, :qual, :skip_output, :write_in_gzip.
#
# As server it reads and validates the params file, loads the plugins,
# optionally computes initial stats and clusters clonal reads with
# cd-hit-454, then starts a ScbiMapreduce::Manager that processes the
# sequences. With :only_workers it only launches workers for +ip+/+port+.
#
# Calls exit(-1) on configuration errors.
def initialize(options)
  params_path = options[:template]

  ip = options[:server_ip]
  port = options[:port]
  workers = options[:workers]
  only_workers = options[:only_workers]
  chunk_size = options[:chunk_size]
  use_json = options[:json]

  # A leftover checkpoint is only reused when explicitly requested (-C)
  if File.exist?(ScbiMapreduce::CHECKPOINT_FILE) && !options[:use_checkpoint]
    STDERR.puts "ERROR: A checkpoint file exists, either delete it or provide -C flag to use it"
    exit(-1)
  end

  # Server part: prepare inputs and params (skipped in worker-only mode)
  unless only_workers
    cd_hit_input_file = nil

    # TODO - FIX seqtrim to not iterate two times over input, so STDIN can be used
    sequence_readers = []

    # open sequence readers and expand input file paths
    if options[:fastq]
      # choose fastq quality format ('illumina18' is read like 'sanger')
      format = options[:format] == 'illumina15' ? :ilumina : :sanger

      $LOG.info("Used FastQ format for input files: #{format}")

      seqs_path = ''
      options[:fastq].each do |fastq_file|
        seqs_path = fastq_file == '-' ? STDIN : File.expand_path(fastq_file)
        sequence_readers << FastqFile.new(seqs_path, 'r', format, true)
      end

      # NOTE(review): only the last FastQ input is handed to cd-hit
      cd_hit_input_file = seqs_path
    else
      cd_hit_input_file = File.expand_path(options[:fasta])
      sequence_readers << FastaQualFile.new(options[:fasta], options[:qual], true)
    end

    $LOG.info "Loading params"
    # Reads the parameter's file
    params = Params.new(params_path)

    $LOG.info "Checking global params"
    exit(-1) unless check_global_params(params)

    # Load actions (the instance itself is not used afterwards)
    $LOG.info "Loading actions"
    ActionManager.new

    # load the plugins listed in the params file
    plugin_list = params.get_param('plugin_list')
    $LOG.info "Loading plugins [#{plugin_list}]"
    plugin_manager = PluginManager.new(plugin_list, params)

    # check that the loaded plugins have the params they need
    $LOG.info "Check plugin params"
    unless plugin_manager.check_plugins_params(params)
      $LOG.error "Plugin check failed"
      # save used params to file (OUTPUT_PATH is not created yet at this point)
      params.save_file('used_params.txt')
      exit(-1)
    end

    Dir.mkdir(OUTPUT_PATH) unless Dir.exist?(OUTPUT_PATH)

    # Extract global stats
    if params.get_param('generate_initial_stats').to_s == 'true'
      $LOG.info "Calculatings stats"
      ExtractStats.new(sequence_readers, params)
    else
      $LOG.info "Skipping calculatings stats phase."
    end

    # save used params to file
    params.save_file(File.join(OUTPUT_PATH, 'used_params.txt'))

    params.load_mids(params.get_param('mids_db'))
    params.load_ab_adapters(params.get_param('adapters_ab_db'))
    params.load_adapters(params.get_param('adapters_db'))
    params.load_linkers(params.get_param('linkers_db'))

    # execute cd-hit to detect clonal (duplicated) reads
    if params.get_param('remove_clonality').to_s == 'true'
      cmd = get_custom_cdhit(cd_hit_input_file, params)
      cmd = get_cd_hit_cmd(cd_hit_input_file, workers, $SEQTRIMNEXT_INIT) if cmd.empty?

      $LOG.info "Executing cd-hit-454: #{cmd}"

      # an already existing cluster file is reused instead of re-running cd-hit
      system(cmd) unless File.exist?('clusters.fasta.clstr')

      unless File.exist?('clusters.fasta.clstr')
        $LOG.error("Exiting due to not found clusters.fasta.clstr. Maybe cd-hit failed. Check cd-hit-454.out")
        exit(-1)
      end

      params.load_repeated_seqs('clusters.fasta.clstr')
    end
  end

  custom_worker_file = File.join(File.dirname(__FILE__), 'em_classes', 'seqtrim_worker.rb')

  $LOG.info "Workers:\n#{workers}"

  if only_workers
    worker_launcher = ScbiMapreduce::WorkerLauncher.new(ip, port, workers, custom_worker_file, STDOUT)
    worker_launcher.launch_workers_and_wait
  else
    $LOG.info 'Starting server'

    SeqtrimWorkManager.init_work_manager(sequence_readers, params, chunk_size, use_json, options[:skip_output], options[:write_in_gzip])

    # Number of local CPUs; 1 when it cannot be determined.
    # sysctl is used on macOS instead of the legacy `hwprefs` tool.
    cpus = begin
      detected = if RUBY_PLATFORM.downcase.include?('darwin')
                   `sysctl -n hw.ncpu`.to_i
                 else
                   `grep -c '^processor' /proc/cpuinfo 2>/dev/null`.to_i
                 end
      detected > 0 ? detected : 1
    rescue StandardError
      1
    end

    # Leave one slot for the server itself when several workers were requested
    # and they do not already exceed the local CPUs.
    if workers.respond_to?(:to_ary)
      # list of worker hosts: drop the first entry without mutating options[:workers]
      workers = workers.drop(1) if workers.count > 1 && workers.count < cpus
    elsif workers > 1 && workers < cpus
      workers -= 1
    end

    # launch processor server passing the ip, port and all required params
    server = ScbiMapreduce::Manager.new(ip, port, workers, SeqtrimWorkManager, custom_worker_file, STDOUT, $SEQTRIMNEXT_INIT)
    server.chunk_size = chunk_size
    server.checkpointing = true
    server.keep_order = true
    server.retry_stuck_jobs = true

    begin
      server.start_server
    ensure
      # close sequence readers even if the server raises
      sequence_readers.each(&:close)
    end

    status = SeqtrimWorkManager.exit_status
    if status >= 0
      $LOG.info "Exit status: #{status}"
    else
      $LOG.error "Exit status: #{status}"
    end
    $LOG.info 'Closing server'
  end
end

Class Method Details

.exit_status ⇒ Object



20
21
22
# File 'lib/seqtrimnext/classes/seqtrim.rb', line 20

# Class-level shortcut for the exit status of the processing run.
# It simply delegates to SeqtrimWorkManager.exit_status.
def self.exit_status
  SeqtrimWorkManager.exit_status
end

Instance Method Details

#check_global_params(params) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/seqtrimnext/classes/seqtrim.rb', line 100

# Registers and validates the global (not plugin-specific) parameters.
#
# Every parameter goes through +params.check_param+ with a shared problem
# list, the parameter name, its expected type, a default value and a
# description. Entries appended to that list (presumably names of missing or
# invalid params; confirm in Params#check_param) are logged as errors.
#
# Returns true when nothing was appended, false otherwise.
def check_global_params(params)
  problems = []

  # name, expected type, default value, description (checked in this order)
  specs = [
    ['plugin_list', 'PluginList', nil,
     'Plugins applied to every sequence, separated by commas. Order is important'],
    ['next_generation_sequences', 'String', 'true',
     'Should SeqTrimNext analysis be based on NGS? (if setting to false, a classic Sanger sequencing is considered)'],
    ['remove_clonality', 'String', 'true',
     'Remove duplicated (clonal) sequences (using CD-HIT 454)'],
    ['cdhit_custom_parameters', 'String', '',
     'Custom parameters used by CD-HIT-454 (leave empty to let seqtrimnext decide). Execute "cd-hit-454 help" in command line to see a list of parameters'],
    ['generate_initial_stats', 'String', 'true',
     'Generate initial stats'],
    ['min_insert_size_trimmed', 'Integer', 40,
     'Minimum insert size for every trimmed sequence'],
    ['min_insert_size_paired', 'Integer', 40,
     'Minimum insert size for each end of paired-end reads; true paired-ends have both single-ends longer than this value'],
    ['accept_very_long_sequences', 'String', 'true',
     'Do not reject unexpectedly long sequences found in the raw data']
  ]

  specs.each do |name, type, default, description|
    params.check_param(problems, name, type, default, description)
  end

  # The version default is looked up only after all the checks above.
  params.check_param(problems, 'seqtrim_version', 'String', Seqtrimnext::SEQTRIM_VERSION, 'Seqtrim version')

  return true if problems.empty?

  $LOG.error 'Please, define the following global parameters in params file:'
  problems.each { |problem| $LOG.error('   -' + problem) }
  false
end

#get_cd_hit_cmd(cd_hit_input_file, workers, init_file_path) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/seqtrimnext/classes/seqtrim.rb', line 50

# Builds the shell command that runs cd-hit-454 on +cd_hit_input_file+,
# writing clusters.fasta and redirecting cd-hit output to cd-hit-454.out.
#
# workers:: either a number of local workers, or an array of worker host
#           names (one entry per worker). For an array, cd-hit runs on the
#           host with the most entries, with that many threads. Ties go to the
#           host listed first. An empty array falls back to one local thread.
#           When the chosen host is not the first entry, the command is
#           wrapped in ssh.
# init_file_path:: script sourced before cd-hit on the remote host. It is
#                  only added if the file exists locally.
#
# Returns the command as a String.
def get_cd_hit_cmd(cd_hit_input_file, workers, init_file_path)
  host = nil

  if workers.respond_to?(:to_ary)
    workers_per_host = workers.each_with_object(Hash.new(0)) { |w, counts| counts[w] += 1 }
    puts "Found these workers: #{workers_per_host.sort_by { |_host, n| -n }}"
    # max_by keeps the first maximum, so ties favour the first-listed host
    host, num_cpus_cdhit = workers_per_host.max_by { |_host, n| n }
    num_cpus_cdhit ||= 1
  else
    num_cpus_cdhit = Integer(workers)
  end

  # cd-hit's -M (memory) is scaled at 1000 per thread; -T is the thread count
  cmd = "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta -M #{num_cpus_cdhit * 1000} -T #{num_cpus_cdhit} > cd-hit-454.out"

  # run locally unless the chosen host differs from the first worker entry
  return cmd if host.nil? || host == workers.first

  init = File.exist?(init_file_path) ? ". #{init_file_path}; " : ''
  pwd = Dir.pwd
  cd = File.exist?(pwd) ? "cd #{pwd}; " : ''

  "ssh #{host} \"#{init} #{cd} #{cmd}\""
end

#get_custom_cdhit(cd_hit_input_file, params) ⇒ Object

First, it reads the parameters file, which holds the values of all parameters, including ‘plugin_list’, which specifies the order in which the plugins are executed.

Second, it loads the plugins from a folder.

Third, it checks that the parameters file defines every parameter required by each plugin that is going to be executed.

After that, it creates a pool with a given number of worker threads (e.g. 10), reads the sequences from the FASTA files (so far without qualities), and executes the plugins over the sequences in that thread pool.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/seqtrimnext/classes/seqtrim.rb', line 34

# Builds a cd-hit-454 command from the user-supplied
# 'cdhit_custom_parameters' param.
#
# cd_hit_input_file:: input path interpolated after -i.
# params:: object answering get_param(name).
#
# Returns '' when the param is unset, blank, or cannot be read. In that case
# the caller (initialize) falls back to get_cd_hit_cmd.
def get_custom_cdhit(cd_hit_input_file, params)
  # to_s turns a missing (nil) value into '' instead of raising
  custom_parameters = params.get_param('cdhit_custom_parameters').to_s.strip
  return '' if custom_parameters.empty?

  "cd-hit-454 -i #{cd_hit_input_file} -o clusters.fasta #{custom_parameters} > cd-hit-454.out"
rescue StandardError
  # best effort: failing to read the param just means "no custom command"
  ''
end