Class: Fairy::PGroupBy::ExtMergeSortBuffer

Inherits:
MergeSortBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Instance Attribute Summary

Attributes inherited from OnMemoryBuffer

#log_id

Instance Method Summary

Methods inherited from MergeSortBuffer

#store_2ndmemory

Methods inherited from CommandMergeSortBuffer

#each, #init_2ndmemory, #initialize, #open_buffer, #push, #store_2ndmemory

Methods inherited from OnMemoryBuffer

#each, #initialize, #push

Constructor Details

This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer

Instance Method Details

#each_2ndmemory(&block) ⇒ Object



(source lines 525–558)
# File 'lib/fairy/node/p-group-by.rb', line 525

# Streams every group stored in the 2nd-memory (on-disk) buffers to +block+.
#
# Records still held in @key_values are first spilled with store_2ndmemory.
# The merge runs in a child process created by DeepConnect::DeepFork.fork:
# the child exports this object under the name "Sorter" and blocks in
# finish_wait, while this (parent) side imports that object and drives
# sub_each remotely. Only the values of each group are handed to +block+;
# the key is dropped here. Afterwards the child is released with finish,
# every buffer is close!-d, and the child is reaped with Process.waitpid.
#
# NOTE(review): there is no ensure clause, so if sub_each or +block+ raises,
# finish / close! / waitpid are skipped and the child may stay blocked in
# finish_wait.
#
# @yieldparam values [Array] the values of one group
def each_2ndmemory(&block)
  # Required here rather than at file level (presumably so DeepFork is only
  # loaded when the 2nd-memory path is taken).
  require "deep-connect/deep-fork"

  store_2ndmemory(@key_values) unless @key_values.empty?

  paths = @buffers.map { |buffer| buffer.path }
  Log.debug(self, paths.join(" "))

  sorter_fork = DeepConnect::DeepFork.fork(@njob.processor.deepconnect) do |deep_connect, _deep_space|
    # Child process: publish self, then stay alive until the parent calls finish.
    $0 = "fairy processor sorter"
    deep_connect.export("Sorter", self)
    finish_wait
    # NOTE(review): presumably gives the parent's remote finish call time to
    # return before the child exits — TODO confirm.
    sleep 1
  end

  remote_sorter = sorter_fork.peer_deep_space.import("Sorter", true)
  remote_sorter.sub_each do |_key, values|
    block.call(values)
    # Return nil so that no object reference is passed back to the peer.
    nil
  end
  remote_sorter.finish

  @buffers.each { |buffer| buffer.close! }
  Process.waitpid(sorter_fork.peer_pid)
end

#finish ⇒ Object



(source lines 604–606)
# File 'lib/fairy/node/p-group-by.rb', line 604

# Wakes the thread blocked in finish_wait so the forked sorter can exit.
#
# Called remotely by the parent process once it has consumed sub_each.
# The signal is sent while holding @mx: a waiter that has already taken the
# lock but not yet entered @cv.wait would otherwise miss a bare signal and
# block forever. @finished is also set, so a waiter can test it and skip
# waiting when the request arrived before it reached wait.
#
# NOTE(review): @mx and @cv are created by finish_wait; calling this before
# finish_wait has run raises NoMethodError on nil (as before).
def finish
  @mx.synchronize do
    @finished = true
    @cv.signal
  end
end

#finish_wait ⇒ Object

DeepConnect.def_method_spec(self, "REF sub_each()DVAL")



(source lines 596–602)
# File 'lib/fairy/node/p-group-by.rb', line 596

# Blocks the calling thread until finish signals it.
#
# Used by the forked sorter process (each_2ndmemory) to stay alive while the
# parent drives sub_each remotely. Creates a fresh @mx / @cv pair on every
# call; finish relies on these to deliver its signal.
def finish_wait
  @mx = Mutex.new
  @cv = XThread::ConditionVariable.new
  @mx.synchronize do
    # Skip the wait if finish has already recorded its request in @finished;
    # otherwise a signal sent before we reached wait would be lost and this
    # process would hang. A single wait (rather than `until @finished`) is
    # kept so this still returns on a bare signal that does not set the flag.
    @cv.wait(@mx) unless @finished
  end
end

#sub_each(&block) ⇒ Object



(source lines 560–593)
# File 'lib/fairy/node/p-group-by.rb', line 560

# Merges the records of all on-disk buffers by key and yields each key once,
# together with the concatenation of its values from every buffer.
#
# Records are obtained with read_line(buf.io), which is expected to return
# [key, values] (values being an Array) or nil at end of input. The merge is
# only correct if each buffer is already sorted by key — presumably
# guaranteed by store_2ndmemory; TODO confirm. Keys must be comparable with <=.
#
# Every group, including the last one, is yielded as (key, values). Groups
# whose collected values are empty are not yielded.
#
# @yieldparam key [Object] the group key
# @yieldparam values [Array] all values collected for +key+
# @return [nil]
def sub_each(&block)
  # Prime the merge with the first record of every non-empty buffer, by key.
  bufs = @buffers.collect { |buf|
    buf.open
    [read_line(buf.io), buf]
  }.reject { |kv, _buf| kv.nil? }.sort_by { |kv, _buf| kv[0] }

  key = nil
  values = []
  while (head = bufs.shift)
    kv, buf = head

    if key == kv[0]
      values.concat(kv[1])
    else
      yield key, values unless values.empty?
      key = kv[0]
      values = kv[1]
    end

    next unless (line = read_line(buf.io))
    # Re-insert this buffer after the last entry whose key is <= the new key,
    # keeping bufs ordered by key and FIFO among equal keys.
    idx = bufs.rindex { |other_kv, _other_buf| other_kv[0] <= line[0] }
    idx ? bufs.insert(idx + 1, [line, buf]) : bufs.unshift([line, buf])
  end

  # Flush the final group with the same (key, values) shape as the loop;
  # yielding only +values+ would be auto-splatted by a |key, values| block.
  yield key, values unless values.empty?
  nil  # return nil so that no object reference is handed back to the remote caller
end