Class: Bio::MAF::Parser
- Inherits: Object
- Includes: MAFParsing
- Defined in: lib/bio/maf/parser.rb
Overview
MAF parser, used for sequential and random-access parsing.
Options:
:parse_extended
: whether to parse 'i' and 'q' lines
:parse_empty
: whether to parse 'e' lines
:remove_gaps
: remove gaps left after filtering sequences
:chunk_size
: read the MAF file in chunks of this many bytes
:random_chunk_size
: as above, but for random access (#fetch_blocks)
:merge_max
: merge up to this many bytes of blocks for random access
:threads
: number of threads to use for parallel parsing; only useful under JRuby
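As a sketch of how these options combine when constructing a parser (the file path and option values below are illustrative assumptions, not taken from the library's examples):

```ruby
# Illustrative options hash for Bio::MAF::Parser.new; the path and the
# particular values here are assumptions for demonstration only.
opts = {
  :parse_extended => true,             # also parse 'i' and 'q' lines
  :parse_empty    => false,            # skip 'e' lines
  :chunk_size     => 8 * 1024 * 1024,  # 8 MB sequential read chunks
  :threads        => 4                 # only effective under JRuby
}

# parser = Bio::MAF::Parser.new("chr22.maf", opts)
```

Unset options fall back to the defaults in the constant summary below (SEQ_CHUNK_SIZE and friends).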
Constant Summary
- SEQ_CHUNK_SIZE = 131072
- RANDOM_CHUNK_SIZE = 4096
- MERGE_MAX = SEQ_CHUNK_SIZE
- WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps]
Constants included from MAFParsing
MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM
Instance Attribute Summary
- #at_end ⇒ Boolean (readonly): whether EOF has been reached.
- #base_reader ⇒ Class (readonly): ChunkReader class to use for random access.
- #chunk_start ⇒ Integer (readonly): starting offset of the current chunk.
- #compression ⇒ Symbol (readonly): compression method used for this file, or nil.
- #cr ⇒ ChunkReader (readonly): the ChunkReader in use.
- #f ⇒ File (readonly): file handle for the MAF file.
- #file_spec ⇒ String (readonly): path of the MAF file being parsed.
- #header ⇒ Header (readonly): header of the MAF file being parsed.
- #last_block_pos ⇒ Integer (readonly): offset of the last block start in this chunk.
- #opts ⇒ Hash (readonly): parser options.
- #parse_empty ⇒ Object
- #parse_extended ⇒ Object (private)
- #s ⇒ StringScanner (readonly): scanner for parsing.
Instance Method Summary
- #_merge_bgzf_fetch_list(orig_fl) ⇒ Object: build a merged fetch list in a BGZF-aware way.
- #_merge_fetch_list(orig_fl) ⇒ Object
- #_parse_header ⇒ Object: parse the header of the MAF file.
- #_wrap(options, fun, &blk) ⇒ Object: options should be [:outer, ..., :inner].
- #block_joiner(options, fun) {|prev| ... } ⇒ Object
- #close ⇒ Object
- #context(chunk_size) ⇒ ParseContext (private): create a ParseContext for random access, using the given chunk size.
- #conv_map(options, search, fun) ⇒ Object
- #conv_send(options, search, sym) ⇒ Object
- #each_block {|block| ... } ⇒ Enumerator<Block> (also: #parse_blocks): parse all alignment blocks until EOF.
- #each_block_seq ⇒ Object
- #fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>: fetch and parse blocks given by fetch_list.
- #fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>: fetch and parse the blocks given by the merged fetch list.
- #fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>: fetch and parse the blocks given by the merged fetch list, in parallel.
- #filter_seq_count(fun) ⇒ Object
- #initialize(file_spec, opts = {}) ⇒ Parser (constructor): create a new parser instance.
- #make_worker(jobs, ct) ⇒ Object: create a worker thread for parallel parsing.
- #merge_fetch_list(orig_fl) ⇒ Object: merge contiguous blocks in the given fetch list, up to :merge_max bytes.
- #parse_block ⇒ Object
- #parse_blocks_parallel ⇒ Object (private): parse alignment blocks with a worker thread.
- #sequence_filter ⇒ Hash: sequence filter to apply.
- #sequence_filter=(filter) ⇒ Object: set the sequence filter.
- #with_context(chunk_size) ⇒ Object (private): execute the given block with a ParseContext using the given chunk_size.
- #wrap_block_seq(fun, &blk) ⇒ Object
Methods included from MAFParsing
#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?
Constructor Details
#initialize(file_spec, opts = {}) ⇒ Parser
Create a new parser instance.
# File 'lib/bio/maf/parser.rb', line 555
def initialize(file_spec, opts={})
  @opts = opts
  if RUBY_PLATFORM == 'java'
    opts[:threads] ||= java.lang.Runtime.runtime.availableProcessors
  end
  chunk_size = opts[:chunk_size] || SEQ_CHUNK_SIZE
  @random_access_chunk_size = opts[:random_chunk_size] || RANDOM_CHUNK_SIZE
  @merge_max = opts[:merge_max] || MERGE_MAX
  @parse_extended = opts[:parse_extended] || false
  @parse_empty = opts[:parse_empty] || false
  @chunk_start = 0
  if file_spec.respond_to? :flush
    # guess what, Pathnames respond to :read...
    @f = file_spec
    @file_spec = @f.path if @f.respond_to?(:path)
    # TODO: gzip?
  else
    @file_spec = file_spec
    if file_spec.to_s.end_with?(".maf.gz")
      @f = IO.popen("gzip -dc #{file_spec}")
    else
      @f = File.open(file_spec)
    end
  end
  if @file_spec.to_s =~ /\.bgzf?$/
    @base_reader = BGZFChunkReader
    @compression = :bgzf
  else
    @base_reader = ChunkReader
  end
  @cr = base_reader.new(@f, chunk_size)
  if RUBY_PLATFORM == 'java'
    @cr = ThreadedChunkReaderWrapper.new(@cr)
  end
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end
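The constructor's file-type detection can be summarized standalone; `reader_for` below is a hypothetical helper written for illustration, not part of the library:

```ruby
# Hypothetical standalone summary of the constructor's detection logic:
# BGZF paths get the BGZF chunk reader, .maf.gz is decompressed through
# a gzip pipe, and everything else is read as plain MAF.
def reader_for(path)
  case path.to_s
  when /\.bgzf?$/   then :bgzf_chunk_reader
  when /\.maf\.gz$/ then :gzip_pipe
  else                   :plain_chunk_reader
  end
end

reader_for("chr1.maf.bgz")  # => :bgzf_chunk_reader
```

Note that only the BGZF case sets #compression, which later steers #merge_fetch_list toward the BGZF-aware merge.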
Instance Attribute Details
#at_end ⇒ Boolean (readonly)
Returns whether EOF has been reached.

# File 'lib/bio/maf/parser.rb', line 532
def at_end
  @at_end
end

#base_reader ⇒ Class (readonly)
Returns the ChunkReader class to use for random access.

# File 'lib/bio/maf/parser.rb', line 530
def base_reader
  @base_reader
end

#chunk_start ⇒ Integer (readonly)
Returns the starting offset of the current chunk.

# File 'lib/bio/maf/parser.rb', line 536
def chunk_start
  @chunk_start
end

#compression ⇒ Symbol (readonly)
Returns the compression method used for this file, or nil.

# File 'lib/bio/maf/parser.rb', line 540
def compression
  @compression
end

#cr ⇒ ChunkReader (readonly)
Returns the ChunkReader in use.

# File 'lib/bio/maf/parser.rb', line 527
def cr
  @cr
end

#f ⇒ File (readonly)
Returns the file handle for the MAF file.

# File 'lib/bio/maf/parser.rb', line 523
def f
  @f
end

#file_spec ⇒ String (readonly)
Returns the path of the MAF file being parsed.

# File 'lib/bio/maf/parser.rb', line 521
def file_spec
  @file_spec
end

#header ⇒ Header (readonly)
Returns the header of the MAF file being parsed.

# File 'lib/bio/maf/parser.rb', line 519
def header
  @header
end

#last_block_pos ⇒ Integer (readonly)
Returns the offset of the last block start in this chunk.

# File 'lib/bio/maf/parser.rb', line 538
def last_block_pos
  @last_block_pos
end

#opts ⇒ Hash (readonly)
Returns the parser options.

# File 'lib/bio/maf/parser.rb', line 534
def opts
  @opts
end

#parse_empty ⇒ Object

# File 'lib/bio/maf/parser.rb', line 544
def parse_empty
  @parse_empty
end

#parse_extended ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

# File 'lib/bio/maf/parser.rb', line 543
def parse_extended
  @parse_extended
end

#s ⇒ StringScanner (readonly)
Returns the scanner used for parsing.

# File 'lib/bio/maf/parser.rb', line 525
def s
  @s
end
Instance Method Details
#_merge_bgzf_fetch_list(orig_fl) ⇒ Object
Build a merged fetch list in a BGZF-aware way. This will group together all MAF blocks from a single BGZF block. These MAF blocks may not be consecutive.
# File 'lib/bio/maf/parser.rb', line 784
def _merge_bgzf_fetch_list(orig_fl)
  block_e = orig_fl.chunk { |entry|
    Bio::BGZF::vo_block_offset(entry[0])
  }
  block_e.collect do |bgzf_block, fl|
    # text size to read from disk, from the start of the first
    # block to the end of the last block
    text_size = fl.last[0] + fl.last[1] - fl.first[0]
    offsets = fl.collect { |e| e[0] }
    [fl.first[0], text_size, offsets]
  end
end
#_merge_fetch_list(orig_fl) ⇒ Object
# File 'lib/bio/maf/parser.rb', line 761
def _merge_fetch_list(orig_fl)
  fl = orig_fl.dup
  r = []
  until fl.empty? do
    cur = fl.shift
    if r.last \
      && (r.last[0] + r.last[1]) == cur[0] \
      && (r.last[1] + cur[1]) <= @merge_max
      # contiguous with the previous one
      # add to length and increment count
      r.last[1] += cur[1]
      r.last[2] << cur[0]
    else
      cur << [cur[0]]
      r << cur
    end
  end
  return r
end
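The merge logic above can be sketched as a standalone function (a simplified re-implementation for illustration, with merge_max passed explicitly instead of read from @merge_max): adjacent [offset, length] entries are coalesced while the merged length stays within the cap, and the original offsets are collected in a third element.

```ruby
# Simplified standalone sketch of the merge behavior; not the
# library's own method, which reads @merge_max from parser state.
def merge_fetch_list(entries, merge_max)
  merged = []
  entries.each do |offset, len|
    last = merged.last
    if last && last[0] + last[1] == offset && last[1] + len <= merge_max
      last[1] += len        # extend the previous run
      last[2] << offset     # remember the original block offset
    else
      merged << [offset, len, [offset]]
    end
  end
  merged
end

merge_fetch_list([[0, 100], [100, 50], [400, 20]], 1024)
# => [[0, 150, [0, 100]], [400, 20, [400]]]
```

The first two entries are contiguous (0 + 100 == 100) and small enough to merge; the third starts a new run.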
#_parse_header ⇒ Object
Parse the header of the MAF file.
# File 'lib/bio/maf/parser.rb', line 798
def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
  s.skip_until BLOCK_START || parse_error("Cannot find block start!")
end
#_wrap(options, fun, &blk) ⇒ Object
options should be [:outer, ..., :inner]
# File 'lib/bio/maf/parser.rb', line 859
def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options, fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options, fun, :to_bio_alignment, &blk)
  when :remove_gaps
    conv_map(options, fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
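The recursion in _wrap composes processing stages from outermost to innermost around a base producer. A toy standalone sketch of the same pattern (stage names :double and :only_even are invented for illustration, standing in for the real modes like :join_blocks):

```ruby
# Toy sketch of _wrap's recursive composition: each stage wraps the
# caller's block before recursing, so values flow through the stages
# in list order. Stage names here are invented, not the parser's modes.
def wrap(stages, producer, &blk)
  stage = stages.shift
  case stage
  when nil
    producer.call(&blk)                              # innermost: run the producer
  when :double
    wrap(stages, producer) { |v| blk.call(v * 2) }   # transform stage
  when :only_even
    wrap(stages, producer) { |v| blk.call(v) if v.even? }  # filter stage
  else
    raise "unhandled wrapper mode: #{stage}"
  end
end

out = []
wrap([:double, :only_even],
     lambda { |&b| (1..4).each { |i| b.call(i) } }) { |v| out << v }
out  # => [4, 8]
```

Each value passes the :only_even filter first (the inner stage) and is then doubled by the outer stage, mirroring how the parser layers filtering, joining, and conversion around block production.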
#block_joiner(options, fun) {|prev| ... } ⇒ Object
# File 'lib/bio/maf/parser.rb', line 892
def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
#close ⇒ Object
# File 'lib/bio/maf/parser.rb', line 595
def close
  f.close
end
#context(chunk_size) ⇒ ParseContext
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a Bio::MAF::ParseContext for random access, using the given chunk size.
# File 'lib/bio/maf/parser.rb', line 604
def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  if file_spec
    fd = File.open(file_spec)
  else
    fd = f.dup
  end
  ParseContext.new(fd, chunk_size, self)
end
#conv_map(options, search, fun) ⇒ Object
# File 'lib/bio/maf/parser.rb', line 906
def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end
#conv_send(options, search, sym) ⇒ Object
# File 'lib/bio/maf/parser.rb', line 913
def conv_send(options, search, sym)
  _wrap(options, search) do |block|
    v = block.send(sym)
    yield v if v
  end
end
#each_block {|block| ... } ⇒ Enumerator<Block> Also known as: parse_blocks
Parse all alignment blocks until EOF.
Delegates to #parse_blocks_parallel if :threads is set under JRuby.
# File 'lib/bio/maf/parser.rb', line 821
def each_block(&blk)
  if block_given?
    if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
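Sequential iteration rests on scanning for 'a' (alignment block start) lines. A minimal standalone sketch of that idea, using StringScanner, the same scanner class the parser uses (the MAF fragment below is invented for illustration):

```ruby
require 'strscan'

# Minimal sketch of block-start scanning; the MAF fragment is invented.
maf = "##maf version=1\n" \
      "a score=23.0\ns hg18.chr7 27578828 38 + 158545518 AAAGGG\n" \
      "a score=5.0\ns hg18.chr7 27699739 6 + 158545518 TAAAGA\n"
scanner = StringScanner.new(maf)
block_starts = []
while scanner.skip_until(/(?=^a\s)/)   # zero-width: stop at each 'a' line
  block_starts << scanner.pos
  scanner.scan(/^a[^\n]*\n/)           # consume the 'a' line itself
end
block_starts.size  # => 2
```

The real parser does this chunk by chunk (see #last_block_pos and set_last_block_pos!) rather than on a whole in-memory string.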
#each_block_seq ⇒ Object
# File 'lib/bio/maf/parser.rb', line 835
def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end
#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>
Fetch and parse blocks given by fetch_list, which should be an array of [offset, length] tuples.
# File 'lib/bio/maf/parser.rb', line 649
def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if RUBY_PLATFORM == 'java' && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end
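For example, a fetch list is just an array of [offset, length] tuples, typically produced by an index lookup (the offsets below are invented for illustration):

```ruby
# Invented [offset, length] tuples; in practice these come from an
# index lookup rather than being written by hand.
fetch_list = [[0, 1024], [1024, 512], [8192, 2048]]

# With a parser open on the corresponding file (assumption):
# parser.fetch_blocks(fetch_list) { |block| p block }

total = fetch_list.map { |_, len| len }.sum  # bytes to be read: 3584
```

The first two tuples are contiguous, so merge_fetch_list would coalesce them into one read before any I/O happens.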
#fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list.
# File 'lib/bio/maf/parser.rb', line 667
def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
    end
  end
  elapsed = Time.now - start
  rate = (total_size / 1048576.0) / elapsed
  LOG.debug { sprintf("Fetched blocks in %.3fs, %.1f MB/s.",
                      elapsed, rate) }
end
#fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the :threads parser option.
# File 'lib/bio/maf/parser.rb', line 687
def fetch_blocks_merged_parallel(fetch_list)
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }

  elapsed = Time.now - start
  LOG.debug { sprintf("Fetched blocks from %d threads in %.1fs.",
                      n_threads, elapsed) }
  mb = total_size / 1048576.0
  LOG.debug { sprintf("%.3f MB processed (%.1f MB/s).",
                      mb, mb / elapsed) }
end
#filter_seq_count(fun) ⇒ Object
# File 'lib/bio/maf/parser.rb', line 886
def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end
#make_worker(jobs, ct) ⇒ Object
Create a worker thread for parallel parsing.
# File 'lib/bio/maf/parser.rb', line 725
def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            raise "expected #{n_blocks}, got #{blocks.size}: #{e.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      LOG.error "Worker failing: #{e.class}: #{e}"
      LOG.error e
      raise e
    end
  end
end
#merge_fetch_list(orig_fl) ⇒ Object
Merge contiguous blocks in the given fetch list, up to :merge_max bytes. Returns [offset, size, [offset1, offset2, ...]] tuples.
# File 'lib/bio/maf/parser.rb', line 752
def merge_fetch_list(orig_fl)
  case compression
  when nil
    _merge_fetch_list(orig_fl)
  when :bgzf
    _merge_bgzf_fetch_list(orig_fl)
  end
end
#parse_block ⇒ Object
# File 'lib/bio/maf/parser.rb', line 842
def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end
#parse_blocks_parallel ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Parse alignment blocks with a worker thread.
# File 'lib/bio/maf/parser.rb', line 924
def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
    rescue
      LOG.error "worker exiting: #{$!.class}: #{$!}"
      LOG.error $!
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      n_final_poll += 1 unless worker.alive?
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly!"
  end
end
#sequence_filter ⇒ Hash
Sequence filter to apply.
# File 'lib/bio/maf/parser.rb', line 631
def sequence_filter
  @sequence_filter ||= {}
end
#sequence_filter=(filter) ⇒ Object
Set the sequence filter.
# File 'lib/bio/maf/parser.rb', line 638
def sequence_filter=(filter)
  @sequence_filter = filter
end
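The filter is a plain Hash; an :only_species list is the usage described in bio-maf's documentation (the species names below are illustrative):

```ruby
# Sketch of the filter Hash shape; :only_species is taken from
# bio-maf's documented usage, and the species names are illustrative.
filter = { :only_species => %w(hg19 mm8 rheMac2) }

# With an open parser (assumption):
# parser.sequence_filter = filter

filter[:only_species].include?("mm8")  # => true
```

When the filter is non-empty, #wrap_block_seq adds a :sequence_filter stage, so blocks reduced to a single sequence are dropped during iteration.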
#with_context(chunk_size) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Execute the given block with a Bio::MAF::ParseContext created with the given chunk_size as an argument.
# File 'lib/bio/maf/parser.rb', line 619
def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end
#wrap_block_seq(fun, &blk) ⇒ Object
# File 'lib/bio/maf/parser.rb', line 852
def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  _wrap(opts, fun, &blk)
end