Class: SeqtrimWorker

Inherits:
ScbiMapreduce::Worker
  • Object
show all
Defined in:
lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb

Instance Method Summary collapse

Instance Method Details

#add_output_data(obj) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 104

# Renders every sequence of a processed group and routes it to its output
# buffers.
#
# +obj+ must respond to +output_text=+, +output_text+, +each+, +output_files+,
# +stats+ and +remove_all_seqs+.
#
# For each sequence yielded by +obj.each+, its +to_text+ is appended to
# +obj.output_text+ (reset to an empty array first) and the sequence is handed
# to write_seq_to_files together with the group's output_files and stats.
#
# Returns whatever +obj.remove_all_seqs+ returns.
def add_output_data(obj)
  obj.output_text = []

  obj.each do |sequence|
    obj.output_text << sequence.to_text
    write_seq_to_files(obj.output_files, sequence, obj.stats)
  end

  # Everything needed for the output files has been extracted above, so the
  # sequences themselves are dropped from the group.
  obj.remove_all_seqs
end

#add_stat(stats, key, subkey, value, count = 1) ⇒ Object



116
117
118
119
120
121
122
123
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 116

# Adds +count+ (default 1) to the counter stored at
# stats[key][subkey][value].
#
# Missing intermediate hashes are created on the way down and a missing
# counter starts from 0. Returns the updated counter.
def add_stat(stats, key, subkey, value, count = 1)
  by_subkey = (stats[key] ||= {})
  bucket = (by_subkey[subkey] ||= {})

  bucket[value] = (bucket[value] || 0) + count
end

#closing_worker ⇒ Object



99
100
101
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 99

# Intentionally a no-op; always returns nil.
# NOTE(review): presumably a shutdown hook invoked by the
# ScbiMapreduce::Worker base class - confirm against the framework.
def closing_worker
  nil
end

#get_file(files, fn) ⇒ Object



288
289
290
291
292
293
294
295
296
297
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 288

# Returns the entry stored in +files+ under the key +fn+.
#
# When there is no entry yet (or it is nil/false), an empty array is stored
# under +fn+ first, so callers can always append to the result with <<.
def get_file(files, fn)
  files[fn] ||= []
end

#json_file(files) ⇒ Object

ACCESS TO FILES



259
260
261
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 259

# ACCESS TO FILES
#
# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/results.json.
def json_file(files)
  path = File.join(OUTPUT_PATH, 'results.json')
  get_file(files, path)
end

#low_complexity_file(files, dir_name, file_name) ⇒ Object



276
277
278
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 276

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/<dir_name>/low_complexity_<file_name>.fastq.
def low_complexity_file(files, dir_name, file_name)
  base_name = 'low_complexity_' + file_name + '.fastq'
  get_file(files, File.join(OUTPUT_PATH, dir_name, base_name))
end

#low_sffinfo_file(files, dir_name, file_name) ⇒ Object



284
285
286
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 284

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/<dir_name>/low_complexity_sff_info_<file_name>.txt.
def low_sffinfo_file(files, dir_name, file_name)
  base_name = 'low_complexity_sff_info_' + file_name + '.txt'
  get_file(files, File.join(OUTPUT_PATH, dir_name, base_name))
end

#paired_file(files, dir_name, file_name) ⇒ Object



272
273
274
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 272

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/<dir_name>/paired_<file_name>.fastq.
def paired_file(files, dir_name, file_name)
  base_name = 'paired_' + file_name + '.fastq'
  get_file(files, File.join(OUTPUT_PATH, dir_name, base_name))
end

#process_object(obj) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 56

# Processes one work unit: wraps +obj+ in a SequenceGroup, runs the plugins
# held by @plugin_manager over the group, then builds its output data with
# add_output_data.
#
# Returns the SequenceGroup.
def process_object(obj)
  SequenceGroup.new(obj).tap do |group|
    @plugin_manager.execute_plugins(group)
    add_output_data(group)
  end
end

#receive_initial_config(obj) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 68

# Stores the parameters object received as initial configuration.
#
# +obj+ is kept in @params, and its 'use_qual' and 'use_json' parameters are
# cached in @use_qual and @use_json. An info line is sent to $WORKER_LOG
# before anything is stored.
def receive_initial_config(obj)
  $WORKER_LOG.info('Params received')

  @params = obj
  @use_qual = obj.get_param('use_qual')
  @use_json = obj.get_param('use_json')
end

#rejected_output_file(files) ⇒ Object



263
264
265
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 263

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/rejected.txt.
def rejected_output_file(files)
  path = File.join(OUTPUT_PATH, 'rejected.txt')
  get_file(files, path)
end

#sequence_file(files, dir_name, file_name) ⇒ Object



268
269
270
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 268

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/<dir_name>/sequences_<file_name>.fastq.
def sequence_file(files, dir_name, file_name)
  base_name = 'sequences_' + file_name + '.fastq'
  get_file(files, File.join(OUTPUT_PATH, dir_name, base_name))
end

#sffinfo_file(files, dir_name, file_name) ⇒ Object



280
281
282
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 280

# Returns the buffer that get_file keeps in +files+ for
# OUTPUT_PATH/<dir_name>/sff_info_<file_name>.txt.
def sffinfo_file(files, dir_name, file_name)
  base_name = 'sff_info_' + file_name + '.txt'
  get_file(files, File.join(OUTPUT_PATH, dir_name, base_name))
end

#starting_worker ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 79

# Prepares the worker: sets the $WORKER_LOG level to WARN, creates the
# ActionManager and builds the PluginManager from the 'plugin_list' parameter
# of @params (which must already be set, see receive_initial_config).
#
# Failures while loading are reported on stdout instead of being raised.
# Only StandardError and ScriptError (e.g. LoadError/SyntaxError from a
# plugin file) are handled; signals, SystemExit and NoMemoryError propagate.
#
# NOTE(review): after a reported failure @plugin_manager may still be nil, in
# which case process_object will fail later - confirm this best-effort
# behaviour is intended.
#
# Returns the PluginManager on success, nil after a reported failure.
def starting_worker
  # $WORKER_LOG.level = Logger::ERROR
  $WORKER_LOG.level = Logger::WARN
  $WORKER_LOG.info 'Loading actions'

  @action_manager = ActionManager.new

  $WORKER_LOG.info 'Loading plugins'
  @plugin_list = @params.get_param('plugin_list')
  # Interpolated rather than concatenated: a plugin list that is not a String
  # must not raise while merely being logged.
  $WORKER_LOG.info "PLUGIN LIST:#{@plugin_list}"

  # The manager stores the plugins and loads them.
  @plugin_manager = PluginManager.new(@plugin_list, @params)
rescue StandardError, ScriptError => e
  # Message and backtrace are separated by a newline so the first frame is
  # not glued to the message.
  puts "#{e.message}\n#{Array(e.backtrace).join("\n")}"
end

#write_seq_to_files(files, seq, stats) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/seqtrimnext/classes/em_classes/seqtrim_worker.rb', line 125

# Appends the output of one processed sequence to the buffers in +files+ and
# updates the counters in +stats+.
#
# Exactly one of these paths is taken, checked in this order:
# * seq.seq_rejected is set - a line goes to the rejected buffer and the
#   sequence is counted under its rejection message
# * no inserts - same, with the message 'No valid inserts found'
# * two inserts (paired sequence) - two FASTQ records, <name>_left and
#   <name>_right, go to the paired buffer
# * one insert - one FASTQ record and one insert-position line go to the
#   regular buffers, or to the low-complexity ones when the sequence carries
#   an ActionLowComplexity
# A sequence with three or more inserts is neither written nor counted.
#
# When @use_json is set, seq.to_json is appended to the JSON buffer first,
# whatever the outcome. When @use_qual is set, the qualities from
# seq.get_qual_inserts accompany each record; otherwise an empty quality list
# is passed to FastqFile.to_fastq.
#
# files:: buffer collection handed to the *_file helpers
# seq::   the processed sequence
# stats:: counters, updated through add_stat
def write_seq_to_files(files, seq, stats)
  dir_name, file_name = seq.get_file_tag_path

  # get current inserts; qualities are optional
  inserts = seq.get_inserts
  qual_inserts = seq.get_qual_inserts if @use_qual

  # save json if necessary
  json_file(files) << seq.to_json if @use_json

  # find mids
  mid = seq.get_actions(ActionMid).first

  if seq.seq_rejected
    record_rejected_seq(files, seq, stats, seq.seq_rejected_by_message)
  elsif inserts.empty?
    record_rejected_seq(files, seq, stats, 'No valid inserts found')
  elsif inserts.count == 2 # PAIRED SEQUENCES
    add_stat(stats, 'sequences', 'count', 'output_seqs_paired')

    # TODO: add this stat to the full stats
    library = mid_absent?(mid) ? 'no_MID' : mid.tag_id
    name = seq.seq_name
    paired = paired_file(files, dir_name, file_name)

    # Left read: the first insert is reverse-complemented, so its qualities
    # are reversed to stay aligned with the bases.
    left_bases = inserts[0].reverse.tr('actgACTG', 'tgacTGAC')
    left_qual = @use_qual ? qual_inserts[0].reverse : []
    paired << FastqFile.to_fastq("#{name}_left", left_bases, left_qual,
                                 "template=#{name} dir=R library=#{library}")

    # Right read: the second insert as is.
    right_qual = @use_qual ? qual_inserts[1] : []
    paired << FastqFile.to_fastq("#{name}_right", inserts[1], right_qual,
                                 "template=#{name} dir=F library=#{library}")
  elsif inserts.count == 1
    comment = mid_comment(mid)

    if seq.get_actions(ActionLowComplexity).empty?
      add_stat(stats, 'sequences', 'count', 'output_seqs')
      fasta_file = sequence_file(files, dir_name, file_name)
      sff_file = sffinfo_file(files, dir_name, file_name)
    else
      add_stat(stats, 'sequences', 'count', 'output_seqs_low_complexity')
      fasta_file = low_complexity_file(files, dir_name, file_name)
      sff_file = low_sffinfo_file(files, dir_name, file_name)
    end

    name = seq.seq_name
    qual = @use_qual ? qual_inserts[0] : []
    fasta_file << FastqFile.to_fastq(name, inserts[0], qual, comment)

    # Both positions are written incremented by one
    # (presumably 0-based -> 1-based; confirm against ActionInsert).
    insert_pos = seq.get_actions(ActionInsert)[0]
    sff_file << "#{name} #{insert_pos.start_pos + 1} #{insert_pos.end_pos + 1}"
  end
end

# Helper of write_seq_to_files: true when there is no MID action or the MID
# action carries the message 'no_MID'.
def mid_absent?(mid)
  mid.nil? || mid.message == 'no_MID'
end

# Helper of write_seq_to_files: FASTQ comment describing the MID of a
# single-insert sequence (empty when the MID has an empty tag_id).
def mid_comment(mid)
  return ' No MID found' if mid_absent?(mid)

  mid.tag_id == '' ? '' : ' ' + mid.message
end

# Helper of write_seq_to_files: appends '><name> <message>' to the rejected
# buffer and counts the sequence under +message+ and under 'rejected'.
def record_rejected_seq(files, seq, stats, message)
  rejected_output_file(files) << ('>' + seq.seq_name + ' ' + message)

  add_stat(stats, 'sequences', 'rejected', message)
  add_stat(stats, 'sequences', 'count', 'rejected')
end