Class: Bio::MAF::Parser

Inherits:
Object
Includes:
MAFParsing
Defined in:
lib/bio/maf/parser.rb

Overview

MAF parser, used for sequential and random-access parsing.

Options:

  • :parse_extended: whether to parse 'i' and 'q' lines
  • :parse_empty: whether to parse 'e' lines
  • :remove_gaps: remove gaps left after filtering sequences
  • :chunk_size: read MAF file in chunks of this many bytes
  • :random_chunk_size: as above, but for random access (#fetch_blocks)
  • :merge_max: merge up to this many bytes of blocks for random access
  • :threads: number of threads to use for parallel parsing. Only useful under JRuby.
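The fallback behaviour of the size and flag options can be sketched as follows. This is only an illustration of the `opts[:key] || default` pattern used by the constructor; `resolve_parser_opts` is a hypothetical helper, not part of Bio::MAF, and the default values are copied from the constants documented on this page.

```ruby
# Hypothetical helper (not part of Bio::MAF): applies the constructor's
# `opts[:key] || default` fallback to the documented options.
def resolve_parser_opts(opts = {})
  seq_chunk_size = 131072 # value of SEQ_CHUNK_SIZE
  {
    chunk_size:        opts[:chunk_size]        || seq_chunk_size,
    random_chunk_size: opts[:random_chunk_size] || 4096,           # RANDOM_CHUNK_SIZE
    merge_max:         opts[:merge_max]         || seq_chunk_size, # MERGE_MAX
    parse_extended:    opts[:parse_extended]    || false,
    parse_empty:       opts[:parse_empty]       || false
  }
end

resolved = resolve_parser_opts({ parse_extended: true, chunk_size: 65536 })
```

Options that are not supplied keep their defaults, so `resolved[:merge_max]` here is still the sequential chunk size.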

Constant Summary

SEQ_CHUNK_SIZE = 131072
RANDOM_CHUNK_SIZE = 4096
MERGE_MAX = SEQ_CHUNK_SIZE
WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps]

Constants included from MAFParsing

MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM

Methods included from MAFParsing

#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?

Constructor Details

#initialize(file_spec, opts = {}) ⇒ Parser

Create a new parser instance.

Parameters:

  • file_spec (String)

    path of file to parse.

  • opts (Hash) (defaults to: {})

    parser options.



# File 'lib/bio/maf/parser.rb', line 555

def initialize(file_spec, opts={})
  @opts = opts
  if RUBY_PLATFORM == 'java'
    opts[:threads] ||= java.lang.Runtime.runtime.availableProcessors
  end
  chunk_size = opts[:chunk_size] || SEQ_CHUNK_SIZE
  @random_access_chunk_size = opts[:random_chunk_size] || RANDOM_CHUNK_SIZE
  @merge_max = opts[:merge_max] || MERGE_MAX
  @parse_extended = opts[:parse_extended] || false
  @parse_empty = opts[:parse_empty] || false
  @chunk_start = 0
  if file_spec.respond_to? :flush
    # guess what, Pathnames respond to :read...
    @f = file_spec
    @file_spec = @f.path if @f.respond_to?(:path)
    # TODO: gzip?
  else
    @file_spec = file_spec
    if file_spec.to_s.end_with?(".maf.gz")
      # pass an argument array so the path is never interpreted by a shell
      @f = IO.popen(["gzip", "-dc", file_spec.to_s])
    else
      @f = File.open(file_spec)
    end
  end
  if @file_spec.to_s =~ /\.bgzf?$/
    @base_reader = BGZFChunkReader
    @compression = :bgzf
  else
    @base_reader = ChunkReader
  end
  @cr = base_reader.new(@f, chunk_size)
  if RUBY_PLATFORM == 'java'
    @cr = ThreadedChunkReaderWrapper.new(@cr)
  end
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end
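The file-name checks in the constructor can be summarised with a small sketch. `classify_maf_source` is a hypothetical helper, not part of Bio::MAF; it only reports which input path the constructor would take for a given name, assuming `file_spec` is a path rather than an already-open IO object.

```ruby
# Hypothetical helper (not part of Bio::MAF): reports which branch of the
# constructor a given file name would select.
def classify_maf_source(file_spec)
  name = file_spec.to_s
  if name.end_with?(".maf.gz")
    :gzip_pipe   # read through a `gzip -dc` pipe
  elsif name =~ /\.bgzf?$/
    :bgzf        # BGZFChunkReader, compression is :bgzf
  else
    :plain       # ChunkReader on a regular file handle
  end
end

classify_maf_source("chr22.maf.bgz")
```

Note that the BGZF pattern accepts both the `.bgz` and `.bgzf` suffixes.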

Instance Attribute Details

#at_end ⇒ Boolean (readonly)

Returns whether EOF has been reached.

Returns:

  • (Boolean)

    whether EOF has been reached.



# File 'lib/bio/maf/parser.rb', line 532

def at_end
  @at_end
end

#base_reader ⇒ Class (readonly)

Returns ChunkReader class to use for random access.

Returns:

  • (Class)

    ChunkReader class to use for random access

# File 'lib/bio/maf/parser.rb', line 530

def base_reader
  @base_reader
end

#chunk_start ⇒ Integer (readonly)

Returns starting offset of the current chunk.

Returns:

  • (Integer)

    starting offset of the current chunk.



# File 'lib/bio/maf/parser.rb', line 536

def chunk_start
  @chunk_start
end

#compression ⇒ Symbol (readonly)

Returns compression method used for this file, or nil.

Returns:

  • (Symbol)

    compression method used for this file, or nil



# File 'lib/bio/maf/parser.rb', line 540

def compression
  @compression
end

#cr ⇒ ChunkReader (readonly)

Returns ChunkReader.

Returns:

  • (ChunkReader)

    ChunkReader.



# File 'lib/bio/maf/parser.rb', line 527

def cr
  @cr
end

#f ⇒ File (readonly)

Returns file handle for MAF file.

Returns:

  • (File)

    file handle for MAF file.



# File 'lib/bio/maf/parser.rb', line 523

def f
  @f
end

#file_spec ⇒ String (readonly)

Returns path of MAF file being parsed.

Returns:

  • (String)

    path of MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 521

def file_spec
  @file_spec
end

#header ⇒ Header (readonly)

Returns header of the MAF file being parsed.

Returns:

  • (Header)

    header of the MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 519

def header
  @header
end

#last_block_pos ⇒ Integer (readonly)

Returns offset of the last block start in this chunk.

Returns:

  • (Integer)

    offset of the last block start in this chunk.



# File 'lib/bio/maf/parser.rb', line 538

def last_block_pos
  @last_block_pos
end

#opts ⇒ Hash (readonly)

Returns parser options.

Returns:

  • (Hash)

    parser options.



# File 'lib/bio/maf/parser.rb', line 534

def opts
  @opts
end

#parse_empty ⇒ Object



# File 'lib/bio/maf/parser.rb', line 544

def parse_empty
  @parse_empty
end

#parse_extended ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/bio/maf/parser.rb', line 543

def parse_extended
  @parse_extended
end

#s ⇒ StringScanner (readonly)

Returns scanner for parsing.

Returns:

  • (StringScanner)

    scanner for parsing.



# File 'lib/bio/maf/parser.rb', line 525

def s
  @s
end

Instance Method Details

#_merge_bgzf_fetch_list(orig_fl) ⇒ Object

Build a merged fetch list in a BGZF-aware way. This will group together all MAF blocks from a single BGZF block. These MAF blocks may not be consecutive.



# File 'lib/bio/maf/parser.rb', line 784

def _merge_bgzf_fetch_list(orig_fl)
  block_e = orig_fl.chunk { |entry|
    Bio::BGZF::vo_block_offset(entry[0])
  }
  block_e.collect do |bgzf_block, fl|
    # text size to read from disk, from the start of the first
    # block to the end of the last block
    text_size = fl.last[0] + fl.last[1] - fl.first[0]
    offsets = fl.collect { |e| e[0] }
    [fl.first[0], text_size, offsets]
  end
end
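The grouping step can be tried in isolation with the sketch below. It assumes the usual BGZF virtual-offset layout, in which the compressed block's file offset sits above the low 16 bits; `vo_block_offset` here is a local stand-in written for this example, not the Bio::BGZF method itself, and `merge_by_bgzf_block` is a hypothetical name.

```ruby
# Assumption: a BGZF virtual offset keeps the compressed block's file
# offset in the upper bits and the within-block offset in the low 16 bits.
def vo_block_offset(virtual_offset)
  virtual_offset >> 16
end

# Group consecutive [virtual_offset, length] entries by BGZF block and emit
# [first_offset, span_size, [offsets]] tuples, as the method above does.
def merge_by_bgzf_block(fetch_list)
  groups = fetch_list.chunk { |vo, _len| vo_block_offset(vo) }
  groups.map do |_block, entries|
    first_vo = entries.first[0]
    last_vo, last_len = entries.last
    [first_vo, last_vo + last_len - first_vo, entries.map { |vo, _| vo }]
  end
end

list = [[(1 << 16) | 10, 5], [(1 << 16) | 40, 7], [2 << 16, 3]]
merged = merge_by_bgzf_block(list)
# merged == [[65546, 37, [65546, 65576]], [131072, 3, [131072]]]
```

The first two entries share a BGZF block, so they collapse into one tuple even though 25 bytes separate them.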

#_merge_fetch_list(orig_fl) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 761

def _merge_fetch_list(orig_fl)
  fl = orig_fl.dup
  r = []
  until fl.empty? do
    cur = fl.shift
    if r.last \
      && (r.last[0] + r.last[1]) == cur[0] \
      && (r.last[1] + cur[1]) <= @merge_max
      # contiguous with the previous one
      # add to length and increment count
      r.last[1] += cur[1]
      r.last[2] << cur[0]
    else
      cur << [cur[0]]
      r << cur
    end
  end
  return r
end
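A standalone sketch of the same merging rule is shown below. `merge_contiguous` is a hypothetical name, and `merge_max` is passed as an argument instead of being read from the parser; unlike the method above, this version builds new tuples and leaves the caller's entries unmodified.

```ruby
# Hypothetical standalone version of the merging rule: an entry is folded
# into the previous tuple when it starts exactly where that tuple ends and
# the combined size stays within merge_max.
def merge_contiguous(fetch_list, merge_max)
  fetch_list.each_with_object([]) do |(offset, length), merged|
    last = merged.last
    if last && last[0] + last[1] == offset && last[1] + length <= merge_max
      last[1] += length   # grow the merged span
      last[2] << offset   # remember where this block starts
    else
      merged << [offset, length, [offset]]
    end
  end
end

merge_contiguous([[0, 10], [10, 5], [20, 5]], 100)
# => [[0, 15, [0, 10]], [20, 5, [20]]]
```

With a `merge_max` of 12 the first two entries would stay separate, because their combined size of 15 bytes exceeds the limit.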

#_parse_header ⇒ Object

Parse the header of the MAF file.



# File 'lib/bio/maf/parser.rb', line 798

def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
  # parenthesize the argument so a failed skip_until actually reaches parse_error
  s.skip_until(BLOCK_START) || parse_error("Cannot find block start!")
end
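The header scan can be approximated with StringScanner alone. The sketch below is a simplified illustration, not the library's implementation: `parse_maf_header` is a hypothetical function, the `key=value` pattern stands in for `parse_maf_vars`, and `/^a/` stands in for `BLOCK_START`, whose exact definition is not shown on this page.

```ruby
require 'strscan'

# Hypothetical, simplified header parser. Returns the header variables,
# the joined alignment parameters (or nil), and the unparsed remainder.
def parse_maf_header(text)
  s = StringScanner.new(text)
  raise ArgumentError, "not a MAF file" unless s.scan(/##maf[ \t]*/)

  vars = {}
  vars[s[1].to_sym] = s[2] while s.scan(/(\w+)=(\S*)[ \t]*/)
  s.scan(/\n/)

  # Comment lines after the ##maf line carry the alignment parameters.
  params = []
  params << s[1] while s.scan(/#\s*(.+?)\n/)

  # Parenthesized call, so a missing block start really raises.
  s.skip_until(/^a/) || raise(ArgumentError, "cannot find block start")
  [vars, params.empty? ? nil : params.join(' '), s.rest]
end

text = "##maf version=1 scoring=tba.v8\n# tba.v8 (((human chimp) baboon)\n\na score=23262.0\n"
vars, align_params, rest = parse_maf_header(text)
```

After this call `vars` holds the `version` and `scoring` entries, and `rest` starts just past the `a` that opens the first block.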

#_wrap(options, fun, &blk) ⇒ Object

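options lists the wrappers to apply, ordered from outermost to innermost: [:outer, ..., :inner]. The array is consumed (via shift) as each wrapper is applied.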



# File 'lib/bio/maf/parser.rb', line 859

def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options,
             fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options,
              fun,
              :to_bio_alignment,
              &blk)
  when :remove_gaps
    conv_map(options,
             fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
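The recursion above applies the innermost (last) wrapper to each block first and the outermost (first) wrapper last. A toy version makes the ordering visible; `wrap_stream`, the `transforms` table and the `:tag` and `:only_even` stages are all hypothetical and stand in for the real wrapper modes.

```ruby
# Hypothetical stand-in for _wrap: `stages` lists wrapper names from
# outermost to innermost, `transforms` maps each name to a lambda.
def wrap_stream(stages, transforms, source, &blk)
  name, *inner = stages
  return source.call(&blk) if name.nil?

  transform = transforms.fetch(name)
  # Recurse first, so inner stages see each item before this stage does.
  wrap_stream(inner, transforms, source) do |item|
    result = transform.call(item)
    blk.call(result) if result # nil and false results are dropped
  end
end

transforms = {
  tag:       ->(n) { "<#{n}>" },
  only_even: ->(n) { n if n.even? }
}
source = ->(&b) { [1, 2, 3, 4].each(&b) }

out = []
wrap_stream([:tag, :only_even], transforms, source) { |v| out << v }
# out == ["<2>", "<4>"]
```

Here `:only_even` runs before `:tag` even though it is listed second, which is why filtering stages belong at the end of the list. Unlike `_wrap`, this sketch does not mutate the array it is given.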

#block_joiner(options, fun) {|prev| ... } ⇒ Object

Yields:

  • (prev)


# File 'lib/bio/maf/parser.rb', line 892

def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
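The hold-one-back pattern used here (keep a pending item, extend it while the next item is joinable, flush it otherwise) can be shown on plain data. In this sketch items are `[start, length]` spans and `join_adjacent` is a hypothetical function; the `filtered?` condition of the real method is left out.

```ruby
# Hypothetical sketch: items are [start, length] spans; two spans are
# joinable when the second starts exactly where the first ends.
def join_adjacent(spans)
  prev = nil
  spans.each do |cur|
    if prev && prev[0] + prev[1] == cur[0]
      prev = [prev[0], prev[1] + cur[1]] # extend the pending span
    else
      yield prev if prev                 # flush the finished span
      prev = cur
    end
  end
  yield prev if prev                     # flush the last pending span
end

joined = []
join_adjacent([[0, 5], [5, 3], [10, 2]]) { |span| joined << span }
# joined == [[0, 8], [10, 2]]
```

The final `yield` matters: without it the last pending item would never be emitted.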

#close ⇒ Object



# File 'lib/bio/maf/parser.rb', line 595

def close
  f.close
end

#context(chunk_size) ⇒ ParseContext

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a Bio::MAF::ParseContext for random access, using the given chunk size.

Returns:

  • (ParseContext)



# File 'lib/bio/maf/parser.rb', line 604

def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  if file_spec
    fd = File.open(file_spec)
  else
    fd = f.dup
  end
  ParseContext.new(fd, chunk_size, self)
end

#conv_map(options, search, fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 906

def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end

#conv_send(options, search, sym) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 913

def conv_send(options, search, sym)
  _wrap(options, search) do |block|
    v = block.send(sym)
    yield v if v
  end
end

#each_block {|block| ... } ⇒ Enumerator<Block>

Also known as: parse_blocks

Parse all alignment blocks until EOF.

Delegates to #parse_blocks_parallel if :threads is set under JRuby.

Yields:

  • (block)

    Passes each Block in turn to a block

Returns:

  • (Enumerator<Block>)

    enumerator of Blocks if no block given.



# File 'lib/bio/maf/parser.rb', line 821

def each_block(&blk)
  if block_given?
    if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
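The block-or-enumerator convention used by `each_block` can be reproduced in a few lines. `ToyParser` below is a hypothetical class that iterates over an in-memory array; it only illustrates the `enum_for` idiom and the alias, not any actual parsing.

```ruby
# Hypothetical class showing the block-or-enumerator convention.
class ToyParser
  def initialize(blocks)
    @blocks = blocks
  end

  # Yields each item when a block is given, otherwise returns an
  # Enumerator so callers can chain map, select, first and so on.
  def each_block(&blk)
    return enum_for(:each_block) unless block_given?

    @blocks.each(&blk)
  end
  alias_method :parse_blocks, :each_block
end

parser = ToyParser.new(%w[block1 block2 block3])
first_two = parser.each_block.first(2)
```

Because the enumerator is lazy about consumption, `first(2)` stops after two items instead of walking the whole input.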

#each_block_seq ⇒ Object



# File 'lib/bio/maf/parser.rb', line 835

def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end

#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>

Fetch and parse blocks given by fetch_list.

fetch_list should be an array of [offset, length] tuples.

Parameters:

  • fetch_list (Array)

    the fetch list

Yields:

  • (block)

    each block matched, in turn

Returns:

  • (Enumerable<Block>)

    each matching Block, if no block given



# File 'lib/bio/maf/parser.rb', line 649

def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if RUBY_PLATFORM == 'java' && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end
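To make the `[offset, length]` convention concrete, the sketch below reads such ranges from an in-memory IO. It is an illustration only: `read_ranges` is a hypothetical function that returns raw text, whereas `fetch_blocks` parses each range into a Block.

```ruby
require 'stringio'

# Hypothetical sketch: read each [offset, length] range from an IO.
def read_ranges(io, fetch_list)
  fetch_list.map do |offset, length|
    io.seek(offset)
    io.read(length)
  end
end

# Two tiny pseudo-blocks of 14 bytes each, separated by one blank line.
io = StringIO.new("a score=1\ns x\n\na score=2\ns y\n")
chunks = read_ranges(io, [[0, 14], [15, 14]])
# chunks == ["a score=1\ns x\n", "a score=2\ns y\n"]
```

In practice the offsets and lengths would come from an index over the MAF file, not from hand-counted positions as in this example.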

#fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>

Fetch and parse the blocks given by the merged fetch list.

Parameters:

  • fetch_list (Array)

    the merged fetch list, as returned by #merge_fetch_list

Returns:

  • (Array<Block>)

    the requested alignment blocks



# File 'lib/bio/maf/parser.rb', line 667

def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  # start from 0 so an empty fetch list yields 0 rather than nil
  total_size = fetch_list.collect { |e| e[1] }.reduce(0, :+)
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
    end
  end
  elapsed = Time.now - start
  rate = (total_size / 1048576.0) / elapsed
  LOG.debug { sprintf("Fetched blocks in %.3fs, %.1f MB/s.",
                      elapsed, rate) }
end

#fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>

Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the :threads parser option.

Parameters:

  • fetch_list (Array)

    the merged fetch list, as returned by #merge_fetch_list

Returns:

  • (Array<Block>)

    the requested alignment blocks



# File 'lib/bio/maf/parser.rb', line 687

def fetch_blocks_merged_parallel(fetch_list)
  # start from 0 so an empty fetch list yields 0 rather than nil
  total_size = fetch_list.collect { |e| e[1] }.reduce(0, :+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }
  elapsed = Time.now - start
  LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                      n_threads,
                      elapsed) }
  mb = total_size / 1048576.0
  LOG.debug { sprintf("%.3f MB processed (%.1f MB/s).",
                      mb,
                      mb / elapsed) }
end
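The job-queue and completion-queue arrangement can be sketched in plain Ruby. This version swaps the `java.util.concurrent` classes for Ruby's built-in `Queue`, so it runs on any Ruby but gains real parallelism only where threads run concurrently. `process_jobs` is a hypothetical function, and the sketch assumes the per-job block does not raise and that jobs are never nil or false.

```ruby
# Hypothetical sketch using Ruby's built-in Queue in place of
# java.util.concurrent; the block passed in does the per-job work.
def process_jobs(jobs, n_threads)
  pending = Queue.new
  jobs.each { |job| pending << job }
  pending.close # once drained, pop returns nil instead of blocking

  completed = Queue.new
  workers = Array.new(n_threads) do
    Thread.new do
      while (job = pending.pop)
        completed << yield(job)
      end
    end
  end

  # Collect exactly one result per job, in completion order.
  results = Array.new(jobs.size) { completed.pop }
  workers.each(&:join)
  results
end

squares = process_jobs([1, 2, 3, 4], 2) { |n| n * n }
```

Results arrive in completion order, not submission order, so callers that need a stable order have to sort or tag them. The method above additionally polls with a timeout so that a dead worker is noticed; this sketch omits that safeguard.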

#filter_seq_count(fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 886

def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end

#make_worker(jobs, ct) ⇒ Object

Create a worker thread for parallel parsing.



# File 'lib/bio/maf/parser.rb', line 725

def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            # `e` is not defined at this point; report the request that failed
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      LOG.error "Worker failing: #{e.class}: #{e}"
      LOG.error e
      raise e
    end
  end
end

#merge_fetch_list(orig_fl) ⇒ Object

Merge contiguous blocks in the given fetch list, up to :merge_max bytes.

Returns [offset, size, [offset1, offset2, ...]] tuples.



# File 'lib/bio/maf/parser.rb', line 752

def merge_fetch_list(orig_fl)
  case compression
  when nil
    _merge_fetch_list(orig_fl)
  when :bgzf
    _merge_bgzf_fetch_list(orig_fl)
  end
end

#parse_block ⇒ Object



# File 'lib/bio/maf/parser.rb', line 842

def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end

#parse_blocks_parallel ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse alignment blocks with a worker thread.



# File 'lib/bio/maf/parser.rb', line 924

def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
    rescue
      LOG.error "worker exiting: #{$!.class}: #{$!}"
      LOG.error $!
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      n_final_poll += 1 unless worker.alive?
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly!"
  end
end
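The same producer and consumer shape, with a bounded queue and an end-of-stream sentinel, can be written with Ruby's `SizedQueue`. The sketch below is a hypothetical analogue, not the JRuby implementation: `each_from_worker` iterates over an array in the worker instead of parsing, and it assumes the items never include the `:eof` sentinel.

```ruby
# Hypothetical sketch: a worker thread fills a bounded queue while the
# calling thread consumes it; :eof marks the end of the stream.
def each_from_worker(items, capacity = 4)
  queue = SizedQueue.new(capacity) # push blocks while the queue is full
  worker = Thread.new do
    begin
      items.each { |item| queue.push(item) }
    ensure
      queue.push(:eof)             # always signal the consumer
    end
  end

  loop do
    item = queue.pop
    break if item == :eof

    yield item
  end
  worker.join                      # re-raises a worker exception, if any
end

received = []
each_from_worker((1..10).to_a, 2) { |n| received << n }
```

Pushing the sentinel from `ensure` means the consumer never waits on a dead worker, which is the situation the polling loop above guards against with its timeout.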

#sequence_filter ⇒ Hash

Sequence filter to apply.

Returns:

  • (Hash)


# File 'lib/bio/maf/parser.rb', line 631

def sequence_filter
  @sequence_filter ||= {}
end

#sequence_filter=(filter) ⇒ Object

Set the sequence filter.

Parameters:

  • filter (Hash)

    the new filter



# File 'lib/bio/maf/parser.rb', line 638

def sequence_filter=(filter)
  @sequence_filter = filter
end

#with_context(chunk_size) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Execute the given block with a Bio::MAF::ParseContext using the given chunk_size as an argument.

# File 'lib/bio/maf/parser.rb', line 619

def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end
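The open, yield, ensure-close shape of this method is easy to reproduce with an ordinary file handle. In the sketch below `with_handle` is a hypothetical function and a `File` stands in for the ParseContext; the temporary file exists only to make the example runnable.

```ruby
require 'tempfile'

# Hypothetical sketch of the same open/yield/ensure-close shape, with a
# plain File handle in place of a ParseContext.
def with_handle(path)
  handle = File.open(path)
  begin
    yield handle
  ensure
    handle.close # runs whether the block returns or raises
  end
end

tmp = Tempfile.new('maf-sketch')
tmp.write("##maf version=1\n")
tmp.flush

first_line = with_handle(tmp.path) { |h| h.gets }
tmp.close!
```

The value of the block is returned to the caller, and the handle is closed even if the block raises.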

#wrap_block_seq(fun, &blk) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 852

def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  _wrap(opts, fun, &blk)
end
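How the wrapper list is assembled from the parser options can be checked with a standalone sketch. `active_wrappers` is a hypothetical function; the wrapper names are those listed in WRAP_OPTS, and the filter contents in the example are placeholders, not real filter keys.

```ruby
# Hypothetical standalone version of the wrapper selection: keep the
# WRAP_OPTS entries that are enabled, then append :sequence_filter when
# a non-empty filter is set.
def active_wrappers(opts, sequence_filter)
  wrap_opts = [:as_bio_alignment, :join_blocks, :remove_gaps]
  active = wrap_opts.select { |name| opts[name] }
  active << :sequence_filter if sequence_filter && !sequence_filter.empty?
  active
end

active_wrappers({ remove_gaps: true, as_bio_alignment: true }, {})
# => [:as_bio_alignment, :remove_gaps]
```

The result follows the order of WRAP_OPTS rather than the order of the options hash, and `:sequence_filter` always comes last, so it is the innermost wrapper and sees each block first.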