Class: Bio::MAF::Parser

Inherits:
Object
Includes:
MAFParsing
Defined in:
lib/bio/maf/parser.rb

Overview

MAF parser, used for sequential and random-access parsing.

Options:

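  • :parse_extended: whether to parse 'i' and 'q' lines (default: false)
  • :parse_empty: whether to parse 'e' lines (default: false)
  • :remove_gaps: remove gaps left after filtering sequences
  • :join_blocks: join adjacent blocks that are joinable after filtering
  • :as_bio_alignment: convert each block with to_bio_alignment before yielding it
  • :chunk_size: read the MAF file in chunks of this many bytes (default: SEQ_CHUNK_SIZE)
  • :random_chunk_size: as above, but for random access via #fetch_blocks (default: RANDOM_CHUNK_SIZE)
  • :merge_max: merge up to this many bytes of contiguous blocks for random access (default: MERGE_MAX)
  • :chunk_reader: class used to read chunks (default: ChunkReader). Only useful with ThreadedChunkReader.
  • :threads: number of threads to use for parallel parsing. Only useful under JRuby, where it defaults to the number of available processors.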
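
As a rough illustration of how these options fall back to their defaults, the sketch below mirrors the `||` pattern used in the constructor. The module name `ParserOptsSketch` and its `resolve` method are hypothetical and not part of Bio::MAF; the constant values are the ones listed in the Constant Summary.

```ruby
# Hypothetical helper, not part of Bio::MAF: resolves parser options the same
# way the constructor does, using `||` to fall back to the documented defaults.
module ParserOptsSketch
  SEQ_CHUNK_SIZE    = 131072
  RANDOM_CHUNK_SIZE = 4096
  MERGE_MAX         = SEQ_CHUNK_SIZE

  def self.resolve(opts = {})
    {
      :chunk_size        => opts[:chunk_size] || SEQ_CHUNK_SIZE,
      :random_chunk_size => opts[:random_chunk_size] || RANDOM_CHUNK_SIZE,
      :merge_max         => opts[:merge_max] || MERGE_MAX,
      :parse_extended    => opts[:parse_extended] || false,
      :parse_empty       => opts[:parse_empty] || false
    }
  end
end

resolved = ParserOptsSketch.resolve(:chunk_size => 65536, :parse_extended => true)
puts resolved[:chunk_size]  # 65536 (explicitly set)
puts resolved[:merge_max]   # 131072 (falls back to MERGE_MAX)
```

Because `||` is used, an option passed as `nil` or `false` is treated the same as an omitted one.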

Constant Summary

SEQ_CHUNK_SIZE = 131072
RANDOM_CHUNK_SIZE = 4096
MERGE_MAX = SEQ_CHUNK_SIZE
WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps]

Constants included from MAFParsing

MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM

Instance Attribute Summary

Instance Method Summary

Methods included from MAFParsing

#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?

Constructor Details

#initialize(file_spec, opts = {}) ⇒ Parser

Create a new parser instance.

Parameters:

  • file_spec (String)

    path of file to parse.

  • opts (Hash) (defaults to: {})

    parser options.



# File 'lib/bio/maf/parser.rb', line 506

def initialize(file_spec, opts={})
  @opts = opts
  if RUBY_PLATFORM == 'java'
    opts[:threads] ||= java.lang.Runtime.runtime.availableProcessors
  end
  chunk_size = opts[:chunk_size] || SEQ_CHUNK_SIZE
  @random_access_chunk_size = opts[:random_chunk_size] || RANDOM_CHUNK_SIZE
  @merge_max = opts[:merge_max] || MERGE_MAX
  @parse_extended = opts[:parse_extended] || false
  @parse_empty = opts[:parse_empty] || false
  @chunk_start = 0
  @file_spec = file_spec
  @f = File.open(file_spec)
  reader = opts[:chunk_reader] || ChunkReader
  @cr = reader.new(@f, chunk_size)
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end

Instance Attribute Details

#at_end ⇒ Boolean (readonly)

Returns whether EOF has been reached.

Returns:

  • (Boolean)

    whether EOF has been reached.



# File 'lib/bio/maf/parser.rb', line 485

def at_end
  @at_end
end

#chunk_start ⇒ Integer (readonly)

Returns starting offset of the current chunk.

Returns:

  • (Integer)

    starting offset of the current chunk.



# File 'lib/bio/maf/parser.rb', line 489

def chunk_start
  @chunk_start
end

#cr ⇒ ChunkReader (readonly)

Returns the ChunkReader used to read chunks of the MAF file.

Returns:

  • (ChunkReader)

# File 'lib/bio/maf/parser.rb', line 483

def cr
  @cr
end

#f ⇒ File (readonly)

Returns file handle for MAF file.

Returns:

  • (File)

    file handle for MAF file.



# File 'lib/bio/maf/parser.rb', line 479

def f
  @f
end

#file_spec ⇒ String (readonly)

Returns path of MAF file being parsed.

Returns:

  • (String)

    path of MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 477

def file_spec
  @file_spec
end

#header ⇒ Header (readonly)

Returns header of the MAF file being parsed.

Returns:

  • (Header)

    header of the MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 475

def header
  @header
end

#last_block_pos ⇒ Integer (readonly)

Returns offset of the last block start in this chunk.

Returns:

  • (Integer)

    offset of the last block start in this chunk.



# File 'lib/bio/maf/parser.rb', line 491

def last_block_pos
  @last_block_pos
end

#opts ⇒ Hash (readonly)

Returns parser options.

Returns:

  • (Hash)

    parser options.



# File 'lib/bio/maf/parser.rb', line 487

def opts
  @opts
end

#parse_empty ⇒ Object



# File 'lib/bio/maf/parser.rb', line 495

def parse_empty
  @parse_empty
end

#parse_extended ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/bio/maf/parser.rb', line 494

def parse_extended
  @parse_extended
end

#s ⇒ StringScanner (readonly)

Returns scanner for parsing.

Returns:

  • (StringScanner)

    scanner for parsing.



# File 'lib/bio/maf/parser.rb', line 481

def s
  @s
end

Instance Method Details

#_parse_header ⇒ Object

Parse the header of the MAF file.



# File 'lib/bio/maf/parser.rb', line 702

def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
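  # Parenthesize the argument so that || applies to the result of skip_until
  # rather than to BLOCK_START itself.
  s.skip_until(BLOCK_START) || parse_error("Cannot find block start!")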
end
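
The header scan above can be tried on an in-memory string. The following is a minimal sketch, assuming the `##maf` line carries whitespace-separated key=value pairs; `sketch_parse_header` and its key=value handling are hypothetical stand-ins (parse_maf_vars and Header are not reproduced here), so it returns a plain two-element array instead of a Header.

```ruby
require 'strscan'

# Illustrative sketch only: parses a MAF header from a string and returns
# [vars, align_params]. The key=value loop is an assumption standing in for
# parse_maf_vars, whose implementation is not shown on this page.
def sketch_parse_header(text)
  s = StringScanner.new(text)
  raise ArgumentError, "not a MAF file" unless s.scan(/##maf[ \t]*/)

  # Collect key=value pairs from the rest of the ##maf line.
  vars = {}
  while s.scan(/(\w+)=(\S*)[ \t]*/)
    vars[s[1].to_sym] = s[2]
  end
  s.scan(/\n/)

  # Join the text of any following '#' comment lines with single spaces.
  params = []
  while s.scan(/#[ \t]*(.+?)\n/)
    params << s[1]
  end

  [vars, params.empty? ? nil : params.join(' ')]
end

header_text = "##maf version=1 scoring=tba.v8\n" \
              "# tba.v8 (((human chimp) baboon) (mouse rat))\n" \
              "\n" \
              "a score=23262.0\n"
vars, align_params = sketch_parse_header(header_text)
puts vars[:version]   # 1
puts align_params     # tba.v8 (((human chimp) baboon) (mouse rat))
```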

#_wrap(options, fun, &blk) ⇒ Object

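Apply the wrapper modes listed in options around fun. options is ordered from outermost to innermost, i.e. [:outer, ..., :inner], so the innermost wrapper sees each block first. The list is consumed (via shift) as wrappers are applied.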



# File 'lib/bio/maf/parser.rb', line 763

def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options,
             fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options,
              fun,
              :to_bio_alignment,
              &blk)
  when :remove_gaps
    conv_map(options,
             fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
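
The outer-to-inner ordering is easier to see with a stripped-down version of the same recursion. This is a sketch on integers rather than blocks; `wrap_sketch` and the modes `:double` and `:only_even` are hypothetical and exist only for illustration. Unlike the method above, it destructures the list instead of calling shift, so the caller's array is left intact.

```ruby
# Simplified stand-in for the recursive wrapper chain; not the library's code.
# Each mode wraps the remaining modes, so the last mode in the list is applied
# to each value first and the first mode is applied last.
def wrap_sketch(modes, source, &blk)
  first, *rest = modes
  case first
  when nil
    source.call(&blk)
  when :double
    wrap_sketch(rest, source) { |x| blk.call(x * 2) }
  when :only_even
    wrap_sketch(rest, source) { |x| blk.call(x) if x.even? }
  else
    raise ArgumentError, "unhandled wrapper mode: #{first}"
  end
end

source = lambda { |&b| [1, 2, 3, 4].each(&b) }

filtered_then_doubled = []
wrap_sketch([:double, :only_even], source) { |x| filtered_then_doubled << x }
p filtered_then_doubled  # [4, 8]

doubled_then_filtered = []
wrap_sketch([:only_even, :double], source) { |x| doubled_then_filtered << x }
p doubled_then_filtered  # [2, 4, 6, 8]
```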

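#block_joiner(options, fun) {|prev| ... } ⇒ Object

Join consecutive blocks when at least one of the pair has been filtered and the two are joinable; all other blocks pass through unchanged.

Yields:

  • (prev)

    each resulting block, in order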


# File 'lib/bio/maf/parser.rb', line 796

def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
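
The hold-one-back pattern used here (keep the previous item, extend it while the next one is joinable, otherwise emit it) can be sketched without the parser. `Span` and `join_adjacent` below are hypothetical; in particular, treating two spans as joinable when one ends where the next begins is an assumption made for the example, not the definition of Block#joinable_with?.

```ruby
# Hypothetical stand-in for a block: a half-open [start, stop) span plus a
# flag saying whether it has been filtered.
Span = Struct.new(:start, :stop, :filtered) do
  def joinable_with?(other)
    stop == other.start
  end

  def join(other)
    self.class.new(start, other.stop, true)
  end
end

# Yields spans in order, merging runs of joinable spans where at least one
# member of each adjacent pair is filtered.
def join_adjacent(spans)
  return enum_for(:join_adjacent, spans) unless block_given?
  prev = nil
  spans.each do |cur|
    if prev && (prev.filtered || cur.filtered) && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev  # flush the span still being held back
end

spans = [Span.new(0, 10, true), Span.new(10, 20, true), Span.new(25, 30, false)]
join_adjacent(spans).each { |sp| puts "#{sp.start}-#{sp.stop}" }
# prints 0-20, then 25-30
```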

#close ⇒ Object



# File 'lib/bio/maf/parser.rb', line 527

def close
  f.close
end

#context(chunk_size) ⇒ ParseContext

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a Bio::MAF::ParseContext for random access, using the given chunk size.

Returns:

  • (ParseContext)



# File 'lib/bio/maf/parser.rb', line 536

def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  fd = File.open(file_spec)
  ParseContext.new(fd, chunk_size, self)
end

#conv_map(options, search, fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 810

def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end

#conv_send(options, search, sym) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 817

def conv_send(options, search, sym)
  _wrap(options, search) do |block|
    v = block.send(sym)
    yield v if v
  end
end

#each_block {|block| ... } ⇒ Enumerator<Block>

Also known as: parse_blocks

Parse all alignment blocks until EOF.

Delegates to #parse_blocks_parallel if :threads is set under JRuby.

Yields:

  • (block)

    Passes each Block in turn to a block

Returns:

  • (Enumerator<Block>)

    enumerator of Blocks if no block given.



# File 'lib/bio/maf/parser.rb', line 725

def each_block(&blk)
  if block_given?
    if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
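
The "yield to a block, or return an Enumerator when none is given" behaviour comes from `enum_for`. A minimal sketch of that idiom on its own, using a hypothetical `ItemSource` class in place of the parser:

```ruby
# Hypothetical class illustrating the block-or-enumerator idiom; it iterates
# over an in-memory array rather than parsing anything.
class ItemSource
  def initialize(items)
    @items = items
  end

  def each_item(&blk)
    if block_given?
      @items.each(&blk)
    else
      # No block: hand back an Enumerator that will call each_item lazily.
      enum_for(:each_item)
    end
  end
end

src = ItemSource.new([1, 2, 3])
src.each_item { |x| puts x }             # block form
p src.each_item.map { |x| x * 2 }        # [2, 4, 6]
p src.each_item.first(2)                 # [1, 2]
```

With the enumerator form, methods such as `first` stop iterating once they have enough items, which matters when the underlying source is a large file.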

#each_block_seq ⇒ Object



# File 'lib/bio/maf/parser.rb', line 739

def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end

#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>

Fetch and parse blocks given by fetch_list.

fetch_list should be an array of [offset, length] tuples.

Parameters:

  • fetch_list (Array)

    the fetch list

Yields:

  • (block)

    each block matched, in turn

Returns:

  • (Enumerable<Block>)

    each matching Block, if no block given



# File 'lib/bio/maf/parser.rb', line 577

def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if RUBY_PLATFORM == 'java' && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end

#fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>

Fetch and parse the blocks given by the merged fetch list.

Parameters:

  • fetch_list (Array)

    the merged fetch list, as produced by #merge_fetch_list

Returns:

  • (Array<Block>)

    the requested alignment blocks



# File 'lib/bio/maf/parser.rb', line 595

def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
    end
  end
  elapsed = Time.now - start
  # TODO: debug log
  # rate = (total_size / 1048576.0) / elapsed
  # $stderr.printf("Fetched blocks in %.3fs, %.1f MB/s.\n",
  #                elapsed, rate)
end

#fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>

Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the :threads parser option.

Parameters:

  • fetch_list (Array)

    the merged fetch list, as produced by #merge_fetch_list

Returns:

  • (Array<Block>)

    the requested alignment blocks



# File 'lib/bio/maf/parser.rb', line 616

def fetch_blocks_merged_parallel(fetch_list)
  # Total requested bytes; the 0 seed keeps this numeric for an empty list.
  total_size = fetch_list.collect { |e| e[1] }.reduce(0, :+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }
  elapsed = Time.now - start
  $stderr.printf("Fetched blocks from %d threads in %.1fs.\n",
                 n_threads,
                 elapsed)
  mb = total_size / 1048576.0
  $stderr.printf("%.3f MB processed (%.1f MB/s).\n",
                 mb,
                 mb / elapsed)
end
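
The job-queue and completion-queue arrangement can be approximated in plain Ruby. The sketch below swaps in Ruby's core `Queue` for the java.util.concurrent classes so that it runs outside JRuby, and it collects results after all workers finish instead of yielding them as they complete. `map_jobs_parallel` is a hypothetical name, and result order follows completion order, not submission order.

```ruby
# Portable sketch of a fixed-size worker pool draining a shared job queue.
# Not the library's implementation: it uses Ruby's Queue rather than
# ConcurrentLinkedQueue and CompletionTracker.
def map_jobs_parallel(jobs, n_threads, &work)
  pending = Queue.new
  jobs.each { |job| pending << job }
  completed = Queue.new

  workers = Array.new(n_threads) do
    Thread.new do
      until pending.empty?
        begin
          job = pending.pop(true)  # non-blocking; raises ThreadError if drained
        rescue ThreadError
          break
        end
        completed << work.call(job)
      end
    end
  end

  # Thread#join re-raises any exception that terminated a worker.
  workers.each(&:join)
  Array.new(completed.size) { completed.pop }
end

squares = map_jobs_parallel([1, 2, 3, 4], 2) { |x| x * x }
p squares.sort  # [1, 4, 9, 16]
```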

#filter_seq_count(fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 790

def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end

#make_worker(jobs, ct) ⇒ Object

Create a worker thread for parallel parsing.



# File 'lib/bio/maf/parser.rb', line 654

def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      $stderr.puts "Worker failing: #{e.class}: #{e}"
      $stderr.puts e.backtrace.join("\n")
      raise e
    end
  end
end

#merge_fetch_list(orig_fl) ⇒ Object

Merge contiguous blocks in the given fetch list, up to :merge_max bytes.

Returns [offset, size, [offset1, offset2, ...]] tuples.



# File 'lib/bio/maf/parser.rb', line 681

def merge_fetch_list(orig_fl)
  fl = orig_fl.dup
  r = []
  until fl.empty? do
    cur = fl.shift
    if r.last \
      && (r.last[0] + r.last[1]) == cur[0] \
      && (r.last[1] + cur[1]) <= @merge_max
      # contiguous with the previous one
      # add to length and increment count
      r.last[1] += cur[1]
      r.last[2] << cur[0]
    else
      cur << [cur[0]]
      r << cur
    end
  end
  return r
end
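
To make the merge rule concrete, here is a standalone sketch of the same logic. `merge_fetch_list_sketch` is a hypothetical name; it takes merge_max as an argument instead of reading parser state, and it builds new tuples rather than appending to the caller's entries.

```ruby
# Standalone sketch of the merge step. An entry is folded into the previous
# merged entry when it starts exactly where that entry ends and the combined
# size stays within merge_max.
def merge_fetch_list_sketch(fetch_list, merge_max)
  fetch_list.each_with_object([]) do |(offset, length), merged|
    last = merged.last
    if last && last[0] + last[1] == offset && last[1] + length <= merge_max
      last[1] += length   # extend the merged range
      last[2] << offset   # remember where this block starts
    else
      merged << [offset, length, [offset]]
    end
  end
end

p merge_fetch_list_sketch([[0, 100], [100, 50], [400, 20]], 1000)
# [[0, 150, [0, 100]], [400, 20, [400]]]

p merge_fetch_list_sketch([[0, 100], [100, 50], [400, 20]], 120)
# [[0, 100, [0]], [100, 50, [100]], [400, 20, [400]]]
```

The second call shows the size cap: the first two entries are contiguous, but merging them would exceed 120 bytes, so they stay separate.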

#parse_block ⇒ Object



# File 'lib/bio/maf/parser.rb', line 746

def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end

#parse_blocks_parallel ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse alignment blocks with a worker thread.



# File 'lib/bio/maf/parser.rb', line 828

def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
    rescue
      $stderr.puts "worker exiting: #{$!.class}: #{$!}"
      $stderr.puts $!.backtrace.join("\n")
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      n_final_poll += 1 unless worker.alive?
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly!"
  end
end
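
The same producer/consumer shape can be sketched with Ruby's core `SizedQueue`, which runs outside JRuby. This is an illustration under different assumptions from the method above, not a port of it: `each_via_worker` is a hypothetical name, the producer pushes the `:eof` sentinel from an `ensure` clause, and worker failures surface through `Thread#join` instead of timed polling.

```ruby
# Portable sketch: one producer thread fills a bounded queue while the caller
# consumes from it. The bounded queue applies back-pressure to the producer.
def each_via_worker(items, capacity = 128)
  return enum_for(:each_via_worker, items, capacity) unless block_given?

  queue = SizedQueue.new(capacity)
  worker = Thread.new do
    begin
      items.each { |item| queue.push(item) }
    ensure
      # Always signal the end of the stream, even if the producer failed.
      queue.push(:eof)
    end
  end

  loop do
    item = queue.pop
    break if item == :eof
    yield item
  end
  # Re-raises any exception that terminated the producer.
  worker.join
end

each_via_worker(%w[a b c], 2) { |x| puts x }
```

One limitation of this sketch: if the consumer stops early while the queue is full, the producer thread stays blocked on `push`.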

#sequence_filter ⇒ Hash

Sequence filter to apply.

Returns:

  • (Hash)


# File 'lib/bio/maf/parser.rb', line 559

def sequence_filter
  @sequence_filter ||= {}
end

#sequence_filter=(filter) ⇒ Object

Set the sequence filter.

Parameters:

  • filter (Hash)

    the new filter



# File 'lib/bio/maf/parser.rb', line 566

def sequence_filter=(filter)
  @sequence_filter = filter
end

#with_context(chunk_size) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute the given block, passing it a Bio::MAF::ParseContext created with the given chunk_size. The context's file handle is closed when the block exits, whether normally or by an exception.

See Also:



# File 'lib/bio/maf/parser.rb', line 547

def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end

#wrap_block_seq(fun, &blk) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 756

def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  _wrap(opts, fun, &blk)
end