Class: Bio::MAF::Parser
- Inherits: Object (hierarchy: Object > Bio::MAF::Parser)
- Includes: MAFParsing
- Defined in: lib/bio/maf/parser.rb
Overview
MAF parser, used for sequential and random-access parsing.
Options:
- :parse_extended: whether to parse 'i' and 'q' lines
- :parse_empty: whether to parse 'e' lines
- :remove_gaps: remove gaps left after filtering sequences
- :join_blocks: join adjacent blocks when at least one of them has been filtered and they are joinable (see #block_joiner)
- :as_bio_alignment: yield each block converted with #to_bio_alignment
- :chunk_size: read the MAF file in chunks of this many bytes
- :random_chunk_size: as above, but for random access (#fetch_blocks)
- :merge_max: merge up to this many bytes of blocks for random access
- :chunk_reader: use the specified class to read chunks (only useful with ThreadedChunkReader)
- :threads: number of threads to use for parallel parsing (only useful under JRuby)
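The constructor falls back to the class constants whenever an option is absent. The helper below is a hypothetical, standalone sketch of that defaulting logic (it is not part of bio-maf); it mirrors the `opts[:x] || DEFAULT` expressions in `#initialize`.

```ruby
# Hypothetical helper (not part of bio-maf) that mirrors how
# Parser#initialize resolves options against its defaults.
SEQ_CHUNK_SIZE = 131_072
RANDOM_CHUNK_SIZE = 4096
MERGE_MAX = SEQ_CHUNK_SIZE

def resolve_parser_opts(opts = {})
  {
    chunk_size:        opts[:chunk_size] || SEQ_CHUNK_SIZE,
    random_chunk_size: opts[:random_chunk_size] || RANDOM_CHUNK_SIZE,
    merge_max:         opts[:merge_max] || MERGE_MAX,
    parse_extended:    opts[:parse_extended] || false,
    parse_empty:       opts[:parse_empty] || false
  }
end

# With the real library, the same hash goes to the constructor, e.g.
#   Bio::MAF::Parser.new('example.maf', parse_extended: true, merge_max: 8192)
settings = resolve_parser_opts(parse_extended: true, merge_max: 8192)
puts settings[:chunk_size]   # prints: 131072
```

Note that `||` treats an explicit `false` the same as a missing key, which is harmless here because every default for the boolean options is already `false`.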
Constant Summary
- SEQ_CHUNK_SIZE = 131072
- RANDOM_CHUNK_SIZE = 4096
- MERGE_MAX = SEQ_CHUNK_SIZE
- WRAP_OPTS = [:as_bio_alignment, :join_blocks, :remove_gaps]
Constants included from MAFParsing
MAFParsing::BLOCK_START, MAFParsing::BLOCK_START_OR_EOS, MAFParsing::COMMENT, MAFParsing::E, MAFParsing::EOL_OR_EOF, MAFParsing::I, MAFParsing::Q, MAFParsing::S, MAFParsing::STRAND_SYM
Instance Attribute Summary
- #at_end ⇒ Boolean (readonly): Whether EOF has been reached.
- #chunk_start ⇒ Integer (readonly): Starting offset of the current chunk.
- #cr ⇒ ChunkReader (readonly): ChunkReader used to read the MAF file in chunks.
- #f ⇒ File (readonly): File handle for the MAF file.
- #file_spec ⇒ String (readonly): Path of the MAF file being parsed.
- #header ⇒ Header (readonly): Header of the MAF file being parsed.
- #last_block_pos ⇒ Integer (readonly): Offset of the last block start in this chunk.
- #opts ⇒ Hash (readonly): Parser options.
- #parse_empty ⇒ Object: Whether 'e' lines are parsed.
- #parse_extended ⇒ Object (private): Whether 'i' and 'q' lines are parsed.
- #s ⇒ StringScanner (readonly): Scanner for parsing.
Instance Method Summary
- #_parse_header ⇒ Object: Parse the header of the MAF file.
- #_wrap(options, fun, &blk) ⇒ Object: Apply the wrapper modes in options, which should be ordered [:outer, ..., :inner].
- #block_joiner(options, fun) {|prev| ... } ⇒ Object
- #close ⇒ Object
- #context(chunk_size) ⇒ ParseContext (private): Create a ParseContext for random access, using the given chunk size.
- #conv_map(options, search, fun) ⇒ Object
- #conv_send(options, search, sym) ⇒ Object
- #each_block {|block| ... } ⇒ Enumerator<Block> (also: #parse_blocks): Parse all alignment blocks until EOF.
- #each_block_seq ⇒ Object
- #fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>: Fetch and parse the blocks given by fetch_list.
- #fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>: Fetch and parse the blocks given by the merged fetch list.
- #fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>: Fetch and parse the blocks given by the merged fetch list, in parallel.
- #filter_seq_count(fun) ⇒ Object
- #initialize(file_spec, opts = {}) ⇒ Parser (constructor): Create a new parser instance.
- #make_worker(jobs, ct) ⇒ Object: Create a worker thread for parallel parsing.
- #merge_fetch_list(orig_fl) ⇒ Object: Merge contiguous blocks in the given fetch list, up to :merge_max bytes.
- #parse_block ⇒ Object
- #parse_blocks_parallel ⇒ Object (private): Parse alignment blocks with a worker thread.
- #sequence_filter ⇒ Hash: Sequence filter to apply.
- #sequence_filter=(filter) ⇒ Object: Set the sequence filter.
- #with_context(chunk_size) ⇒ Object (private): Create a ParseContext with the given chunk_size, yield it to the given block, and close its file handle afterwards.
- #wrap_block_seq(fun, &blk) ⇒ Object
Methods included from MAFParsing
#_parse_block, #gather_leading_fragment, #parse_block_data, #parse_empty_line, #parse_error, #parse_maf_vars, #parse_seq_line, #parse_trailing_fragment, #seq_filter_ok?, #set_last_block_pos!, #trailing_nl?
Constructor Details
#initialize(file_spec, opts = {}) ⇒ Parser
Create a new parser instance.
```ruby
# File 'lib/bio/maf/parser.rb', line 506
def initialize(file_spec, opts={})
  @opts = opts
  if RUBY_PLATFORM == 'java'
    opts[:threads] ||= java.lang.Runtime.runtime.availableProcessors
  end
  chunk_size = opts[:chunk_size] || SEQ_CHUNK_SIZE
  @random_access_chunk_size = opts[:random_chunk_size] || RANDOM_CHUNK_SIZE
  @merge_max = opts[:merge_max] || MERGE_MAX
  @parse_extended = opts[:parse_extended] || false
  @parse_empty = opts[:parse_empty] || false
  @chunk_start = 0
  @file_spec = file_spec
  @f = File.open(file_spec)
  reader = opts[:chunk_reader] || ChunkReader
  @cr = reader.new(@f, chunk_size)
  @s = StringScanner.new(cr.read_chunk())
  set_last_block_pos!
  @at_end = false
  _parse_header()
end
```
Instance Attribute Details
#at_end ⇒ Boolean (readonly)
Returns whether EOF has been reached.
```ruby
# File 'lib/bio/maf/parser.rb', line 485
def at_end
  @at_end
end
```
#chunk_start ⇒ Integer (readonly)
Returns starting offset of the current chunk.
```ruby
# File 'lib/bio/maf/parser.rb', line 489
def chunk_start
  @chunk_start
end
```
#cr ⇒ ChunkReader (readonly)
Returns the ChunkReader used to read the MAF file in chunks.
```ruby
# File 'lib/bio/maf/parser.rb', line 483
def cr
  @cr
end
```
#f ⇒ File (readonly)
Returns file handle for MAF file.
```ruby
# File 'lib/bio/maf/parser.rb', line 479
def f
  @f
end
```
#file_spec ⇒ String (readonly)
Returns path of MAF file being parsed.
```ruby
# File 'lib/bio/maf/parser.rb', line 477
def file_spec
  @file_spec
end
```
#header ⇒ Header (readonly)
Returns header of the MAF file being parsed.
```ruby
# File 'lib/bio/maf/parser.rb', line 475
def header
  @header
end
```
#last_block_pos ⇒ Integer (readonly)
Returns offset of the last block start in this chunk.
```ruby
# File 'lib/bio/maf/parser.rb', line 491
def last_block_pos
  @last_block_pos
end
```
#opts ⇒ Hash (readonly)
Returns parser options.
```ruby
# File 'lib/bio/maf/parser.rb', line 487
def opts
  @opts
end
```
#parse_empty ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 495
def parse_empty
  @parse_empty
end
```
#parse_extended ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
```ruby
# File 'lib/bio/maf/parser.rb', line 494
def parse_extended
  @parse_extended
end
```
#s ⇒ StringScanner (readonly)
Returns scanner for parsing.
```ruby
# File 'lib/bio/maf/parser.rb', line 481
def s
  @s
end
```
Instance Method Details
#_parse_header ⇒ Object
Parse the header of the MAF file.
```ruby
# File 'lib/bio/maf/parser.rb', line 702
def _parse_header
  parse_error("not a MAF file") unless s.scan(/##maf\s*/)
  vars = parse_maf_vars()
  align_params = nil
  while s.scan(/^#\s*(.+?)\n/)
    if align_params == nil
      align_params = s[1]
    else
      align_params << ' ' << s[1]
    end
  end
  @header = Header.new(vars, align_params)
  # Parenthesize the argument: unparenthesized, `BLOCK_START || parse_error(...)`
  # is passed to skip_until as one expression, so the error is never raised.
  s.skip_until(BLOCK_START) || parse_error("Cannot find block start!")
end
```
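The header scan can be tried in isolation with Ruby's StringScanner. This is a minimal sketch, not the library's code: the key=value loop is a hypothetical stand-in for `MAFParsing#parse_maf_vars`, `MafHeader` is a hypothetical Struct in place of `Header`, and `BLOCK_START_RE` is an assumed pattern (the real `MAFParsing::BLOCK_START` may differ).

```ruby
require 'strscan'

# Hypothetical stand-in for Bio::MAF::Header.
MafHeader = Struct.new(:vars, :alignment_params)

# Assumed block-start pattern: a line beginning with "a".
BLOCK_START_RE = /^(?=a)/

def parse_maf_header(text)
  s = StringScanner.new(text)
  raise ArgumentError, 'not a MAF file' unless s.scan(/##maf\s*/)
  # Hypothetical stand-in for parse_maf_vars: key=value pairs on the ##maf line.
  vars = {}
  while s.scan(/(\w+)=(\S*)[ \t]*/)
    vars[s[1].to_sym] = s[2]
  end
  s.scan(/\n/)
  # Consecutive "#" comment lines are joined into the alignment parameters.
  params = []
  params << s[1] while s.scan(/^#\s*(.+?)\n/)
  header = MafHeader.new(vars, params.empty? ? nil : params.join(' '))
  # Parenthesized call, so the error branch is reachable when no block follows.
  s.skip_until(BLOCK_START_RE) || raise(ArgumentError, 'Cannot find block start!')
  [header, s.pos]
end

text = "##maf version=1 scoring=tba.v8\n" \
       "# tba.v8 (((human chimp) baboon) (mouse rat))\n" \
       "\n" \
       "a score=23262.0\n"
header, pos = parse_maf_header(text)
puts header.alignment_params   # prints: tba.v8 (((human chimp) baboon) (mouse rat))
```

The returned position points at the first `a` line, which is where block parsing would resume.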
#_wrap(options, fun, &blk) ⇒ Object
options should be [:outer, ..., :inner]
```ruby
# File 'lib/bio/maf/parser.rb', line 763
def _wrap(options, fun, &blk)
  first = options.shift
  case first
  when nil
    fun.call(&blk)
  when :sequence_filter
    conv_map(options,
             fun,
             lambda { |b| b if b.sequences.size > 1 },
             &blk)
  when :join_blocks
    block_joiner(options, fun, &blk)
  when :as_bio_alignment
    conv_send(options,
              fun,
              :to_bio_alignment,
              &blk)
  when :remove_gaps
    conv_map(options,
             fun,
             lambda { |b| b.remove_gaps! if b.filtered?; b },
             &blk)
  else
    raise "unhandled wrapper mode: #{first}"
  end
end
```
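The recursion in `_wrap` (shift the outermost mode, recurse for the rest, and call the data source once the list is empty) can be seen with plain integers. This is a sketch only: the modes `:drop_odd` and `:double` and the names `wrap_chain` and `map_stage` are hypothetical; `map_stage` plays the role of `conv_map`.

```ruby
# Recursive wrapper chain modelled on Parser#_wrap; the modes are hypothetical.
def wrap_chain(options, fun, &blk)
  first = options.shift                # outermost remaining mode
  case first
  when nil
    fun.call(&blk)                     # innermost: run the data source
  when :drop_odd
    map_stage(options, fun, lambda { |x| x if x.even? }, &blk)
  when :double
    map_stage(options, fun, lambda { |x| x * 2 }, &blk)
  else
    raise ArgumentError, "unhandled wrapper mode: #{first}"
  end
end

# Like conv_map: transform each item from the inner chain, dropping nils.
def map_stage(options, fun, mapper)
  wrap_chain(options, fun) do |item|
    v = mapper.call(item)
    yield v if v
  end
end

source = lambda { |&b| [1, 2, 3, 4].each(&b) }
out = []
wrap_chain([:double, :drop_odd], source) { |x| out << x }
p out  # prints: [4, 8]
```

Because `:drop_odd` is innermost, odd values are removed before doubling. Reversing the list to `[:drop_odd, :double]` doubles first, so every value is even and nothing is dropped, giving `[2, 4, 6, 8]`.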
#block_joiner(options, fun) {|prev| ... } ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 796
def block_joiner(options, fun)
  prev = nil
  _wrap(options, fun) do |cur|
    if prev && (prev.filtered? || cur.filtered?) \
      && prev.joinable_with?(cur)
      prev = prev.join(cur)
    else
      yield prev if prev
      prev = cur
    end
  end
  yield prev if prev
end
```
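`block_joiner` holds one item back, merges the next item into it when the two are joinable, and otherwise emits the held item. The sketch below applies the same look-behind pattern to Ranges; the adjacency rule is a hypothetical substitute for `Block#filtered?` and `Block#joinable_with?`.

```ruby
# Look-behind joining, as in block_joiner, applied to exclusive Ranges.
# Joinability here (prev ends where cur begins) is a hypothetical rule.
def join_adjacent(items)
  prev = nil
  items.each do |cur|
    if prev && prev.end == cur.begin
      prev = (prev.begin...cur.end)    # extend the held range
    else
      yield prev if prev               # emit the held range
      prev = cur
    end
  end
  yield prev if prev                   # flush the last held range
end

out = []
join_adjacent([0...5, 5...9, 12...15, 15...16, 20...21]) { |r| out << r }
p out  # prints: [0...9, 12...16, 20...21]
```

The final `yield prev if prev` matters: without it the last run would be silently dropped.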
#close ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 527
def close
  f.close
end
```
#context(chunk_size) ⇒ ParseContext
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a Bio::MAF::ParseContext for random access, using the given chunk size.
```ruby
# File 'lib/bio/maf/parser.rb', line 536
def context(chunk_size)
  # IO#dup calls dup(2) internally, but seems broken on JRuby...
  fd = File.open(file_spec)
  ParseContext.new(fd, chunk_size, self)
end
```
#conv_map(options, search, fun) ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 810
def conv_map(options, search, fun)
  _wrap(options, search) do |block|
    v = fun.call(block)
    yield v if v
  end
end
```
#conv_send(options, search, sym) ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 817
def conv_send(options, search, sym)
  _wrap(options, search) do |block|
    v = block.send(sym)
    yield v if v
  end
end
```
#each_block {|block| ... } ⇒ Enumerator<Block> Also known as: parse_blocks
Parse all alignment blocks until EOF.
Delegates to #parse_blocks_parallel if :threads is set under JRuby. Without a block, returns an Enumerator.
```ruby
# File 'lib/bio/maf/parser.rb', line 725
def each_block(&blk)
  if block_given?
    if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
      fun = method(:parse_blocks_parallel)
    else
      fun = method(:each_block_seq)
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:each_block)
  end
end
```
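`each_block` follows the common Ruby idiom of yielding when a block is given and returning an Enumerator otherwise, which lets callers chain `first`, `map`, and so on. A minimal sketch of that idiom, using a hypothetical `BlockSource` class:

```ruby
# Hypothetical class illustrating the block-or-Enumerator idiom of each_block.
class BlockSource
  def initialize(items)
    @items = items
  end

  def each_block(&blk)
    return enum_for(:each_block) unless block_given?
    @items.each(&blk)
    self
  end
end

src = BlockSource.new(%w[a1 a2 a3])
src.each_block { |b| puts b }   # yields each item
p src.each_block.first(2)       # prints: ["a1", "a2"]
```

With the Enumerator form, `first(2)` stops iteration after two items, so a lazy consumer does not force a full pass over the source.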
#each_block_seq ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 739
def each_block_seq
  until at_end
    block = _parse_block()
    yield block if block
  end
end
```
#fetch_blocks(fetch_list) {|block| ... } ⇒ Enumerable<Block>
Fetch and parse the blocks given by fetch_list, which should be an array of [offset, length] tuples. Without a block, returns an Enumerator.
```ruby
# File 'lib/bio/maf/parser.rb', line 577
def fetch_blocks(fetch_list, &blk)
  if blk
    merged = merge_fetch_list(fetch_list)
    if RUBY_PLATFORM == 'java' && @opts.fetch(:threads, 1) > 1
      fun = lambda { |&b2| fetch_blocks_merged_parallel(merged, &b2) }
    else
      fun = lambda { |&b2| fetch_blocks_merged(merged, &b2) }
    end
    wrap_block_seq(fun, &blk)
  else
    enum_for(:fetch_blocks, fetch_list)
  end
end
```
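`fetch_blocks` packages its chosen strategy as a lambda that itself takes a block (`|&b2|`), so the wrapper chain can invoke either strategy the same way. The sketch below isolates that idiom; `sequential_fetch`, `parallel_fetch`, and `fetch_all` are hypothetical names, and the "parallel" stand-in is deliberately sequential.

```ruby
# Hypothetical stand-ins for fetch_blocks_merged / fetch_blocks_merged_parallel.
def sequential_fetch(list, &blk)
  list.each(&blk)
end

def parallel_fetch(list, &blk)
  list.each(&blk) # placeholder: a real version would hand work to threads
end

def fetch_all(list, threads = 1, &blk)
  return enum_for(:fetch_all, list, threads) unless blk
  # Package the chosen strategy as a lambda that accepts a block.
  fun = if RUBY_PLATFORM == 'java' && threads > 1
          lambda { |&b2| parallel_fetch(list, &b2) }
        else
          lambda { |&b2| sequential_fetch(list, &b2) }
        end
  fun.call(&blk) # the caller's block flows through to the strategy
end

fetch_all([[0, 10], [10, 5]]) { |off, len| puts "#{off}: #{len} bytes" }
```

The call site (`fun.call(&blk)`) does not care which strategy was picked, which is what lets `wrap_block_seq` treat sequential and parallel fetching uniformly.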
#fetch_blocks_merged(fetch_list, &blk) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list.
```ruby
# File 'lib/bio/maf/parser.rb', line 595
def fetch_blocks_merged(fetch_list, &blk)
  start = Time.now
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  with_context(@random_access_chunk_size) do |ctx|
    fetch_list.each do |e|
      ctx.fetch_blocks(*e, &blk)
    end
  end
  elapsed = Time.now - start
  # TODO: debug log
  # rate = (total_size / 1048576.0) / elapsed
  # $stderr.printf("Fetched blocks in %.3fs, %.1f MB/s.\n",
  #                elapsed, rate)
end
```
#fetch_blocks_merged_parallel(fetch_list) ⇒ Array<Block>
Fetch and parse the blocks given by the merged fetch list, in parallel. Uses the number of threads specified by the :threads parser option.
```ruby
# File 'lib/bio/maf/parser.rb', line 616
def fetch_blocks_merged_parallel(fetch_list)
  total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
  start = Time.now
  n_threads = @opts.fetch(:threads, 1)
  # TODO: break entries up into longer runs for more
  # sequential I/O
  jobs = java.util.concurrent.ConcurrentLinkedQueue.new(fetch_list)
  ct = CompletionTracker.new(fetch_list)
  completed = ct.queue
  threads = []
  n_threads.times { threads << make_worker(jobs, ct) }

  n_res = 0
  while n_res < fetch_list.size
    c = completed.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    unless c
      raise "Worker failed!" if threads.find { |t| t.status.nil? }
      next
    end
    c.each do |block|
      yield block
    end
    n_res += 1
  end
  threads.each { |t| t.join }
  elapsed = Time.now - start
  $stderr.printf("Fetched blocks from %d threads in %.1fs.\n",
                 n_threads,
                 elapsed)
  mb = total_size / 1048576.0
  $stderr.printf("%.3f MB processed (%.1f MB/s).\n",
                 mb,
                 mb / elapsed)
end
```
#filter_seq_count(fun) ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 790
def filter_seq_count(fun)
  fun.call() do |block|
    yield block if block.filtered? && block.sequences.size > 1
  end
end
```
#make_worker(jobs, ct) ⇒ Object
Create a worker thread for parallel parsing.
```ruby
# File 'lib/bio/maf/parser.rb', line 654
def make_worker(jobs, ct)
  Thread.new do
    begin
      with_context(@random_access_chunk_size) do |ctx|
        while true
          req = jobs.poll
          break unless req
          n_blocks = req[2].size
          blocks = ctx.fetch_blocks(*req).to_a
          if blocks.size != n_blocks
            # Report the offending request; `e` is not defined at this point.
            raise "expected #{n_blocks}, got #{blocks.size}: #{req.inspect}"
          end
          ct << blocks
        end
      end
    rescue Exception => e
      $stderr.puts "Worker failing: #{e.class}: #{e}"
      $stderr.puts e.backtrace.join("\n")
      raise e
    end
  end
end
```
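`fetch_blocks_merged_parallel` and `make_worker` implement a worker pool: threads pull jobs from a shared queue until it is empty, and the main thread collects one result per job. The sketch below reproduces that shape for MRI, assuming Ruby's thread-safe `Queue` in place of Java's `ConcurrentLinkedQueue` and `CompletionTracker`; `process_job` and `run_pool` are hypothetical. Instead of polling with a timeout, failed workers push their exception onto the result queue.

```ruby
# Hypothetical stand-in for ParseContext#fetch_blocks: one "block" per start offset.
def process_job(job)
  job[2].map { |start| "block@#{start}" }
end

def run_pool(jobs_list, n_threads)
  jobs = Queue.new
  jobs_list.each { |j| jobs << j }
  done = Queue.new                         # completed results (or errors)
  workers = Array.new(n_threads) do
    Thread.new do
      begin
        loop do
          job = begin
            jobs.pop(true)                 # non-blocking; raises ThreadError when empty
          rescue ThreadError
            break                          # no work left: leave the loop
          end
          done << process_job(job)
        end
      rescue StandardError => e
        done << e                          # report failure instead of hanging the consumer
      end
    end
  end
  jobs_list.size.times do
    result = done.pop
    raise "Worker failed: #{result.message}" if result.is_a?(Exception)
    result.each { |block| yield block }
  end
  workers.each(&:join)
end

run_pool([[0, 15, [0, 10]], [20, 5, [20]]], 2) { |b| puts b }
```

Results arrive in completion order, not fetch-list order, just as with the original's completion queue.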
#merge_fetch_list(orig_fl) ⇒ Object
Merge contiguous blocks in the given fetch list, up to :merge_max bytes. Returns [offset, size, [offset1, offset2, ...]] tuples.
```ruby
# File 'lib/bio/maf/parser.rb', line 681
def merge_fetch_list(orig_fl)
  # Copy each entry, not just the outer array, so the caller's
  # [offset, length] pairs are not modified below.
  fl = orig_fl.collect { |e| e.dup }
  r = []
  until fl.empty? do
    cur = fl.shift
    if r.last \
      && (r.last[0] + r.last[1]) == cur[0] \
      && (r.last[1] + cur[1]) <= @merge_max
      # contiguous with the previous one
      # add to length and increment count
      r.last[1] += cur[1]
      r.last[2] << cur[0]
    else
      cur << [cur[0]]
      r << cur
    end
  end
  return r
end
```
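The merging rule can be checked with a standalone function. This sketch takes `merge_max` as an explicit argument instead of reading `@merge_max`, and builds fresh arrays, so the caller's `[offset, length]` pairs are never modified. Two entries are merged only when the second starts exactly where the current run ends and the combined length stays within `merge_max`.

```ruby
# Standalone sketch of merge_fetch_list's coalescing rule.
def merge_fetch_list(orig_fl, merge_max)
  merged = []
  orig_fl.each do |offset, length|
    last = merged.last
    if last && last[0] + last[1] == offset && last[1] + length <= merge_max
      last[1] += length      # extend the current run
      last[2] << offset      # remember each block start within the run
    else
      merged << [offset, length, [offset]]
    end
  end
  merged
end

p merge_fetch_list([[0, 10], [10, 5], [20, 5]], 100)
# prints: [[0, 15, [0, 10]], [20, 5, [20]]]
```

With `merge_max` set to 12, the first two entries are contiguous but would exceed the limit together (10 + 5 = 15), so all three stay separate.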
#parse_block ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 746
def parse_block
  b = nil
  wrap_block_seq(lambda { |&blk| blk.call(_parse_block()) }) do |block|
    b = block
  end
  b
end
```
#parse_blocks_parallel ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Parse alignment blocks with a worker thread.
```ruby
# File 'lib/bio/maf/parser.rb', line 828
def parse_blocks_parallel
  queue = java.util.concurrent.LinkedBlockingQueue.new(128)
  worker = Thread.new do
    begin
      until at_end
        block = _parse_block()
        queue.put(block) if block
      end
      queue.put(:eof)
    rescue
      $stderr.puts "worker exiting: #{$!.class}: #{$!}"
      $stderr.puts $!.backtrace.join("\n")
    end
  end
  saw_eof = false
  n_final_poll = 0
  while true
    block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
    if block == :eof
      saw_eof = true
      break
    elsif block
      yield block
    else
      # timed out
      n_final_poll += 1 unless worker.alive?
    end
    break if n_final_poll > 1
  end
  unless saw_eof
    raise "worker exited unexpectedly!"
  end
end
```
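`parse_blocks_parallel` is a bounded producer/consumer pipeline: one thread parses and enqueues blocks, followed by an `:eof` sentinel, while the caller dequeues and yields them. The sketch below assumes MRI's `SizedQueue` in place of `java.util.concurrent.LinkedBlockingQueue`, with `String#upcase` as a hypothetical stand-in for `_parse_block`. As a design variation, a worker error is forwarded through the queue instead of being detected by timed polling.

```ruby
# Producer/consumer sketch modelled on parse_blocks_parallel.
def each_parsed(items)
  queue = SizedQueue.new(128)              # bounded, like LinkedBlockingQueue(128)
  worker = Thread.new do
    begin
      items.each { |item| queue << item.upcase }  # stand-in for _parse_block
      queue << :eof                        # sentinel: normal completion
    rescue StandardError => e
      queue << e                           # forward the failure to the consumer
    end
  end
  loop do
    v = queue.pop
    break if v == :eof
    raise "worker exited unexpectedly: #{v.message}" if v.is_a?(Exception)
    yield v
  end
  worker.join
end

each_parsed(%w[a b c]) { |block| puts block }
```

The bounded queue keeps the producer at most 128 items ahead of the consumer, limiting memory use on large files.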
#sequence_filter ⇒ Hash
Sequence filter to apply.
```ruby
# File 'lib/bio/maf/parser.rb', line 559
def sequence_filter
  @sequence_filter ||= {}
end
```
#sequence_filter=(filter) ⇒ Object
Set the sequence filter.
```ruby
# File 'lib/bio/maf/parser.rb', line 566
def sequence_filter=(filter)
  @sequence_filter = filter
end
```
#with_context(chunk_size) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Create a Bio::MAF::ParseContext with the given chunk_size, pass it to the given block, and close the context's file handle afterwards, even if the block raises.
```ruby
# File 'lib/bio/maf/parser.rb', line 547
def with_context(chunk_size)
  ctx = context(chunk_size)
  begin
    yield ctx
  ensure
    ctx.f.close
  end
end
```
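The `begin`/`yield`/`ensure` structure of `with_context` guarantees that the file handle is closed whether the block returns normally or raises. A minimal sketch of the same pattern with a plain File (`with_reader` is a hypothetical name):

```ruby
require 'tempfile'

# Open, yield, and always close, as in with_context.
def with_reader(path)
  f = File.open(path)
  begin
    yield f        # the block's value becomes the method's return value
  ensure
    f.close        # runs even if the block raises
  end
end

tmp = Tempfile.new('maf')
tmp.write("##maf version=1\n")
tmp.flush
puts with_reader(tmp.path) { |f| f.gets }   # prints: ##maf version=1
```

For a bare File, `File.open(path) { |f| ... }` already gives this guarantee; the explicit `ensure` is needed in `with_context` because the handle lives inside a ParseContext rather than being opened by a block-form call.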
#wrap_block_seq(fun, &blk) ⇒ Object
```ruby
# File 'lib/bio/maf/parser.rb', line 756
def wrap_block_seq(fun, &blk)
  opts = WRAP_OPTS.find_all { |o| @opts[o] }
  opts << :sequence_filter if sequence_filter && (! sequence_filter.empty?)
  _wrap(opts, fun, &blk)
end
```