Class: SushiFabric::SushiApp

Inherits:
Object
  • Object
show all
Defined in:
lib/sushi_fabric/sushiApp.rb

Overview

module_function :save_data_set

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeSushiApp

Returns a new instance of SushiApp.



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/sushi_fabric/sushiApp.rb', line 229

# Builds an empty SUSHI application: gstore root from the global GSTORE_DIR,
# blank project/name, default cluster-resource parameters, and empty job
# bookkeeping collections. Concrete apps overwrite these in their own setup.
def initialize
  @gstore_dir = GSTORE_DIR
  @project = nil
  @name = nil
  # Resource defaults; assigned one-by-one through []= to keep the exact
  # insertion order and semantics of the parameter container.
  @params = {}
  {
    'cores' => nil,
    'ram' => nil,
    'scratch' => nil,
    'partition' => '',
    'process_mode' => 'SAMPLE',
    'samples' => '',
  }.each { |key, value| @params[key] = value }
  @job_ids = []
  @required_columns = []
  @module_source = MODULE_SOURCE
  @modules = []
  #@workflow_manager = workflow_manager_instance||DRbObject.new_with_uri(WORKFLOW_MANAGER)
  @last_job = true
end

Instance Attribute Details

#analysis_categoryObject (readonly)

Returns the value of attribute analysis_category.



205
206
207
# File 'lib/sushi_fabric/sushiApp.rb', line 205

# Reader for @analysis_category (read-only attribute; YARD-rendered body).
def analysis_category
  @analysis_category
end

#current_userObject

Returns the value of attribute current_user.



219
220
221
# File 'lib/sushi_fabric/sushiApp.rb', line 219

# Reader for @current_user (paired writer exists elsewhere in the class).
def current_user
  @current_user
end

#data_setObject

Returns the value of attribute data_set.



212
213
214
# File 'lib/sushi_fabric/sushiApp.rb', line 212

# Reader for @data_set.
def data_set
  @data_set
end

#dataset_hashObject (readonly)

Returns the value of attribute dataset_hash.



204
205
206
# File 'lib/sushi_fabric/sushiApp.rb', line 204

# Reader for @dataset_hash (read-only; array of row hashes built by #set_input_dataset).
def dataset_hash
  @dataset_hash
end

#dataset_nameObject

Returns the value of attribute dataset_name.



216
217
218
# File 'lib/sushi_fabric/sushiApp.rb', line 216

# Reader for @dataset_name.
def dataset_name
  @dataset_name
end

#dataset_sushi_idObject

Returns the value of attribute dataset_sushi_id.



211
212
213
# File 'lib/sushi_fabric/sushiApp.rb', line 211

# Reader for @dataset_sushi_id (SUSHI DB id of the input dataset).
def dataset_sushi_id
  @dataset_sushi_id
end

#dataset_tsv_fileObject

Returns the value of attribute dataset_tsv_file.



209
210
211
# File 'lib/sushi_fabric/sushiApp.rb', line 209

# Reader for @dataset_tsv_file (input dataset TSV path, when loaded from file).
def dataset_tsv_file
  @dataset_tsv_file
end

#descriptionObject (readonly)

Returns the value of attribute description.



206
207
208
# File 'lib/sushi_fabric/sushiApp.rb', line 206

# Reader for @description (read-only attribute).
def description
  @description
end

#employeeObject (readonly)

Returns the value of attribute employee.



226
227
228
# File 'lib/sushi_fabric/sushiApp.rb', line 226

# Reader for @employee (read-only attribute).
def employee
  @employee
end

#inactivate_nodesObject (readonly)

Returns the value of attribute inactivate_nodes.



225
226
227
# File 'lib/sushi_fabric/sushiApp.rb', line 225

# Reader for @inactivate_nodes (read-only attribute).
def inactivate_nodes
  @inactivate_nodes
end

#input_dataset_bfabric_application_numberObject

Returns the value of attribute input_dataset_bfabric_application_number.



223
224
225
# File 'lib/sushi_fabric/sushiApp.rb', line 223

# Reader for @input_dataset_bfabric_application_number (used when registering
# the input dataset in B-Fabric; see #set_input_dataset).
def input_dataset_bfabric_application_number
  @input_dataset_bfabric_application_number
end

#job_idsObject (readonly)

Returns the value of attribute job_ids.



200
201
202
# File 'lib/sushi_fabric/sushiApp.rb', line 200

# Reader for @job_ids (read-only; submitted job ids collected by #main).
def job_ids
  @job_ids
end

#loggerObject

Returns the value of attribute logger.



220
221
222
# File 'lib/sushi_fabric/sushiApp.rb', line 220

# Reader for @logger.
def logger
  @logger
end

#mango_run_nameObject

Returns the value of attribute mango_run_name.



222
223
224
# File 'lib/sushi_fabric/sushiApp.rb', line 222

# Reader for @mango_run_name (stored as run_name_order_id in #save_data_set).
def mango_run_name
  @mango_run_name
end

#modulesObject (readonly)

Returns the value of attribute modules.



208
209
210
# File 'lib/sushi_fabric/sushiApp.rb', line 208

# Reader for @modules (read-only; environment modules loaded in the job header).
def modules
  @modules
end

#nameObject (readonly)

Returns the value of attribute name.



207
208
209
# File 'lib/sushi_fabric/sushiApp.rb', line 207

# Reader for @name (read-only; application name, underscored by #set_dir_paths).
def name
  @name
end

#next_dataset_bfabric_application_numberObject

Returns the value of attribute next_dataset_bfabric_application_number.



224
225
226
# File 'lib/sushi_fabric/sushiApp.rb', line 224

# Reader for @next_dataset_bfabric_application_number.
def next_dataset_bfabric_application_number
  @next_dataset_bfabric_application_number
end

#next_dataset_commentObject

Returns the value of attribute next_dataset_comment.



217
218
219
# File 'lib/sushi_fabric/sushiApp.rb', line 217

# Reader for @next_dataset_comment (stored as the new dataset's Comment in #main).
def next_dataset_comment
  @next_dataset_comment
end

#next_dataset_idObject (readonly)

Returns the value of attribute next_dataset_id.



201
202
203
# File 'lib/sushi_fabric/sushiApp.rb', line 201

# Reader for @next_dataset_id (read-only; set by #main after saving the result dataset).
def next_dataset_id
  @next_dataset_id
end

#next_dataset_nameObject

Returns the value of attribute next_dataset_name.



215
216
217
# File 'lib/sushi_fabric/sushiApp.rb', line 215

# Reader for @next_dataset_name (overrides the generated result dataset name).
def next_dataset_name
  @next_dataset_name
end

#off_bfabric_registrationObject

Returns the value of attribute off_bfabric_registration.



221
222
223
# File 'lib/sushi_fabric/sushiApp.rb', line 221

# Reader for @off_bfabric_registration (truthy disables B-Fabric registration
# in #set_input_dataset).
def off_bfabric_registration
  @off_bfabric_registration
end

#parameterset_tsv_fileObject

Returns the value of attribute parameterset_tsv_file.



210
211
212
# File 'lib/sushi_fabric/sushiApp.rb', line 210

# Reader for @parameterset_tsv_file (two-column TSV read by #set_user_parameters).
def parameterset_tsv_file
  @parameterset_tsv_file
end

#paramsObject (readonly)

Returns the value of attribute params.



199
200
201
# File 'lib/sushi_fabric/sushiApp.rb', line 199

# Reader for @params (read-only; the application's parameter container).
def params
  @params
end

#projectObject

Returns the value of attribute project.



213
214
215
# File 'lib/sushi_fabric/sushiApp.rb', line 213

# Reader for @project (project identifier, e.g. "p1001").
def project
  @project
end

#queueObject

Returns the value of attribute queue.



227
228
229
# File 'lib/sushi_fabric/sushiApp.rb', line 227

# Reader for @queue (queue name forwarded to copy commands in the job footer).
def queue
  @queue
end

#required_columnsObject (readonly)

Returns the value of attribute required_columns.



202
203
204
# File 'lib/sushi_fabric/sushiApp.rb', line 202

# Reader for @required_columns (read-only; checked by #check_required_columns).
def required_columns
  @required_columns
end

#required_paramsObject (readonly)

Returns the value of attribute required_params.



203
204
205
# File 'lib/sushi_fabric/sushiApp.rb', line 203

# Reader for @required_params (read-only; checked by #check_application_parameters).
def required_params
  @required_params
end

#userObject

Returns the value of attribute user.



214
215
216
# File 'lib/sushi_fabric/sushiApp.rb', line 214

# Reader for @user.
def user
  @user
end

#workflow_managerObject

Returns the value of attribute workflow_manager.



218
219
220
# File 'lib/sushi_fabric/sushiApp.rb', line 218

# Reader for @workflow_manager (DRb proxy, lazily created by the cluster helpers).
def workflow_manager
  @workflow_manager
end

Instance Method Details

#batch_modeObject



748
749
750
751
752
753
754
755
756
757
758
759
760
# File 'lib/sushi_fabric/sushiApp.rb', line 748

# BATCH process mode: one shared job script that is appended to once per
# dataset row. Fills @result_dataset with one next-dataset row per input row
# and records the single script in @job_scripts.
def batch_mode
  dataset_record = @dataset_sushi_id && DataSet.find_by_id(@dataset_sushi_id.to_i)
  @job_script =
    if dataset_record
      File.join(@job_script_dir, dataset_record.name.gsub(/\s+/, '_') + '.sh')
    else
      File.join(@job_script_dir, 'job_script.sh')
    end
  @dataset_hash.each do |row|
    # Strip "[Tag]" suffixes from the headers for the per-row @dataset view.
    @dataset = row.each_with_object({}) do |(header, value), clean|
      clean[header.gsub(/\[.+\]/, '').strip] = value
    end
    make_job_script('append')
    @result_dataset << next_dataset
  end
  @job_scripts << @job_script
end

#check_application_parametersObject



342
343
344
345
346
347
348
# File 'lib/sushi_fabric/sushiApp.rb', line 342

# Snapshots @params into @output_params when every required parameter key is
# present; otherwise leaves @output_params untouched and returns nil.
def check_application_parameters
  return unless @required_params
  missing = @required_params - @params.keys
  if missing.empty?
    # PD, 20230623, the following fix changed parameters.tsv info, and reverted
    #@output_params = {"sushi_app" => self.class.name}.merge(@params.clone)
    @output_params = @params.clone
  end
end

#check_latest_modules_version(modules) ⇒ Object



403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/sushi_fabric/sushiApp.rb', line 403

# Resolves each requested environment-module name to the version reported by
# `module whatis` in a login shell sourcing @module_source.
#
# @param modules [Array<String>] module names, e.g. %w[Tools/samtools]
# @return [Array<String>] the matching lines from `module whatis` (first
#   whitespace-separated field, de-duplicated by the shell pipeline)
def check_latest_modules_version(modules)
  command_out =  %x[ bash -lc "source #{@module_source}; module whatis #{modules.join(" ")} 2>&1" | cut -f 1 -d " " | uniq ]
  # Regexp.union escapes each name, so module names containing regexp
  # metacharacters (e.g. "g++") no longer corrupt the filter pattern, which
  # the previous "/#{modules.join("|")}/" interpolation silently did.
  filter = Regexp.union(modules)
  latest_modules = []
  command_out.split("\n").each do |raw_line|
    line = raw_line.chomp
    next if line.empty?
    latest_modules << line if line =~ filter
  end
  latest_modules
end

#check_required_columnsObject



335
336
337
338
339
340
341
# File 'lib/sushi_fabric/sushiApp.rb', line 335

# True when every required column is present in the dataset (comparing bare
# column names with "[Tag]" suffixes stripped); false when the dataset or the
# required-column list is missing.
#
# Replaces the redundant `if cond then true else false` with a direct boolean
# expression and names the intermediate header set for readability.
# @return [Boolean]
def check_required_columns
  return false unless @dataset_hash and @required_columns
  present = @dataset_hash.map{|row| row.keys}.flatten.uniq.map{|colname| colname.gsub(/\[.+\]/,'').strip}
  (@required_columns - present).empty?
end

#cluster_nodesObject



684
685
686
687
# File 'lib/sushi_fabric/sushiApp.rb', line 684

# Lists the cluster nodes known to the workflow manager, lazily connecting to
# the DRb service at WORKFLOW_MANAGER on first use.
# @return [Object] whatever the remote #cluster_nodes returns (shape defined
#   by the external service — TODO confirm)
def cluster_nodes
  @workflow_manager||=DRbObject.new_with_uri(WORKFLOW_MANAGER)
  @workflow_manager.cluster_nodes
end

#commandsObject

 this should be overwritten in a subclass



515
516
517
# File 'lib/sushi_fabric/sushiApp.rb', line 515

# Hook producing the shell-command section of the job script (see #job_main).
# Returns nil here; concrete SUSHI applications override it.
def commands
  # this should be overwritten in a subclass
end

#copy_commands(org_dir, dest_parent_dir, now = nil, queue = "light") ⇒ Object



627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
# File 'lib/sushi_fabric/sushiApp.rb', line 627

# Asks the workflow manager (DRb service) for the shell commands that copy
# +org_dir+ into +dest_parent_dir+, retrying up to three times on error.
#
# @param org_dir [String] source path (typically under /scratch)
# @param dest_parent_dir [String] destination parent directory (gstore side)
# @param now [String, nil] e.g. 'now'; semantics live in the workflow
#   manager — presumably selects synchronous copying (TODO confirm)
# @param queue [String] queue name forwarded to the workflow manager
# @return [Object] the workflow manager's result; '' when every attempt failed
def copy_commands(org_dir, dest_parent_dir, now=nil, queue="light")
  @workflow_manager||=DRbObject.new_with_uri(WORKFLOW_MANAGER)
  com = ''
  cnt_retry = 0
  begin
    com = @workflow_manager.copy_commands(org_dir, dest_parent_dir, now, queue)
  rescue => e
    time = Time.now.strftime("[%Y.%m.%d %H:%M:%S]")
    @logger.error("*"*50)
    @logger.error("copy_command error #{time}")
    @logger.error("error: #{e}")
    @logger.error("org_dir: #{org_dir}, dest_parent_dir: #{dest_parent_dir}, now: #{now}")
    @logger.error("*"*50)
    sleep 1
    cnt_retry += 1
    # Bounded retry: at most 3 attempts, then fall through with com == ''.
    retry if cnt_retry < 3
  end
  com
end

#copy_inputdataset_parameter_jobscriptsObject



658
659
660
661
662
663
664
665
666
667
668
669
# File 'lib/sushi_fabric/sushiApp.rb', line 658

# Copies the input dataset, parameters and job scripts from the scratch
# result dir into the gstore project dir by executing the copy commands
# returned by the workflow manager ('now' = immediate copy).
#
# Removed a leftover debug line that shelled out `which python` before every
# copy command; it only polluted stdout and cost a subprocess per command.
# @raise [RuntimeError] when any copy command exits non-zero
def copy_inputdataset_parameter_jobscripts
  org = @scratch_result_dir
  dest = @gstore_project_dir
  copy_commands(org, dest, 'now').each do |command|
    puts command
    unless system command
      raise "fails in copying input_dataset, parameters and jobscript files from /scratch to /gstore"
    end
  end
  #sleep 1
end

#copy_nextdatasetObject

Copies the generated next-dataset TSV from scratch to the gstore result directory, then removes the scratch result directory.



670
671
672
673
674
675
676
677
678
679
680
681
682
683
# File 'lib/sushi_fabric/sushiApp.rb', line 670

# Copies the generated next-dataset TSV from scratch into the gstore result
# directory ('now' = immediate copy), waits briefly, then removes the scratch
# result directory.
#
# Removed a leftover debug line that shelled out `which python` before every
# copy command; it only polluted stdout and cost a subprocess per command.
# @raise [RuntimeError] when any copy command exits non-zero
def copy_nextdataset
  org = @next_dataset_tsv_path
  dest = File.join(@gstore_project_dir, @result_dir_base)
  copy_commands(org, dest, 'now').each do |command|
    puts command
    unless system command
      raise "fails in copying next_dataset files from /scratch to /gstore"
    end
  end
  sleep 1
  # Scratch results are no longer needed once everything is on gstore.
  command = "rm -rf #{@scratch_result_dir}"
  `#{command}`
end

#copy_uploaded_filesObject



646
647
648
649
650
651
652
653
654
655
656
657
# File 'lib/sushi_fabric/sushiApp.rb', line 646

# Moves each uploaded file into @uploaded_files_dir: copies the file, then
# deletes its (temporary) parent directory. Nil and empty entries are
# skipped; the echoed cp/rm lines only document what FileUtils already did.
def copy_uploaded_files
  return if @uploaded_files.empty?
  @uploaded_files.compact.reject(&:empty?).each do |uploaded|
    FileUtils.cp(uploaded, @uploaded_files_dir)
    puts "cp #{uploaded} #{@uploaded_files_dir}"
    FileUtils.rm_r(File.dirname(uploaded))
    puts "rm -rf #{File.dirname(uploaded)}"
  end
end

#dataset_has_column?(colname) ⇒ Boolean

 Checks whether the dataset's first row has a column header matching the given name.

Returns:

  • (Boolean)


308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/sushi_fabric/sushiApp.rb', line 308

# True when the first dataset row has a column header matching +colname+
# (regexp match, tag suffixes included). Only the first row is inspected —
# headers are assumed identical across rows, matching the original's
# break-after-first-sample behavior.
# @return [Boolean]
def dataset_has_column?(colname)
  first_row = @dataset_hash && @dataset_hash.first
  return false unless first_row
  first_row.keys.any? { |header| header =~ /#{colname}/ }
end

#dataset_modeObject



726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
# File 'lib/sushi_fabric/sushiApp.rb', line 726

# DATASET process mode: builds ONE job script covering the whole dataset,
# restricted to the samples named in @params['samples'] (comma-separated;
# empty string selects all). Appends the script to @job_scripts and one
# next-dataset row to @result_dataset.
def dataset_mode
  # Sample name -> true lookup for the selected samples.
  selected_samples = unless @params['samples'].empty?
                       Hash[*@params['samples'].split(',').map{|sample_name| [sample_name, true]}.flatten]
                     else
                       Hash[*@dataset_hash.map{|row| row['Name']}.map{|sample_name| [sample_name, true]}.flatten]
                     end
  # for a case of @dataset is used in def next_datast in SUSHIApp
  @dataset = []
  @dataset_hash.each do |row|
    if selected_samples[row['Name']]
      @dataset << row
    end
  end
  # Script name embeds the analysis category and, when the parent dataset is
  # found, its name (whitespace/commas/slashes replaced with underscores).
  @job_script = if @dataset_sushi_id and dataset = DataSet.find_by_id(@dataset_sushi_id.to_i)
                  File.join(@job_script_dir, @analysis_category + '_' + dataset.name.gsub(/[\s+,\/]/,'_') + '.sh')
                else 
                  File.join(@job_script_dir, @analysis_category + '_' + 'job_script.sh')
                end
  make_job_script
  @job_scripts << @job_script
  @result_dataset << next_dataset
end

#default_nodeObject



688
689
690
691
# File 'lib/sushi_fabric/sushiApp.rb', line 688

# Returns the workflow manager's default node, lazily connecting to the DRb
# service at WORKFLOW_MANAGER on first use.
def default_node
  @workflow_manager||=DRbObject.new_with_uri(WORKFLOW_MANAGER)
  @workflow_manager.default_node
end

#get_columns_with_tag(tag) ⇒ Object



299
300
301
302
303
304
# File 'lib/sushi_fabric/sushiApp.rb', line 299

# For each dataset row, extracts the columns tagged "[+tag+]" (e.g.
# "[Factor]") and returns an array of {bare_column_name => value} hashes with
# the tag suffix stripped from the keys.
def get_columns_with_tag(tag)
  tag_pattern = /\[#{tag}\]/
  @dataset_hash.map do |row|
    tagged = row.select { |header, _| header =~ tag_pattern }
    tagged.each_with_object({}) do |(header, value), extracted|
      extracted[header.gsub(/\[.+\]/, '').strip] = value
    end
  end
end


469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
# File 'lib/sushi_fabric/sushiApp.rb', line 469

# Emits the trailing section of the job script into @out: re-activates the
# sushi conda env when present on this host, copies every declared output
# file back to gstore, and removes the scratch working directory.
def job_footer
  @out.print "#### JOB IS DONE WE PUT THINGS IN PLACE AND CLEAN AUP\n"
  if File.exist?("/usr/local/ngseq/miniconda3/etc/profile.d/conda.sh")
    @out.print <<-EOS
. "/usr/local/ngseq/miniconda3/etc/profile.d/conda.sh"
conda activate sushi
    EOS
  end

  src_files = []
  dest_dirs = []
  # Probe with dummy paths to learn whether the copy backend is g-req; only
  # then are multiple files batched into a single copy command below.
  greq = (copy_commands("AAA", "BBB").join =~ /g-req/)
  if @output_files
    @output_files.map{|header| next_dataset[header]}.each do |file|
      src_file = File.basename(file)
      dest_dir = File.dirname(File.join(@gstore_dir, file))
      src_files << src_file
      dest_dirs << dest_dir
    end
    if dest_dirs.uniq.length == 1 and greq
      # All outputs share one destination: emit a single batched copy.
      src_file = src_files.join(" ")
      dest_dir = dest_dirs.first
      @out.print copy_commands(src_file, dest_dir, nil, @queue).join("\n"), "\n"
    else
      # Otherwise copy file by file.
      @output_files.map{|header| next_dataset[header]}.each do |file|
        # in actual case, to save under /srv/gstore/
        src_file = File.basename(file)
        dest_dir = File.dirname(File.join(@gstore_dir, file))
        @out.print copy_commands(src_file, dest_dir, nil, @queue).join("\n"), "\n"
      end
    end
  end
  @out.print <<-EOF
cd #{SCRATCH_DIR}
rm -rf #{@scratch_dir} || exit 1

  EOF

end

#job_headerObject



416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'lib/sushi_fabric/sushiApp.rb', line 416

# Emits the leading section of the job script into @out: bash prologue,
# SLURM dependency on the parent dataset's jobs, scratch-dir setup and
# environment-module loading.
def job_header
  # Per-sample runs get a sample-specific scratch dir; '$$' expands to the
  # shell PID at execution time, making the directory unique per job.
  @scratch_dir = if @params['process_mode'] == 'SAMPLE'
                   @scratch_result_dir + "_" + @dataset['Name'] + '_temp$$'
                 else
                   @scratch_result_dir + '_temp$$'
                 end
  # Make this job wait (afterany) for all submitted jobs of the parent
  # dataset, when such jobs exist and have ids.
  hold_jid_option = if @dataset_sushi_id and parent_data_set = DataSet.find_by_id(@dataset_sushi_id.to_i) and !parent_data_set.jobs.empty? and parent_data_set_job_ids = parent_data_set.jobs.map{|job| job.submit_job_id} and !parent_data_set_job_ids.join.empty?
                      "#SBATCH --dependency=afterany:#{parent_data_set_job_ids.join(":")}"
                    else
                      ''
                    end
  module_src_command = if @module_source and @modules and !@modules.empty?
                     "source #{@module_source}"
                   else
                     ""
                   end
  # Resolve module versions; on a mismatch, log the problem and load nothing
  # rather than emitting a broken `module add` line.
  module_add_commands = if @modules and !@modules.empty?
                          modules_with_version = check_latest_modules_version(@modules)
                          if @modules.length == modules_with_version.length
                            modules_with_version.compact!
                            "module add #{modules_with_version.join(' ')}"
                          else
                            @logger.error("#"*100)
                            @logger.error("# Error in checking modules ")
                            @logger.error("# Please check if all modules are correctly installed, searched #{@modules.join(",")} but only detected #{modules_with_version.join(",")}")
                            @logger.error("#"*100)
                            # "exit # Please check if all modules are correctly installed, searched #{@modules.join(",")} but only detected #{modules_with_version.join(",")}"
                            ""
                          end
                        else
                          ""
                        end
  @out.print <<-EOF
#!/bin/bash
#{hold_jid_option}
set -e
set -o pipefail
umask 0002

#### SET THE STAGE
SCRATCH_DIR=#{@scratch_dir}
GSTORE_DIR=#{@gstore_dir}
INPUT_DATASET=#{@input_dataset_tsv_path}
LAST_JOB=#{@last_job.to_s.upcase}
echo "Job runs on `hostname`"
echo "at $SCRATCH_DIR"
mkdir $SCRATCH_DIR || exit 1
cd $SCRATCH_DIR || exit 1
#{module_src_command}
#{module_add_commands}

  EOF
end

#job_mainObject



508
509
510
511
# File 'lib/sushi_fabric/sushiApp.rb', line 508

# Emits the application-specific command section (see #commands) between the
# job header and footer.
def job_main
  @out.print "#### NOW THE ACTUAL JOBS STARTS\n"
  @out.print commands, "\n\n"
end

#main(mock = false) ⇒ Object



818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
# File 'lib/sushi_fabric/sushiApp.rb', line 818

# Drives a full submission: generates job scripts for the selected process
# mode, saves the result dataset (TSV and, unless NO_ROR, SUSHI DB records),
# copies artefacts to gstore, submits the scripts and records job/dataset
# relations.
#
# @param mock [Boolean] when true, creates dummy output files and submits
#   with a fake job id instead of really running anything
# @raise [RuntimeError] when @params['process_mode'] is not SAMPLE/DATASET/BATCH
def main(mock=false)
  ## sushi creates the job scripts and builds the result data set that is to be generated
  @result_dataset = []
  @job_scripts = []
  if @params['process_mode'] == 'SAMPLE'
    sample_mode
  elsif @params['process_mode'] == 'DATASET'
    dataset_mode
  elsif @params['process_mode'] == 'BATCH'
    batch_mode
  else 
    #stop
    warn "the process mode (#{@params['process_mode']}) is not defined"
    raise "stop job submitting"
  end
  if mock
    make_dummy_files
  end

  print 'result dataset: '
  p @result_dataset

  # copy application data to gstore 
  @next_dataset_tsv_path = save_next_dataset_as_tsv

  # Register the result dataset in the SUSHI DB, linked to its parent unless
  # @params['next_dataset_root'] requests a root-level dataset.
  if @dataset_sushi_id and dataset = DataSet.find_by_id(@dataset_sushi_id.to_i)
    data_set_arr = []
    headers = []
    rows = []
    next_dataset_name = if name = @next_dataset_name
                          name.to_s
                        else
                          "#{@name.gsub(/\s/,'').gsub(/_/,'')}_#{dataset.id}"
                        end
    data_set_arr = if @params['next_dataset_root']
                     {'DataSetName'=>next_dataset_name, 'ProjectNumber'=>@project.gsub(/p/,''), 'Comment'=>@next_dataset_comment.to_s}
                   else
                     {'DataSetName'=>next_dataset_name, 'ProjectNumber'=>@project.gsub(/p/,''), 'ParentID'=>@dataset_sushi_id, 'Comment'=>@next_dataset_comment.to_s}
                   end

    csv = CSV.readlines(@next_dataset_tsv_path, :col_sep=>"\t")
    csv.each do |row|
      if headers.empty?
        headers = row
      else
        rows << row
      end
    end
    unless NO_ROR
      @current_user ||= nil
      @next_dataset_id = save_data_set(data_set_arr.to_a.flatten, headers, rows, @current_user, @child)
      save_parameters_in_sushi_db
    end
  end
  copy_uploaded_files
  copy_inputdataset_parameter_jobscripts

  # job submission
  gstore_job_script_paths = []
  @job_scripts.each_with_index do |job_script, i|
    if job_id = submit(job_script, mock)
      @job_ids << job_id
      print "Submit job #{File.basename(job_script)} job_id=#{job_id}"
      gstore_job_script_paths << File.join(@gstore_script_dir, File.basename(job_script))
    end
  end

  puts
  print 'job scripts: '
  p @job_scripts


  unless @job_ids.empty? or NO_ROR
    # save job and dataset relation in Sushi DB
    job_ids.each_with_index do |job_id, i|
      new_job = Job.new
      new_job.submit_job_id = job_id.to_i
      new_job.script_path = gstore_job_script_paths[i]
      new_job.next_dataset_id = @next_dataset_id
      new_job.save
      new_job.data_set.jobs << new_job
      new_job.data_set.save
    end
  end
  copy_nextdataset
end

#make_dummy_filesObject



916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
# File 'lib/sushi_fabric/sushiApp.rb', line 916

# Mock-run helper: materialises placeholder files and directories under the
# scratch result dir for every File/Link column of @result_dataset, so a mock
# run yields a complete-looking output tree.
def make_dummy_files
  dummy_files_header = []
  headers = @result_dataset.map{|row| row.keys}.flatten.uniq
  # String#tag? is a sushi_fabric extension (defined elsewhere) that checks a
  # header's "[Tag]" suffix.
  headers.select{|header| header.tag?('File')||header.tag?('Link')}.each do |header|
    dummy_files_header << header
  end
  dummy_files_ = []
  @result_dataset.each do |row|
    dummy_files_.concat(dummy_files_header.map{|header| row[header]})
  end
  # Paths are stored relative to @result_dir.
  dummy_files = []
  dummy_files_.each do |file|
    dummy_files << file.gsub(@result_dir, '')
  end
  dummy_files.uniq!

  # An entry that is a prefix of another entry and contains no dot is treated
  # as a directory rather than a file.
  dirs = []
  dummy_files.permutation(2).each do |a,b|
    if a.include?(b) and b !~ /\./
      dirs << b
    end
  end
  dirs.each do |dir|
    dummy_files.delete(dir)
  end
  dirs.each do |dir|
    command = "mkdir -p #{File.join(@scratch_result_dir, dir)}"
    puts command
    `#{command}`
  end
  # HTML placeholders get a greeting so they render; everything else is an
  # empty touched file.
  dummy_files.each do |file|
    command = if file =~ /.html/
                "echo 'Hello, SUSHI world!' > #{File.join(@scratch_result_dir, file)}"
              else
                "touch #{File.join(@scratch_result_dir, file)}"
              end
    puts command
    `#{command}`
  end
end

#make_job_script(append = false) ⇒ Object



693
694
695
696
697
698
699
700
701
702
703
# File 'lib/sushi_fabric/sushiApp.rb', line 693

# Writes the job script at @job_script by emitting header, main and footer
# through @out. When +append+ is truthy the script is appended to instead of
# overwritten (batch mode stacks several runs into one file).
#
# Uses File.open's block form so the handle is closed even when one of the
# emitters raises (the original leaked the handle on error), and avoids
# Kernel#open, which treats a leading "|" in the path as a command to run.
def make_job_script(append = false)
  File.open(@job_script, append ? 'a' : 'w') do |out|
    @out = out
    job_header
    job_main
    job_footer
  end
end

#mock_runObject



956
957
958
959
960
961
962
# File 'lib/sushi_fabric/sushiApp.rb', line 956

# Dry run: executes the full run pipeline but passes mock=true to #main, so
# dummy output files are created and no real jobs are submitted.
def mock_run
  test_run
  prepare_result_dir
  save_parameters_as_tsv
  save_input_dataset_as_tsv
  main(true)
end

#next_datasetObject



512
513
514
# File 'lib/sushi_fabric/sushiApp.rb', line 512

# Hook returning the next-dataset row (Hash of column => value) for the
# current @dataset. Returns nil here; concrete applications override it.
def next_dataset
  # this should be overwritten in a subclass
end

#prepare_result_dirObject



392
393
394
395
396
397
398
399
400
401
402
# File 'lib/sushi_fabric/sushiApp.rb', line 392

# Creates the scratch result and job-script directories and collects the
# values of file-upload parameters into @uploaded_files, creating the upload
# directory on demand.
def prepare_result_dir
  FileUtils.mkdir_p(@scratch_result_dir)
  FileUtils.mkdir_p(@job_script_dir)
  @uploaded_files = []
  @params.each do |key, value|
    # NOTE(review): two-argument Hash#[] is a sushi_fabric extension reading
    # a per-key property ('file_upload') — defined elsewhere; confirm there.
    if @params[key, 'file_upload']
      FileUtils.mkdir_p(@uploaded_files_dir)
      @uploaded_files << value
    end
  end
end

#preprocessObject



571
572
573
# File 'lib/sushi_fabric/sushiApp.rb', line 571

# Hook run before submission; no-op here, overridden by concrete applications.
def preprocess
  # this should be overwritten in a subclass
end

#runObject



904
905
906
907
908
909
910
911
912
913
914
915
# File 'lib/sushi_fabric/sushiApp.rb', line 904

# Real submission entry point: validates via #test_run, prepares the scratch
# result dir, writes parameter/input TSVs and hands over to #main.
def run
  test_run

  ## the user presses RUN
  prepare_result_dir

  ## copy application data to gstore 
  save_parameters_as_tsv
  save_input_dataset_as_tsv

  main
end

#sample_modeObject



704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
# File 'lib/sushi_fabric/sushiApp.rb', line 704

# SAMPLE process mode: one job script per selected sample. Samples are
# filtered by @params['samples'] (comma-separated names; empty string selects
# all). Fills @job_scripts and @result_dataset; @last_job marks the final
# sample so its script can perform end-of-run work.
def sample_mode
  # Sample name -> true lookup for the selected samples.
  selected_samples = unless @params['samples'].empty?
                       Hash[*@params['samples'].split(',').map{|sample_name| [sample_name, true]}.flatten]
                     else
                       Hash[*@dataset_hash.map{|row| row['Name']}.map{|sample_name| [sample_name, true]}.flatten]
                     end
  @dataset_hash.each_with_index do |row, i|
    # Strip "[Tag]" suffixes from headers for the per-sample @dataset view.
    @dataset = Hash[*row.map{|key,value| [key.gsub(/\[.+\]/,'').strip, value]}.flatten]
    if selected_samples[@dataset['Name']]
      sample_name = @dataset['Name']||@dataset.first
      @job_script = if @dataset_sushi_id and dataset = DataSet.find_by_id(@dataset_sushi_id.to_i)
                      File.join(@job_script_dir, @analysis_category + '_' + sample_name) + '_' + dataset.name.gsub(/\s+/,'_') + '.sh'
                    else
                      File.join(@job_script_dir, @analysis_category + '_' + sample_name) + '.sh'
                    end
      @last_job = (i == @dataset_hash.length - 1)
      make_job_script
      @job_scripts << @job_script
      @result_dataset << next_dataset
    end
  end
end

#save_data_set(data_set_arr, headers, rows, user = nil, child = nil) ⇒ Object



761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
# File 'lib/sushi_fabric/sushiApp.rb', line 761

# Persists a dataset (headers + rows) in the SUSHI database, creating the
# project record on demand.
#
# @param data_set_arr [Array] flattened key/value pairs: 'DataSetName',
#   'ProjectNumber', optional 'ParentID' and 'Comment'
# @param headers [Array<String>] column headers
# @param rows [Array<Array>] one array of cell values per sample
# @param user [Object, nil] owning user record, if any
# @param child [Object, nil] truthy marks the dataset as a child dataset
# @return [Integer, nil] the new DataSet id, or nil when the project lookup
#   fails even after the create attempt
def save_data_set(data_set_arr, headers, rows, user=nil, child=nil)
  data_set_hash = Hash[*data_set_arr]
  # Create the project on first use.
  unless project = Project.find_by_number(data_set_hash['ProjectNumber'].to_i)
    project = Project.new
    project.number = data_set_hash['ProjectNumber'].to_i
    project.save
  end
  if project = Project.find_by_number(data_set_hash['ProjectNumber'].to_i)
    data_set = DataSet.new
    if user
      data_set.user = user
    end
    data_set.name = data_set_hash['DataSetName']
    data_set.project = project
    # Link to the parent dataset when a valid ParentID was supplied.
    if parent_id = data_set_hash['ParentID'] and parent_data_set = DataSet.find_by_id(parent_id.to_i)
      data_set.data_set = parent_data_set
      data_set.sushi_app_name = self.class.name
    end
    if comment = data_set_hash['Comment'] and !comment.to_s.empty?
      data_set.comment = comment
    end
    if @mango_run_name
      data_set.run_name_order_id = @mango_run_name
    end

    # One Sample record per row; the hash is reused across rows, but every
    # header key is overwritten each iteration and a to_s snapshot is stored.
    sample_hash = {}
    rows.each do |row|
      headers.each_with_index do |header, i|
       sample_hash[header]=row[i]
      end
      sample = Sample.new
      sample.key_value = sample_hash.to_s
      sample.save # skip exact-match search
      data_set.samples << sample
    end

    if child
      data_set.child = true
    end

    data_set.md5 = data_set.md5hexdigest
    project.data_sets << data_set
    parent_data_set.data_sets << data_set if parent_data_set
    data_set.save
    if user
      user.data_sets << data_set
      user.save
    end
    data_set.id
  end
end

#save_input_dataset_as_tsvObject



599
600
601
602
603
604
605
606
607
608
609
610
611
612
# File 'lib/sushi_fabric/sushiApp.rb', line 599

# Writes @dataset_hash (array of row hashes) as a tab-separated
# input_dataset.tsv in the scratch result dir. The header row is the union of
# all row keys; empty cells are written as nil (blank fields).
# @return [String] the path of the written file
def save_input_dataset_as_tsv
  path = File.join(@scratch_result_dir, @input_dataset_file)
  columns = @dataset_hash.flat_map(&:keys).uniq
  CSV.open(path, 'w', :col_sep => "\t") do |tsv|
    tsv << columns
    @dataset_hash.each do |row|
      tsv << columns.map do |column|
        cell = row[column]
        cell.to_s.empty? ? nil : cell
      end
    end
  end
  path
end

#save_next_dataset_as_tsvObject



613
614
615
616
617
618
619
620
621
622
623
624
625
626
# File 'lib/sushi_fabric/sushiApp.rb', line 613

# Writes @result_dataset (array of row hashes) as a tab-separated dataset.tsv
# in the scratch result dir. The header row is the union of all row keys;
# empty cells are written as nil (blank fields).
# @return [String] the path of the written file
def save_next_dataset_as_tsv
  columns = @result_dataset.flat_map(&:keys).uniq
  path = File.join(@scratch_result_dir, @next_dataset_file)
  CSV.open(path, 'w', :col_sep => "\t") do |tsv|
    tsv << columns
    @result_dataset.each do |row|
      tsv << columns.map do |column|
        cell = row[column]
        cell.to_s.empty? ? nil : cell
      end
    end
  end
  path
end

#save_parameters_as_tsvObject



582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
# File 'lib/sushi_fabric/sushiApp.rb', line 582

# Writes @output_params as a two-column TSV (parameters.tsv) in the scratch
# result dir, prefixed with a ["sushi_app", <class name>] row. File-upload
# values are rewritten (in @params and @output_params) to their final gstore
# path under <result_dir>/uploaded/.
# @return [String] the path of the written file
def save_parameters_as_tsv
  file_path = File.join(@scratch_result_dir, @parameter_file)
  CSV.open(file_path, 'w', :col_sep=>"\t") do |out|
    out << ["sushi_app", self.class.name]
    @output_params.each do |key, value|
      # NOTE(review): two-argument Hash#[] is a sushi_fabric extension
      # reading a per-key property ('file_upload') — defined elsewhere.
      if @output_params[key, 'file_upload'] and !value.to_s.empty?
        uploaded_file_path = File.join(@result_dir, "uploaded", File.basename(value))
        out << [key, uploaded_file_path]
        @params[key] = uploaded_file_path
        @output_params[key] = uploaded_file_path
      else
        out << [key, value]
      end
    end
  end
  file_path
end

#save_parameters_in_sushi_dbObject



812
813
814
815
816
817
# File 'lib/sushi_fabric/sushiApp.rb', line 812

# Stores @output_params on the freshly created next DataSet record (looked up
# by @next_dataset_id); silently does nothing when the record is missing.
def save_parameters_in_sushi_db
  if @next_dataset_id and next_dataset = DataSet.find_by_id(@next_dataset_id)
    next_dataset.job_parameters = @output_params
    next_dataset.save
  end
end

#set_default_parametersObject



305
306
307
# File 'lib/sushi_fabric/sushiApp.rb', line 305

# Hook for application-specific parameter defaults; no-op here, overridden by
# concrete applications.
def set_default_parameters
  # this should be overwritten in a subclass
end

#set_dir_pathsObject



371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/sushi_fabric/sushiApp.rb', line 371

# Derives every result/scratch/gstore directory path from @name, @project and
# the submission timestamp, then delegates file-level paths to
# #set_file_paths. Mutates @name in place (whitespace -> underscores).
# @raise [RuntimeError] when @name or @project is unset
def set_dir_paths
  ## sushi figures out where to put the resulting dataset
  raise "should set #name and #project" unless @name and @project
  @name.gsub!(/\s/, '_')
  timestamp = Time.now.strftime("%Y-%m-%d--%H-%M-%S")
  base_parts = if @next_dataset_name
                 [@next_dataset_name, timestamp]
               else
                 [@name, @dataset_sushi_id.to_s, timestamp]
               end
  @result_dir_base = base_parts.join("_")
  @result_dir = File.join(@project, @result_dir_base)
  # Scratch-side layout.
  @scratch_result_dir = File.join(SCRATCH_DIR, @result_dir_base)
  @job_script_dir = File.join(@scratch_result_dir, 'scripts')
  @uploaded_files_dir = File.join(@scratch_result_dir, 'uploaded')
  # Gstore-side layout.
  @gstore_result_dir = File.join(@gstore_dir, @result_dir)
  @gstore_script_dir = File.join(@gstore_result_dir, 'scripts')
  @gstore_project_dir = File.join(@gstore_dir, @project)
  @gstore_uploaded_dir = File.join(@gstore_result_dir, 'uploaded')
  set_file_paths
end

#set_file_pathsObject

 this should be overwritten in a subclass



574
575
576
577
578
579
580
581
# File 'lib/sushi_fabric/sushiApp.rb', line 574

# Fixes the canonical artefact file names (parameters.tsv, input_dataset.tsv,
# dataset.tsv) and their final locations inside the gstore result directory.
# Called at the end of #set_dir_paths.
def set_file_paths
  @parameter_file = 'parameters.tsv'
  @input_dataset_file = 'input_dataset.tsv'
  @next_dataset_file = 'dataset.tsv'
  @input_dataset_tsv_path = File.join(@gstore_result_dir, @input_dataset_file)
  @parameters_tsv_path = File.join(@gstore_result_dir, @parameter_file)
  @next_dataset_tsv_path = File.join(@gstore_result_dir, @next_dataset_file)
end

#set_input_datasetObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/sushi_fabric/sushiApp.rb', line 247

# Loads the input dataset into @dataset_hash/@dataset, either from
# @dataset_tsv_file (also registering it in the SUSHI DB and, optionally,
# B-Fabric) or from an existing SUSHI dataset identified by @dataset_sushi_id.
# @return [Array<Hash>] @dataset_hash
def set_input_dataset
  if @dataset_tsv_file
    dataset_tsv = CSV.readlines(@dataset_tsv_file, :headers=>true, :col_sep=>"\t")
    @dataset_hash = []
    @dataset = []
    dataset_tsv.each do |row|
      @dataset_hash << row.to_hash
      @dataset << row.to_hash
    end

    # save in sushi db unless it is saved in sushi db
    data_set_arr = []
    headers = []
    rows = []
    # Dataset name defaults to the TSV file name without its extension.
    dataset_name = if @dataset_name
                     @dataset_name
                   else
                     File.basename(@dataset_tsv_file).gsub(/.tsv/, '')
                   end
    data_set_arr = {'DataSetName'=>dataset_name, 'ProjectNumber'=>@project.gsub(/p/,'')}
    csv = CSV.readlines(@dataset_tsv_file, :col_sep=>"\t")
    csv.each do |row|
      if headers.empty?
        headers = row
      else
        rows << row
      end
    end
    unless NO_ROR
      @current_user ||= nil
      if @dataset_sushi_id = save_data_set(data_set_arr.to_a.flatten, headers, rows, @current_user)
        # Register in B-Fabric unless explicitly switched off.
        unless @off_bfabric_registration
          if dataset = DataSet.find_by_id(@dataset_sushi_id.to_i)
            dataset.register_bfabric(bfabric_application_number: @input_dataset_bfabric_application_number)
          end
        end
      elsif data_set = headers[0] and data_set.instance_of?(DataSet)
        @dataset_sushi_id = data_set.id
      end
    end
  elsif @dataset_sushi_id
    # Load sample rows from the already-registered SUSHI dataset.
    @dataset_hash = []
    @dataset = []
    if dataset = DataSet.find_by_id(@dataset_sushi_id.to_i)
      dataset.samples.each do |sample|
        @dataset_hash << sample.to_hash
        @dataset << sample.to_hash
      end
    end
  end
  @dataset_hash
end

#set_output_filesObject



323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/sushi_fabric/sushiApp.rb', line 323

# Collect the headers tagged with 'File' from next_dataset into
# @output_files (deduplicated).  In SAMPLE process mode, @dataset is reset
# to an empty Hash first so that next_dataset is evaluated per sample.
def set_output_files
  @dataset = {} if @params['process_mode'] == 'SAMPLE'
  file_headers = next_dataset.keys.select { |column| column.tag?('File') }
  file_headers.each do |column|
    (@output_files ||= []) << column
  end
  @output_files = @output_files.uniq if @output_files
end

#set_user_parametersObject



349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/sushi_fabric/sushiApp.rb', line 349

# Overwrite @params with the user-supplied values from
# @parameterset_tsv_file.  String-typed (or nil) values are kept verbatim;
# other values are eval'ed into Ruby objects.  Parameters absent from the
# file fall back to their declared defaults.
#
# @return [Hash] the updated @params
def set_user_parameters
  # this should be done in an instance of application subclass
  return @params unless @parameterset_tsv_file
  seen_headers = []
  CSV.readlines(@parameterset_tsv_file, :col_sep=>"\t").each do |row|
    header, value = row
    seen_headers << header
    @params[header] = if @params.data_type(header) == String or value == nil
                        value
                      else
                        # SECURITY NOTE: eval on TSV content — only feed
                        # trusted parameter files into this method.
                        eval(value)
                      end
  end
  (@params.keys - seen_headers).each do |key|
    @params[key] = @params.default_value(key) unless @params[key]
  end
  @params
end

#submit(job_script, mock = false) ⇒ Object



546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
# File 'lib/sushi_fabric/sushiApp.rb', line 546

# Submit a job script and return the resulting job id, or nil when the
# submission failed (job ids <= 1 and any raised error are treated as
# failure and logged).  With mock=true nothing is submitted and a dummy
# id (1234) is returned.
def submit(job_script, mock=false)
  job_id =
    begin
      id = mock ? 1234 : submit_command(job_script).to_i
      if id.to_i > 1
        id
      else
        stamp = Time.now.strftime("[%Y.%m.%d %H:%M:%S]")
        @logger.error("#"*50)
        @logger.error("error happened in job submitting, but maybe fine. #{stamp}")
        @logger.error("#"*50)
        nil
      end
    rescue
      stamp = Time.now.strftime("[%Y.%m.%d %H:%M:%S]")
      @logger.error("@"*50)
      @logger.error("error happened in job submitting, but maybe fine. #{stamp}")
      @logger.error("@"*50)
      nil
    end
  job_id
end

#submit_command(job_script) ⇒ Object

 This should be overridden in a subclass.



518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
# File 'lib/sushi_fabric/sushiApp.rb', line 518

# Build the gsub resource options from @params and ask the workflow
# manager (over DRb) to start monitoring the given job script.
#
# @param job_script [String] path to the generated job script
# @return [Integer] the job id reported by the workflow manager (0 on error)
def submit_command(job_script)
  # param key => command-line flag, in the order the options are emitted
  flag_table = [['cores', '-c'], ['node', '-n'], ['partition', '-p'],
                ['ram', '-r'], ['scratch', '-s'], ['nice', '-i']]
  gsub_options = flag_table.map do |param, flag|
    value = @params[param]
    "#{flag} #{value}" unless value.to_s.empty?
  end.compact
  command = "wfm_monitoring --server #{WORKFLOW_MANAGER} --user #{@user} --project #{@project.gsub(/p/,'')} --logdir #{@gstore_script_dir} #{job_script} #{gsub_options.join(' ')}"
  puts "submit: #{command}"

  project_number = @project.gsub(/p/, '')
  @workflow_manager ||= DRbObject.new_with_uri(WORKFLOW_MANAGER)
  script_content = File.read(job_script)
  job_id = 0
  begin
    job_id = @workflow_manager.start_monitoring3(job_script, script_content, @user, project_number, gsub_options.join(' '), @gstore_script_dir, @next_dataset_id, RAILS_HOST)
  rescue => e
    time = Time.now.strftime("[%Y.%m.%d %H:%M:%S]")
    @logger.error("*"*50)
    @logger.error("submit_command error #{time}")
    @logger.error("error: #{e}")
    @logger.error("job_script: #{job_script}, @user: #{@user}, script_content: #{script_content.class} #{script_content.to_s.length} chrs, project_number: #{project_number}, gsub_options: #{gsub_options}, job_id: #{job_id}")
    @logger.error("*"*50)
  end
  job_id
end

#test_runObject



963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
# File 'lib/sushi_fabric/sushiApp.rb', line 963

# Dry-run sanity check for an application subclass before real submission.
# Loads the input dataset and then verifies, in order: project, user,
# application name, analysis_category, dataset presence, required columns,
# required parameters, next_dataset, output files, generated commands, and
# workflow-manager connectivity.  Each check prints PASSED / FAILURE /
# WARNING to stdout; hard failures are counted and, if any occurred, the
# collected messages are raised at the end.
def test_run
  # prepare state the checks below inspect
  set_input_dataset
  set_dir_paths
  preprocess
  set_output_files
  set_user_parameters

  failures = 0
  err_msgs = []
  # hard failure: a project number (e.g. 'p1001') is mandatory
  print 'check project name: '
  unless @project
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: project number is required but not found. you should set it in usecase."
    err_msg << "\tex.)"
    err_msg << "\tapp = #{self.class}.new"
    err_msg << "\tapp.project = 'p1001'"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:\n\t@project=#{@project}"
  end

  # soft warning only: a missing user falls back to a default elsewhere
  print 'check user name: '
  unless @user
    err_msg = []
    err_msg << "\e[31mWARNING\e[0m: user number is ought to be added but not found. you should set it in usecase. Default will be 'sushi lover'"
    err_msg << "\tex.)"
    err_msg << "\tapp = #{self.class}.new"
    err_msg << "\tapp.user = 'masa'"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
  else
    puts "\e[32mPASSED\e[0m:\n\t@user=#{@user}"
  end

  # hard failure: @name must be set in the subclass initialize
  print 'check application name: '
  if @name.to_s.empty?
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: application name is required but not found. you should set it in application class."
    err_msg << "\tex.)"
    err_msg << "\tclass #{self.class}"
    err_msg << "\t def initialize"
    err_msg << "\t  @name = '#{self.class}'"
    err_msg << "\t end"
    err_msg << "\tend"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:\n\t@name=#{@name}"
  end

  # hard failure: @analysis_category must be set in the subclass initialize
  print 'check analysis_category: '
  if @analysis_category.to_s.empty?
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: analysis_category is required but not found. you should set it in application class."
    err_msg << "\tex.)"
    err_msg << "\tclass #{self.class}"
    err_msg << "\t def initialize"
    err_msg << "\t  @analysis_category = 'Mapping'"
    err_msg << "\t end"
    err_msg << "\tend"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:\n\t@analysis_category=#{@analysis_category}"
  end

  # hard failure: set_input_dataset must have produced at least one row
  print 'check dataset: '
  if !@dataset_hash or @dataset_hash.empty?
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: dataset is not found. you should set it by using #{self.class}#dataset_sushi_id or #{self.class}#dataset_tsv_file properties"
    err_msg << "\tex.)"
    err_msg << "\tusecase = #{self.class}.new"
    err_msg << "\tusecase.dataset_tsv_file = \"dataset.tsv\""
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:\n\t@dataset_hash.length = #{@dataset_hash.length}"
  end

  # hard failure: every column in @required_columns must appear in the dataset
  print 'check required columns: '
  unless check_required_columns
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: required_column(s) is not found in dataset. you should set it in application class."
    err_msg << "\tex.)"
    err_msg << "\tdef initialize"
    err_msg << "\t  @required_columns = ['Name', 'Read1']"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:"
  end
  puts "\trequired columns: #{@required_columns}"
  puts "\tdataset  columns: #{@dataset_hash.map{|row| row.keys}.flatten.uniq}" if @dataset_hash

  # hard failure: every key in @required_params must be present in @params
  print 'check required parameters: '
  unless check_application_parameters
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: required_param(s) is not set yet. you should set it in usecase"
    err_msg << "\tmissing params: #{@required_params-@params.keys}" if @required_params
    err_msg << "\tex.)"
    err_msg << "\tusecase = #{self.class}.new"
    if @required_params
      err_msg << "\tusecase.params['#{(@required_params-@params.keys)[0]}'] = parameter"
    else
      err_msg << "\tusecase.params['parameter name'] = default_parameter"
    end
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:"
  end
  puts "\tparameters: #{@params.keys}"
  puts "\trequired  : #{@required_params}"

  # hard failure: the subclass must override next_dataset to return a Hash;
  # in SAMPLE mode @dataset is reset so next_dataset runs per sample
  print 'check next dataset: '
  if @params['process_mode'] == 'SAMPLE'
    @dataset={}
  end
  unless self.next_dataset
    err_msg = []
    err_msg << "\e[31mFAILURE\e[0m: next dataset is not set yet. you should overwrite SushiApp#next_dataset method in #{self.class}"
    err_msg << "\tnote: the return value should be Hash (key: column title, value: value in a tsv table)"
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
    failures += 1
  else
    puts "\e[32mPASSED\e[0m:"
  end

  # soft warning only: jobs may legitimately produce no tracked output files
  print 'check output files: '
  if !@output_files or @output_files.empty?
    err_msg = []
    err_msg << "\e[31mWARNING\e[0m: no output files. you will not get any output files after the job running. you can set @output_files (array) in #{self.class}"
    err_msg << "\tnote: usually it should be define in initialize method"
    err_msg << "\t      the elements of @output_files should be chosen from #{self.class}#next_dataset.keys"
    err_msg << "\t      #{self.class}#next_dataset.keys: #{self.next_dataset.keys}" if self.next_dataset
    puts err_msg.join("\n")
    err_msgs.concat(err_msg)
  else
    puts "\e[32mPASSED\e[0m:"
  end

  # hard failure: commands must return the job-script body; in SAMPLE mode
  # it is evaluated once per dataset row, with tags like "[File]" stripped
  # from the column keys of @dataset
  print 'check commands: '
  if @params['process_mode'] == 'SAMPLE'
    @dataset_hash.each do |row|
      @dataset = Hash[*row.map{|key,value| [key.gsub(/\[.+\]/,'').strip, value]}.flatten]
      unless com = commands
        err_msg = []
        err_msg << "\e[31mFAILURE\e[0m: any commands is not defined yet. you should overwrite SushiApp#commands method in #{self.class}"
        err_msg << "\tnote: the return value should be String (this will be in the main body of submitted job script)"
        puts err_msg.join("\n")
        err_msgs.concat(err_msg)
        failures += 1
      else
        puts "\e[32mPASSED\e[0m:"
        puts "generated command will be:"
        puts "\t"+com.split(/\n/).join("\n\t")+"\n"
      end
    end
  elsif @params['process_mode'] == 'DATASET'
    unless com = commands
      err_msg = []
      err_msg << "\e[31mFAILURE\e[0m: any commands is not defined yet. you should overwrite SushiApp#commands method in #{self.class}"
      err_msg << "\tnote: the return value should be String (this will be in the main body of submitted job script)"
      puts err_msg.join("\n")
      err_msgs.concat(err_msg)
      failures += 1
    else
      puts "\e[32mPASSED\e[0m:"
      puts "generated command will be:"
      puts "\t"+com.split(/\n/).join("\n\t")+"\n"
    end
  end

  # hard failure: ping the workflow manager over DRb; any connection error
  # is swallowed here and surfaces as a failed /hello/ match below
  print 'check workflow manager: '
  begin
    @workflow_manager||=DRbObject.new_with_uri(WORKFLOW_MANAGER)
    hello = @workflow_manager.hello
  rescue
  end
  unless hello =~ /hello/
    err_msg = "\e[31mFAILURE\e[0m: workflow_manager does not reply. check if workflow_manager is working"
    puts err_msg
    err_msgs.concat([err_msg])
    failures += 1
  else
    puts "\e[32mPASSED\e[0m: #{WORKFLOW_MANAGER}"
  end

  # any hard failure aborts with the full report in the exception message
  if failures > 0
    puts
    err_msg = "\e[31mFailures (#{failures})\e[0m: All failures should be solved"
    puts err_msg
    err_msgs.unshift(err_msg)
    raise "\n"+err_msgs.join("\n")+"\n\n"
  else
    puts "All checks \e[32mPASSED\e[0m"
  end
end