Class: Bio::MAF::KyotoIndex
- Inherits:
-
Object
- Object
- Bio::MAF::KyotoIndex
- Includes:
- KVHelpers
- Defined in:
- lib/bio/maf/index.rb
Constant Summary collapse
- FORMAT_VERSION_KEY =
'bio-maf:index-format-version'
- FORMAT_VERSION =
2
- REF_SEQ_KEY =
'bio-maf:reference-sequence'
- MAX_SPECIES =
64
Constants included from KVHelpers
Bio::MAF::KVHelpers::CHROM_BIN_PREFIX_FMT, Bio::MAF::KVHelpers::KEY_FMT, Bio::MAF::KVHelpers::KEY_SCAN_FMT, Bio::MAF::KVHelpers::VAL_FMT, Bio::MAF::KVHelpers::VAL_IDX_OFFSET_FMT, Bio::MAF::KVHelpers::VAL_N_SEQ_FMT, Bio::MAF::KVHelpers::VAL_SPECIES_FMT, Bio::MAF::KVHelpers::VAL_TEXT_SIZE_FMT
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
Returns the value of attribute db.
-
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
-
#ref_only ⇒ Object
readonly
Returns the value of attribute ref_only.
-
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
-
#species ⇒ Object
readonly
Returns the value of attribute species.
-
#species_max_id ⇒ Object
readonly
Returns the value of attribute species_max_id.
Class Method Summary collapse
-
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by
parser
, and store it inpath
. -
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
Instance Method Summary collapse
- #build(parser, ref_only = true) ⇒ Object
- #build_block_value(block) ⇒ Object
-
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
- #dump(stream = $stdout) ⇒ Object
- #entries_for(block) ⇒ Object
-
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects.
-
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
- #index_blocks(blocks) ⇒ Object
-
#initialize(path, db_arg = nil) ⇒ KyotoIndex
constructor
private
KyotoIndex Internals.
- #load_index_sequences ⇒ Object
- #load_species ⇒ Object
- #make_scan_worker(jobs, completed) ⇒ Object
- #overlaps?(gi, i_start, i_end) ⇒ Boolean
-
#reopen ⇒ Object
Reopen the same DB handle read-only.
- #scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
-
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
- #scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
- #seq_id_for(name) ⇒ Object
- #slice(interval, parser, filter = {}) ⇒ Object
- #species_id_for_seq(seq) ⇒ Object
- #with_profiling ⇒ Object
Methods included from KVHelpers
bin_start_prefix, extract_index_offset, extract_n_sequences, extract_species_vec, extract_text_size, unpack_key
Constructor Details
#initialize(path, db_arg = nil) ⇒ KyotoIndex
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
KyotoIndex Internals
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'lib/bio/maf/index.rb', line 426 def initialize(path, db_arg=nil) @species = {} @species_max_id = -1 @max_sid = -1 if db_arg || ((path.size > 1) and File.exist?(path)) mode = KyotoCabinet::DB::OREADER else mode = KyotoCabinet::DB::OWRITER | KyotoCabinet::DB::OCREATE end @db = db_arg || KyotoCabinet::DB.new @path = path unless db_arg || db.open(path.to_s, mode) raise "Could not open DB file!" end if mode == KyotoCabinet::DB::OREADER self.ref_seq = db[REF_SEQ_KEY] load_index_sequences load_species end end |
Instance Attribute Details
#db ⇒ Object (readonly)
Returns the value of attribute db.
265 266 267 |
# File 'lib/bio/maf/index.rb', line 265 def db @db end |
#index_sequences ⇒ Object
Returns the value of attribute index_sequences.
266 267 268 |
# File 'lib/bio/maf/index.rb', line 266 def index_sequences @index_sequences end |
#ref_only ⇒ Object (readonly)
Returns the value of attribute ref_only.
265 266 267 |
# File 'lib/bio/maf/index.rb', line 265 def ref_only @ref_only end |
#ref_seq ⇒ Object
Returns the value of attribute ref_seq.
266 267 268 |
# File 'lib/bio/maf/index.rb', line 266 def ref_seq @ref_seq end |
#species ⇒ Object (readonly)
Returns the value of attribute species.
265 266 267 |
# File 'lib/bio/maf/index.rb', line 265 def species @species end |
#species_max_id ⇒ Object (readonly)
Returns the value of attribute species_max_id.
265 266 267 |
# File 'lib/bio/maf/index.rb', line 265 def species_max_id @species_max_id end |
Class Method Details
.build(parser, path, ref_only = true) ⇒ KyotoIndex
Build a new index from the MAF file being parsed by parser
,
and store it in path
.
356 357 358 359 360 |
# File 'lib/bio/maf/index.rb', line 356 def self.build(parser, path, ref_only=true) idx = self.new(path) idx.build(parser, ref_only) return idx end |
.open(path) ⇒ KyotoIndex
Open an existing index for reading.
347 348 349 |
# File 'lib/bio/maf/index.rb', line 347 def self.open(path) return KyotoIndex.new(path) end |
Instance Method Details
#build(parser, ref_only = true) ⇒ Object
662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 |
# File 'lib/bio/maf/index.rb', line 662 def build(parser, ref_only=true) first_block = parser.parse_block self.ref_seq = first_block.sequences.first.source @ref_only = ref_only db[REF_SEQ_KEY] = ref_seq db[FORMAT_VERSION_KEY] = FORMAT_VERSION @index_sequences = {} index_blocks([first_block]) n = 0 parser.each_block.each_slice(1000).each do |blocks| index_blocks(blocks) n += blocks.size end LOG.debug { "Created index for #{n} blocks and #{@index_sequences.size} sequences." } db.synchronize(true) end |
#build_block_value(block) ⇒ Object
739 740 741 742 743 744 745 746 747 |
# File 'lib/bio/maf/index.rb', line 739 def build_block_value(block) bits = block.sequences.collect {|s| 1 << species_id_for_seq(s.source) } vec = bits.reduce(0, :|) return [block.offset, block.size, block.text_size, block.sequences.size, vec].pack(VAL_FMT) end |
#close ⇒ Object
Close the underlying Kyoto Cabinet database handle.
409 410 411 |
# File 'lib/bio/maf/index.rb', line 409 def close db.close end |
#dump(stream = $stdout) ⇒ Object
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 |
# File 'lib/bio/maf/index.rb', line 452 def dump(stream=$stdout) stream.puts "KyotoIndex dump: #{@path}" stream.puts if db.count == 0 stream.puts "Empty database!" return end db.cursor_process do |cur| stream.puts "== Metadata ==" cur.jump('') while true k, v = cur.get(false) raise "unexpected end of records!" unless k break if k[0] == "\xff" stream.puts "#{k}: #{v}" unless cur.step raise "could not advance cursor!" end end stream.puts "== Index records ==" while pair = cur.get(true) _, chr, bin, s_start, s_end = pair[0].unpack(KEY_FMT) offset, len, text_size, n_seq, species_vec = pair[1].unpack(VAL_FMT) stream.puts "#{chr} [bin #{bin}] #{s_start}:#{s_end}" stream.puts " offset #{offset}, length #{len}" stream.puts " text size: #{text_size}" stream.puts " sequences in block: #{n_seq}" stream.printf(" species vector: %016x\n", species_vec) end end end |
#entries_for(block) ⇒ Object
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 |
# File 'lib/bio/maf/index.rb', line 749 def entries_for(block) unless block.ref_seq.source == @ref_seq raise "Inconsistent reference sequence: expected #{@ref_seq}, got #{block.ref_seq.source}" end h = {} val = build_block_value(block) to_index = ref_only ? [block.sequences.first] : block.sequences to_index.each do |seq| seq_id = seq_id_for(seq.source) seq_end = seq.start + seq.size bin = Bio::Ucsc::UcscBin.bin_from_range(seq.start, seq_end) key = [255, seq_id, bin, seq.start, seq_end].pack(KEY_FMT) h[key] = val end return h end |
#fetch_list(intervals, filter_spec = {}) ⇒ Object
Build a fetch list of alignment blocks to read, given an array of Bio::GenomicInterval objects
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 |
# File 'lib/bio/maf/index.rb', line 506 def fetch_list(intervals, filter_spec={}) filter_spec ||= {} filters = Filters.build(filter_spec, self) chrom = intervals.first.chrom chrom_id = index_sequences[chrom] unless chrom_id raise "chromosome #{chrom} not indexed!" end if intervals.find { |i| i.chrom != chrom } raise "all intervals must be for the same chromosome!" end # for each bin, build a list of the intervals to look for there bin_intervals = Hash.new { |h, k| h[k] = [] } intervals.each do |i| i.bin_all.each do |bin| bin_intervals[bin] << (i.zero_start...i.zero_end) end end bin_intervals.values.each do |intervals| intervals.sort_by! {|i| i.begin} end matches = if RUBY_PLATFORM == 'java' && bin_intervals.size > 4 scan_bins_parallel(chrom_id, bin_intervals, filters) else scan_bins(chrom_id, bin_intervals, filters) end matches.sort_by! { |e| e[0] } # sort by offset in file end |
#find(intervals, parser, filter = {}) {|block| ... } ⇒ Enumerable<Block>
Find all alignment blocks in the genomic regions in the list of Bio::GenomicInterval objects, and parse them with the given parser.
An optional Hash of filters may be passed in. The following keys are used:
:with_all_species => ["sp1", "sp2", ...]
Only match alignment blocks containing all given species.
:at_least_n_sequences => n
Only match alignment blocks with at least N sequences.
:min_size => n
Only match alignment blocks with text size at least N.
:max_size => n
Only match alignment blocks with text size at most N.
393 394 395 396 397 398 399 400 401 402 403 404 405 406 |
# File 'lib/bio/maf/index.rb', line 393 def find(intervals, parser, filter={}, &blk) start = Time.now fl = fetch_list(intervals, filter) LOG.debug { sprintf("Built fetch list of %d items in %.3fs.\n", fl.size, Time.now - start) } if ! fl.empty? parser.fetch_blocks(fl, &blk) else if ! block_given? [] end end end |
#index_blocks(blocks) ⇒ Object
679 680 681 682 |
# File 'lib/bio/maf/index.rb', line 679 def index_blocks(blocks) h = blocks.map { |b| entries_for(b) }.reduce(:merge!) db.set_bulk(h, false) end |
#load_index_sequences ⇒ Object
684 685 686 687 688 689 690 691 692 693 |
# File 'lib/bio/maf/index.rb', line 684 def load_index_sequences h = {} db.match_prefix("sequence:").each do |key| _, name = key.split(':', 2) id = db[key].to_i h[name] = id end @index_sequences = h @max_sid = @index_sequences.values.max end |
#load_species ⇒ Object
706 707 708 709 710 711 712 713 |
# File 'lib/bio/maf/index.rb', line 706 def load_species db.match_prefix("species:").each do |key| _, name = key.split(':', 2) id = db[key].to_i @species[name] = id end @species_max_id = @species.values.sort.last || -1 end |
#make_scan_worker(jobs, completed) ⇒ Object
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 |
# File 'lib/bio/maf/index.rb', line 594 def make_scan_worker(jobs, completed) Thread.new do with_profiling do db.cursor_process do |cur| while true req = jobs.poll break unless req begin result = yield(cur, req) completed.put(result) rescue Exception => e completed.put(e) LOG.error "Worker failing: #{e.class}: #{e}" LOG.error e raise e end end end end end end |
#overlaps?(gi, i_start, i_end) ⇒ Boolean
655 656 657 658 659 660 |
# File 'lib/bio/maf/index.rb', line 655 def overlaps?(gi, i_start, i_end) g_start = gi.begin (i_start <= g_start && g_start < i_end) \ || gi.include?(i_start) end |
#reopen ⇒ Object
Reopen the same DB handle read-only. Only useful for unit tests.
448 449 450 |
# File 'lib/bio/maf/index.rb', line 448 def reopen KyotoIndex.new(@path, @db) end |
#scan_bin(cur, chrom_id, bin, bin_intervals, filters) ⇒ Object
616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 |
# File 'lib/bio/maf/index.rb', line 616 def scan_bin(cur, chrom_id, bin, bin_intervals, filters) # bin_intervals is sorted by zero_start # compute the start and end of all intervals of interest spanning_start = bin_intervals.first.begin spanning_end = bin_intervals.map {|i| i.end}.max # scan from the start of the bin cur.jump(bin_start_prefix(chrom_id, bin)) matches = [] while pair = cur.get(true) c_chr, c_bin, c_start, c_end = pair[0].unpack(KEY_SCAN_FMT) if (c_chr != chrom_id) \ || (c_bin != bin) \ || c_start >= spanning_end # we've hit the next bin, or chromosome, or gone past # the spanning interval, so we're done with this bin break end if c_end >= spanning_start # possible overlap # any intervals that end before the start of the current # block are no longer relevant while bin_intervals.first.end < c_start bin_intervals.shift end bin_intervals.each do |i| i_start = i.begin break if i_start > c_end if ((c_start <= i_start && i_start < c_end) \ || i.include?(c_start)) \ && filters.match(pair) # match matches << extract_index_offset(pair) break end end end end matches end |
#scan_bins(chrom_id, bin_intervals, filters) ⇒ Object
Scan the index for blocks matching the given bins and intervals.
536 537 538 539 540 541 542 543 544 545 |
# File 'lib/bio/maf/index.rb', line 536 def scan_bins(chrom_id, bin_intervals, filters) to_fetch = [] db.cursor_process do |cur| bin_intervals.each do |bin, bin_intervals_raw| matches = scan_bin(cur, chrom_id, bin, bin_intervals_raw, filters) to_fetch.concat(matches) end end to_fetch end |
#scan_bins_parallel(chrom_id, bin_intervals, filters) ⇒ Object
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 |
# File 'lib/bio/maf/index.rb', line 561 def scan_bins_parallel(chrom_id, bin_intervals, filters) start = Time.now n_threads = ENV['profile'] ? 1 : java.lang.Runtime.runtime.availableProcessors jobs = java.util.concurrent.ConcurrentLinkedQueue.new(bin_intervals.to_a) completed = java.util.concurrent.LinkedBlockingQueue.new(128) threads = [] n_threads.times do threads << make_scan_worker(jobs, completed) do |cur, req| bin, intervals = req scan_bin(cur, chrom_id, bin, intervals, filters) end end n_completed = 0 to_fetch = [] while (n_completed < bin_intervals.size) c = completed.poll(5, java.util.concurrent.TimeUnit::SECONDS) if c.nil? if threads.find { |t| t.alive? } next else raise "No threads alive, completed #{n_completed}/#{bin_intervals.size} jobs!" end end raise "worker failed: #{c}" if c.is_a? Exception to_fetch.concat(c) n_completed += 1 end threads.each { |t| t.join } LOG.debug { sprintf("Matched %d index records with %d threads in %.3f seconds.\n", to_fetch.size, n_threads, Time.now - start) } to_fetch end |
#seq_id_for(name) ⇒ Object
695 696 697 698 699 700 701 702 703 704 |
# File 'lib/bio/maf/index.rb', line 695 def seq_id_for(name) sid = index_sequences[name] if ! sid @max_sid += 1 sid = @max_sid db.set("sequence:#{name}", sid.to_s) index_sequences[name] = sid end return sid end |
#slice(interval, parser, filter = {}) ⇒ Object
413 414 415 416 417 418 419 420 421 |
# File 'lib/bio/maf/index.rb', line 413 def slice(interval, parser, filter={}) if block_given? find([interval], parser, filter) do |block| yield block.slice(interval) end else enum_for(:slice, interval, parser, filter) end end |
#species_id_for_seq(seq) ⇒ Object
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 |
# File 'lib/bio/maf/index.rb', line 715 def species_id_for_seq(seq) # NB can have multiple dots # example: otoGar1.scaffold_104707.1-93001 parts = seq.split('.', 2) if parts.size == 2 species_name = parts[0] if species.has_key? species_name return species[species_name] else species_id = @species_max_id + 1 if species_id >= MAX_SPECIES raise "cannot index MAF file with more than #{MAX_SPECIES} species" end species[species_name] = species_id db["species:#{species_name}"] = species_id @species_max_id = species_id return species_id end else # not in species.sequence format, apparently return nil end end |
#with_profiling ⇒ Object
547 548 549 550 551 552 553 554 555 556 557 558 559 |
# File 'lib/bio/maf/index.rb', line 547 def with_profiling if RUBY_PLATFORM == 'java' && ENV['profile'] rv = nil pdata = JRuby::Profiler.profile do rv = yield end printer = JRuby::Profiler::FlatProfilePrinter.new(pdata) printer.printProfile(STDERR) return rv else yield end end |