Class: Bio::MAF::Parser

Inherits:
Object
Includes:
MAFParsing
Defined in:
lib/bio/maf/parser.rb

Overview

MAF parser, used for sequential and random-access parsing.

Options:

  • :parse_extended: whether to parse 'i' and 'q' lines
  • :parse_empty: whether to parse 'e' lines
  • :remove_gaps: remove gaps left after filtering sequences
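  • :join_blocks: join adjacent blocks where possible, when at least one of them has been filtered
  • :upcase: fold sequence data to upper case
  • :chunk_size: read MAF file in chunks of this many bytes
  • :random_chunk_size: as above, but for random access (#fetch_blocks)
  • :merge_max: merge up to this many bytes of blocks for random access
  • :threads: number of threads to use for parallel random-access fetching (#fetch_blocks). Only useful under JRuby.
  • :strict: abort on un-parseable lines instead of continuing with a warning.
  • :readahead_thread: under JRuby, read chunks ahead on a separate thread (default: true).
  • :seq_parse_thread: under JRuby, parse blocks on a worker thread during sequential parsing with #each_block (default: true).
  • :as_bio_alignment: yield the result of Block#to_bio_alignment instead of each Block.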

Constant Summary

SEQ_CHUNK_SIZE = 131072
RANDOM_CHUNK_SIZE = 4096
MERGE_MAX = SEQ_CHUNK_SIZE
DEFAULT_OPTS = {
  :chunk_size => SEQ_CHUNK_SIZE,
  :random_chunk_size => RANDOM_CHUNK_SIZE,
  :merge_max => MERGE_MAX,
  :parse_extended => false,
  :parse_empty => false,
  :readahead_thread => true,
  :seq_parse_thread => true
}
WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps, :upcase]
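Constructor options are merged over DEFAULT_OPTS with Hash#merge, so any key you pass wins and every other key keeps its default. A minimal sketch of that behaviour, using a hypothetical DEFAULTS hash that copies only a few of the entries above:

```ruby
# Hypothetical subset of Parser::DEFAULT_OPTS, used only to show how
# constructor options override the defaults.
DEFAULTS = {
  chunk_size: 131_072,
  random_chunk_size: 4096,
  parse_extended: false,
  parse_empty: false
}.freeze

# Keys present in parse_opts win; missing keys fall back to DEFAULTS.
def effective_opts(parse_opts = {})
  DEFAULTS.merge(parse_opts)
end

opts = effective_opts(parse_extended: true)
```

Options that have no default (such as :upcase) simply stay absent, which the wrapper code treats as "off".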

Constants included from MAFParsing

MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::JRUBY_P, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM

Instance Method Summary

Methods included from MAFParsing

#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?

Constructor Details

#initialize(file_spec, parse_opts = {}) ⇒ Parser

Create a new parser instance.

Parameters:

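  • file_spec (String, Pathname, IO)

    path of the file to parse, or an already-open IO object. Paths ending in .maf.gz are decompressed with Zlib::GzipReader; paths ending in .bgz or .bgzf are read as BGZF.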

  • parse_opts (Hash) (defaults to: {})

    parser options.



# File 'lib/bio/maf/parser.rb', line 587

def initialize(file_spec, parse_opts={})
  opts = DEFAULT_OPTS.merge(parse_opts)
  @opts = opts
  @random_access_chunk_size = opts[:random_chunk_size]
  @merge_max = opts[:merge_max]
  @parse_extended = opts[:parse_extended]
  @parse_empty = opts[:parse_empty]
  @chunk_start = 0
  if file_spec.respond_to? :flush
    # an IO object
    # guess what, Pathnames respond to :read...
    @f = file_spec
    @file_spec = @f.path if @f.respond_to?(:path)
    # TODO: test for gzip?
  else
    # a pathname (or Pathname)
    @file_spec = file_spec
    @phys_f = File.open(file_spec)
    if file_spec.to_s.end_with?(".maf.gz")
      @f = Zlib::GzipReader.new(@phys_f)
      @compression = :gzip
    else
      @f = @phys_f
    end
  end
  if @file_spec.to_s =~ /\.bgzf?$/
    @base_reader = BGZFChunkReader
    @compression = :bgzf
  else
    @base_reader = ChunkReader
  end
  @cr = base_reader.new(@f, opts[:chunk_size])
  if JRUBY_P && opts[:readahead_thread]
    LOG.debug "Using ThreadedChunkReaderWrapper."
    @cr = ThreadedChunkReaderWrapper.new(@cr)
  end
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end
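The constructor chooses a decompression strategy from the file name. The sketch below restates that decision as a standalone function; compression_for is a hypothetical name, not part of the library, and it covers only path inputs (for an IO argument the parser checks only for BGZF, using the IO's path if it has one).

```ruby
# Hypothetical helper mirroring how Parser#initialize picks a
# compression mode from a file name:
#   ".maf.gz"        => :gzip (read through Zlib::GzipReader)
#   ".bgz" / ".bgzf" => :bgzf (read with BGZFChunkReader)
#   anything else    => nil  (plain ChunkReader)
def compression_for(path)
  name = path.to_s
  if name.end_with?(".maf.gz")
    :gzip
  elsif name =~ /\.bgzf?$/
    :bgzf
  end
end
```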

Instance Attribute Details

#at_end ⇒ Boolean (readonly)

Returns whether EOF has been reached.

Returns:

  • (Boolean)

    whether EOF has been reached.



# File 'lib/bio/maf/parser.rb', line 551

def at_end
  @at_end
end

#base_reader ⇒ Class (readonly)

Returns ChunkReader class to use for random access.

Returns:

  • (Class)

    ChunkReader class to use for random access




# File 'lib/bio/maf/parser.rb', line 549

def base_reader
  @base_reader
end

#chunk_start ⇒ Integer (readonly)

Returns starting offset of the current chunk.

Returns:

  • (Integer)

    starting offset of the current chunk.



# File 'lib/bio/maf/parser.rb', line 555

def chunk_start
  @chunk_start
end

#compression ⇒ Symbol (readonly)

Returns the compression method used for this file (:gzip or :bgzf), or nil.

Returns:

  • (Symbol)

    compression method used for this file (:gzip or :bgzf), or nil



# File 'lib/bio/maf/parser.rb', line 559

def compression
  @compression
end

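#cr ⇒ ChunkReader (readonly)

Returns the ChunkReader used for sequential parsing.

Returns:

  • (ChunkReader)

    the chunk reader for this file; under JRuby with :readahead_thread set, a ThreadedChunkReaderWrapper around it.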



# File 'lib/bio/maf/parser.rb', line 546

def cr
  @cr
end

#f ⇒ IO (readonly)

Returns file handle for MAF file.

Returns:

  • (IO)

    file handle for MAF file.



# File 'lib/bio/maf/parser.rb', line 538

def f
  @f
end

#file_spec ⇒ String (readonly)

Returns path of MAF file being parsed.

Returns:

  • (String)

    path of MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 536

def file_spec
  @file_spec
end

#header ⇒ Header (readonly)

Returns header of the MAF file being parsed.

Returns:

  • (Header)

    header of the MAF file being parsed.



# File 'lib/bio/maf/parser.rb', line 534

def header
  @header
end

#last_block_pos ⇒ Integer (readonly)

Returns offset of the last block start in this chunk.

Returns:

  • (Integer)

    offset of the last block start in this chunk.



# File 'lib/bio/maf/parser.rb', line 557

def last_block_pos
  @last_block_pos
end

#opts ⇒ Hash (readonly)

Returns parser options.

Returns:

  • (Hash)

    parser options.



# File 'lib/bio/maf/parser.rb', line 553

def opts
  @opts
end

#parse_empty ⇒ Object



# File 'lib/bio/maf/parser.rb', line 563

def parse_empty
  @parse_empty
end

#parse_extended ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/bio/maf/parser.rb', line 562

def parse_extended
  @parse_extended
end

#phys_f ⇒ IO (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

May be gzip-compressed.

Returns:

  • (IO)

    file handle for physical MAF file.



# File 'lib/bio/maf/parser.rb', line 542

def phys_f
  @phys_f
end

#s ⇒ StringScanner (readonly)

Returns scanner for parsing.

Returns:

  • (StringScanner)

    scanner for parsing.



# File 'lib/bio/maf/parser.rb', line 544

def s
  @s
end

Instance Method Details

#_merge_bgzf_fetch_list(orig_fl) ⇒ Object

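Build a merged fetch list in a BGZF-aware way. This groups together consecutive fetch-list entries whose MAF blocks start in the same BGZF block. Those MAF blocks need not be contiguous in the file: each merged entry spans from the start of its first MAF block to the end of its last.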



# File 'lib/bio/maf/parser.rb', line 820

def _merge_bgzf_fetch_list(orig_fl)
  block_e = orig_fl.chunk { |entry|
    Bio::BGZF::vo_block_offset(entry[0])
  }
  block_e.collect do |bgzf_block, fl|
    # text size to read from disk, from the start of the first
    # block to the end of the last block
    text_size = fl.last[0] + fl.last[1] - fl.first[0]
    offsets = fl.collect { |e| e[0] }
    [fl.first[0], text_size, offsets]
  end
end
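BGZF virtual offsets keep the compressed block's file offset in the upper 48 bits and the position within the uncompressed block in the lower 16 bits, so shifting right by 16 recovers the block offset. Assuming Bio::BGZF::vo_block_offset does exactly that, the grouping above can be sketched in plain Ruby; vo_block_offset and merge_bgzf_fetch_list here are hypothetical stand-ins, not the library's own functions.

```ruby
# Hypothetical stand-in for Bio::BGZF::vo_block_offset: the upper 48
# bits of a virtual offset hold the compressed block's file offset.
def vo_block_offset(voffset)
  voffset >> 16
end

# fetch_list: [[virtual_offset, length], ...], assumed sorted by offset.
# Returns one [first_offset, span, [offsets...]] entry per run of
# consecutive entries that start in the same BGZF block; span runs from
# the first entry's start to the last entry's end.
def merge_bgzf_fetch_list(fetch_list)
  fetch_list.chunk { |entry| vo_block_offset(entry[0]) }.map do |_block, entries|
    first = entries.first
    last = entries.last
    span = last[0] + last[1] - first[0]
    [first[0], span, entries.map { |e| e[0] }]
  end
end

a = (1 << 16) | 0    # block at file offset 1, byte 0 within it
b = (1 << 16) | 100  # same block, byte 100
c = (2 << 16) | 0    # next block
merged = merge_bgzf_fetch_list([[a, 100], [b, 50], [c, 30]])
```

Because Enumerable#chunk only groups adjacent elements, an unsorted fetch list could split one BGZF block across several merged entries.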

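#_merge_fetch_list(orig_fl) ⇒ Object

Merge contiguous fetch-list entries for an uncompressed file, as long as each merged entry stays within :merge_max bytes.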



# File 'lib/bio/maf/parser.rb', line 797

def _merge_fetch_list(orig_fl)
  r = []
  orig_fl.each do |offset, length|
    last = r.last
    if last \
      && (last[0] + last[1]) == offset \
      && (last[1] + length) <= @merge_max
      # contiguous with the previous entry and within :merge_max:
      # add to its length and record this block's offset
      last[1] += length
      last[2] << offset
    else
      # start a new [offset, size, [offsets]] entry; building a fresh
      # array avoids mutating the caller's fetch-list tuples
      r << [offset, length, [offset]]
    end
  end
  r
end

#_parse_header ⇒ Object

Parse the header of the MAF file.



# File 'lib/bio/maf/parser.rb', line 834

def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
  if ! s.skip_until(BLOCK_START)
    @at_end = true
  end
end
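A rough standalone sketch of the same header logic with StringScanner. parse_maf_header is a hypothetical name, and the key=value parsing is only an assumption about what MAFParsing#parse_maf_vars does; the comment-line joining mirrors the loop above.

```ruby
require 'strscan'

# Hypothetical analogue of Parser#_parse_header: check the "##maf"
# signature, read key=value variables on that line (assumed format),
# then join the following "#" comment lines into one parameter string.
def parse_maf_header(text)
  s = StringScanner.new(text)
  raise ArgumentError, "not a MAF file" unless s.scan(/##maf\s*/)
  vars = {}
  while s.scan(/(\w+)=(\S+)[ \t]*/)
    vars[s[1].to_sym] = s[2]
  end
  s.skip(/\n/)
  align_params = nil
  # scan is anchored at the current position, so this only matches
  # lines that start with '#'
  while s.scan(/#\s*(.+?)\n/)
    if align_params.nil?
      align_params = s[1].dup
    else
      align_params << ' ' << s[1]
    end
  end
  [vars, align_params]
end

vars, params = parse_maf_header(
  "##maf version=1 scoring=tba.v8\n" \
  "# tba.v8 (((human chimp) baboon) (mouse rat))\n" \
  "# second line\n" \
  "\n" \
  "a score=23262.0\n"
)
```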

#_wrap(options, fun, &blk) ⇒ Object

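Wrap fun in the wrappers named by options and pass the resulting blocks to blk. options should be ordered [:outer, ..., :inner]; it is consumed destructively (via shift).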



# File 'lib/bio/maf/parser.rb', line 898

def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options,
             fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options,
              fun,
              :to_bio_alignment,
              &blk)
  when :upcase
    conv_send(options,
              fun,
              :upcase!,
              true,
              &blk)
  when :remove_gaps
    conv_map(options,
             fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
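The nesting in _wrap can be hard to follow. The simplified analogue below keeps the same recursion with two wrappers; wrap, the string-array "blocks", and the source lambda are all hypothetical, and unlike the original it does not modify its options array.

```ruby
# Hypothetical, simplified analogue of Parser#_wrap. "Blocks" here are
# arrays of sequence strings; fun is a callable that yields each one.
# options is ordered outermost first, so the last option is applied to
# the raw items before any other layer sees them.
def wrap(options, fun, &blk)
  first, *rest = options
  case first
  when nil
    fun.call(&blk)
  when :upcase
    wrap(rest, fun) { |b| blk.call(b.map(&:upcase)) }
  when :sequence_filter
    # keep only blocks with more than one sequence, as _wrap does
    wrap(rest, fun) { |b| blk.call(b) if b.size > 1 }
  else
    raise ArgumentError, "unhandled wrapper mode: #{first}"
  end
end

source = lambda { |&y| [%w[acgt acg-], %w[ttt], %w[gg gc]].each { |item| y.call(item) } }
out = []
wrap([:upcase, :sequence_filter], source) { |b| out << b }
```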

#block_joiner(options, fun) {|prev| ... } ⇒ Object

Yields:

  • (prev)


# File 'lib/bio/maf/parser.rb', line 937

def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
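block_joiner holds one block back so it can absorb the next block when the two are joinable. A hedged sketch of that lookbehind pattern, where Ranges stand in for MAF blocks and join_adjacent is a hypothetical name; the real method additionally requires that at least one of the two blocks has been filtered.

```ruby
# Hypothetical sketch of the block_joiner pattern: "joinable" here means
# one range ends exactly where the next begins.
def join_adjacent(items)
  prev = nil
  items.each do |cur|
    if prev && prev.end == cur.begin
      prev = (prev.begin...cur.end)   # absorb cur into the held item
    else
      yield prev if prev              # flush the held item
      prev = cur
    end
  end
  yield prev if prev                  # flush whatever is still held
end

joined = []
join_adjacent([0...10, 10...15, 20...25]) { |r| joined << r }
```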

#close ⇒ Object



# File 'lib/bio/maf/parser.rb', line 629

def close
  f.close
end

#context(chunk_size) ⇒ ParseContext

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Create a Bio::MAF::ParseContext for random access, using the given chunk size.

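Returns:

  • (ParseContext)

    a new parse context reading from a separately opened (or duplicated) handle on the same file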



# File 'lib/bio/maf/parser.rb', line 638

def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  if file_spec
    fd = File.open(file_spec)
  else
    fd = f.dup
  end
  ParseContext.new(fd, chunk_size, self)
end

#conv_map(options, search, fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 951

def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end

#conv_send(options, search, sym, always_yield_block = false) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 958

def conv_send(options, search, sym, always_yield_block=false)
  _wrap(options, search) do |block|
    v = block.send(sym)
    if always_yield_block
      yield block
    else
      yield v if v
    end
  end
end

#each_block {|block| ... } ⇒ Enumerator<Block> Also known as: parse_blocks

Parse all alignment blocks until EOF.

Delegates to #parse_blocks_parallel under JRuby when the :seq_parse_thread option is set (the default); otherwise uses #each_block_seq.

Yields:

  • (block)

    Passes each Block in turn to a block

Returns:

  • (Enumerator<Block>)

    enumerator of Blocks if no block given.



# File 'lib/bio/maf/parser.rb', line 859

def each_block(&blk)
  if block_given?
    if JRUBY_P && opts[:seq_parse_thread]
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
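each_block follows the usual Ruby convention of returning an Enumerator when called without a block, which lets callers chain Enumerable methods. A minimal sketch of that convention on a hypothetical TinyParser class:

```ruby
# Hypothetical class illustrating the each_block calling convention:
# with a block, items are yielded in turn; without one, an Enumerator
# over the same items is returned.
class TinyParser
  def initialize(blocks)
    @blocks = blocks
  end

  def each_block(&blk)
    return enum_for(:each_block) unless blk
    @blocks.each(&blk)
  end
end

seen = []
TinyParser.new(%w[a b]).each_block { |b| seen << b }
```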

#each_block_seq ⇒ Object



# File 'lib/bio/maf/parser.rb', line 873

def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end

#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>

Fetch and parse blocks given by fetch_list.

fetch_list should be an array of [offset, length] tuples.

Parameters:

  • fetch_list (Array)

    the fetch list

Yields:

  • (block)

    each block matched, in turn

Returns:

  • (Enumerable<Block>)

    each matching Block, if no block given



# File 'lib/bio/maf/parser.rb', line 683

def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if JRUBY_P && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end

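#fetch_blocks_merged(fetch_list, &blk) ⇒ Object

Fetch and parse the blocks given by the merged fetch list, yielding each one in turn. Logs timing and throughput at debug level.

Parameters:

  • fetch_list (Array)

    merged fetch list of [offset, size, [offset1, offset2, ...]] tuples, as returned by #merge_fetch_list.

Yields:

  • (block)

    each requested alignment block, in turn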



# File 'lib/bio/maf/parser.rb', line 701

def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  count = 0
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
      count += 1
    end
  end
  elapsed = Time.now - start
  rate = (total_size / 1048576.0) / elapsed
  LOG.debug { sprintf("Fetched %d blocks in %.3fs, %.1f MB/s.",
                      count, elapsed, rate) }
end

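#fetch_blocks_merged_parallel(fetch_list) ⇒ Object

Fetch and parse the blocks given by the merged fetch list in parallel, yielding each one in turn. Uses the number of threads specified by the :threads parser option.

Parameters:

  • fetch_list (Array)

    merged fetch list of [offset, size, [offset1, offset2, ...]] tuples, as returned by #merge_fetch_list.

Yields:

  • (block)

    each requested alignment block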



# File 'lib/bio/maf/parser.rb', line 723

def fetch_blocks_merged_parallel(fetch_list)
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }
  elapsed = Time.now - start
  LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                      n_threads,
                      elapsed) }
  mb = total_size / 1048576.0
  LOG.debug { sprintf("%.3f MB processed (%.1f MB/s).",
                      mb,
                      mb / elapsed) }
end
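Because java.util.concurrent is only available under JRuby, here is a hedged plain-Ruby analogue of the same work-queue pattern using Thread, Queue, and Queue#close. parallel_fetch and its fetch callable are hypothetical; unlike the original, a worker failure is reported through the result queue instead of being detected by polling thread status.

```ruby
# Hypothetical plain-Ruby analogue of fetch_blocks_merged_parallel:
# a shared job queue, n_threads workers, and a result queue that the
# caller drains until every job has reported back. fetch stands in for
# ParseContext#fetch_blocks and must return an array of results.
def parallel_fetch(jobs, n_threads, fetch)
  job_q = Queue.new
  jobs.each { |j| job_q << j }
  job_q.close                       # pop returns nil once drained
  results = Queue.new
  workers = Array.new(n_threads) do
    Thread.new do
      begin
        while (job = job_q.pop)
          results << [:ok, fetch.call(job)]
        end
      rescue => e
        results << [:error, e]      # report instead of dying silently
      end
    end
  end
  out = []
  # every job yields exactly one message: a result or the error that
  # stopped its worker
  jobs.size.times do
    status, value = results.pop
    raise value if status == :error
    out.concat(value)
  end
  workers.each(&:join)
  out
end

fetched = parallel_fetch([1, 2, 3, 4], 2, ->(j) { [j * 10] })
```

Results arrive in completion order, so callers that need file order must sort or track offsets themselves.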

#filter_seq_count(fun) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 931

def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end

#make_worker(jobs, ct) ⇒ Object

Create a worker thread for parallel parsing.



# File 'lib/bio/maf/parser.rb', line 761

def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      LOG.error "Worker failing: #{e.class}: #{e}"
      LOG.error e
      raise e
    end
  end
end

#merge_fetch_list(orig_fl) ⇒ Object

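Merge the given fetch list for random access. For uncompressed files, contiguous entries are merged up to :merge_max bytes; for BGZF files, consecutive entries within the same BGZF block are grouped.

Returns [offset, size, [offset1, offset2, ...]] tuples.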



# File 'lib/bio/maf/parser.rb', line 788

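def merge_fetch_list(orig_fl)
  case compression
  when nil
    _merge_fetch_list(orig_fl)
  when :bgzf
    _merge_bgzf_fetch_list(orig_fl)
  else
    # e.g. :gzip, which cannot be read at arbitrary offsets
    raise "random access is not supported for #{compression} compression"
  end
end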

#parse_block ⇒ Object



# File 'lib/bio/maf/parser.rb', line 880

def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end

#parse_blocks_parallel ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Parse alignment blocks with a worker thread.



# File 'lib/bio/maf/parser.rb', line 973

def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      LOG.debug "Starting parse worker."
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
      LOG.debug { "Parse worker reached EOF." }
    rescue Exception
      LOG.error $!
      Thread.current[:exception] = $!
      raise
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      unless worker.alive?
        LOG.debug "Worker has exited."
        n_final_poll += 1
      end
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly from #{worker[:exception]}!"
  end
end
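The same bounded producer/consumer pattern can be sketched in plain Ruby with SizedQueue, which blocks the producer while the queue is full. each_via_producer, EOS, and ProducerFailure are hypothetical names; unlike the original, a producer exception is passed through the queue rather than detected by polling worker.alive?.

```ruby
# Unique end-of-stream sentinel, and a wrapper that carries a producer
# exception across the queue to the consuming thread.
EOS = Object.new.freeze
ProducerFailure = Struct.new(:error)

# Hypothetical plain-Ruby analogue of parse_blocks_parallel: a producer
# thread pushes items into a bounded queue and finishes with EOS; the
# caller yields items until it sees the sentinel.
def each_via_producer(items, capacity = 128)
  queue = SizedQueue.new(capacity)
  producer = Thread.new do
    begin
      items.each { |item| queue.push(item) }  # blocks while full
      queue.push(EOS)
    rescue => e
      queue.push(ProducerFailure.new(e))
    end
  end
  loop do
    item = queue.pop
    break if item.equal?(EOS)
    raise item.error if item.is_a?(ProducerFailure)
    yield item
  end
  producer.join
end

consumed = []
each_via_producer(%w[a b c], 2) { |x| consumed << x }
```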

#sequence_filter ⇒ Hash

Sequence filter to apply.

Returns:

  • (Hash)


# File 'lib/bio/maf/parser.rb', line 665

def sequence_filter
  @sequence_filter ||= {}
end

#sequence_filter=(filter) ⇒ Object

Set the sequence filter.

Parameters:

  • filter (Hash)

    the new filter



# File 'lib/bio/maf/parser.rb', line 672

def sequence_filter=(filter)
  @sequence_filter = filter
end

#with_context(chunk_size) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yield a Bio::MAF::ParseContext created with the given chunk_size to the block, closing the context's file handle afterwards, even if the block raises.




# File 'lib/bio/maf/parser.rb', line 653

def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end

#wrap_block_seq(fun, &blk) ⇒ Object



# File 'lib/bio/maf/parser.rb', line 890

def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  LOG.debug { "wrapping #{fun} with #{opts.inspect}" }
  _wrap(opts, fun, &blk)
end
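A sketch of how wrap_block_seq builds the wrapper list, using a hypothetical wrapper_list helper and a WRAP_ORDER constant copied from WRAP_OPTS. The filter hash contents are hypothetical too; any non-empty hash enables the :sequence_filter layer.

```ruby
# Same order as Parser::WRAP_OPTS in the constant summary.
WRAP_ORDER = [:as_bio_alignment, :join_blocks, :remove_gaps, :upcase].freeze

# Hypothetical helper mirroring wrap_block_seq's option selection: the
# enabled wrappers keep WRAP_ORDER's order (outermost first), and
# :sequence_filter, when present, is appended as the innermost layer.
def wrapper_list(opts, sequence_filter)
  list = WRAP_ORDER.select { |o| opts[o] }
  list << :sequence_filter if sequence_filter && !sequence_filter.empty?
  list
end

layers = wrapper_list({ upcase: true, join_blocks: true },
                      { only_species: %w[hg19 mm9] })
```

Since _wrap treats the first entry as the outermost layer, each parsed block passes through the sequence filter first, then upcasing, gap removal, block joining, and finally Bio alignment conversion.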