Class: Fairy::PGroupBy::ExtMergeSortBuffer

Inherits:
MergeSortBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Instance Attribute Summary

Attributes inherited from OnMemoryBuffer

#log_id

Instance Method Summary

Methods inherited from MergeSortBuffer

#store_2ndmemory

Methods inherited from CommandMergeSortBuffer

#each, #init_2ndmemory, #initialize, #open_buffer, #push, #store_2ndmemory

Methods inherited from OnMemoryBuffer

#each, #initialize, #push

Constructor Details

This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer

Instance Method Details

#each_2ndmemory(&block) ⇒ Object



Lines 519–552
# File 'lib/fairy/node/p-group-by.rb', line 519

# Iterate over every group held in this buffer, passing each group's
# values array (the key is dropped) to +block+.
#
# Any pairs still held in @key_values are first handed to store_2ndmemory.
# A child process is then forked that exports this object as "Sorter" and
# blocks in #finish_wait. The parent drives the merge remotely through
# Sorter#sub_each, releases the child with Sorter#finish, closes the buffers
# with close!, and finally reaps the child.
def each_2ndmemory(&block)
	# Loaded lazily, only when this code path actually runs.
	require "deep-connect/deep-fork"

	store_2ndmemory(@key_values) unless @key_values.empty?

	buffer_paths = @buffers.map(&:path)
	Log.debug(self, buffer_paths.join(" "))

	fork_handle = DeepConnect::DeepFork.fork(@njob.processor.deepconnect) do |dc, _ds|
	  $0 = "fairy processor sorter"
	  dc.export("Sorter", self)
	  # Child blocks here until the parent calls Sorter#finish.
	  finish_wait
	  # Explicit ds.close / dc.stop were disabled upstream; the short pause
	  # presumably lets the final reply reach the parent before the child exits.
	  sleep 1
	end

	remote_sorter = fork_handle.peer_deep_space.import("Sorter", true)
	remote_sorter.sub_each do |_key, values|
	  block.call(values)
	  nil  # return nil so that no reference is sent back to the remote side
	end
	remote_sorter.finish

	@buffers.each(&:close!)
	Process.waitpid(fork_handle.peer_pid)
end

#finish ⇒ Object



Lines 598–600
# File 'lib/fairy/node/p-group-by.rb', line 598

# Wake the thread blocked in #finish_wait (in each_2ndmemory this lets the
# forked sorter process proceed to exit).
#
# NOTE(review): @cv is only created inside #finish_wait, so unless it is set
# elsewhere this raises NoMethodError if called before #finish_wait runs.
# A signal sent before the waiter reaches @cv.wait is lost, and the waiter
# then blocks forever. Setting a "finished" flag under @mx here, and having
# #finish_wait loop on that flag, would close both gaps.
def finish
	@cv.signal
end

#finish_wait ⇒ Object

DeepConnect.def_method_spec(self, "REF sub_each()DVAL")



Lines 590–596
# File 'lib/fairy/node/p-group-by.rb', line 590

# Block the calling thread until #finish signals the condition variable.
#
# A fresh Mutex / ConditionVariable pair is installed on every call.
# Returns whatever ConditionVariable#wait returns.
def finish_wait
	@mx, @cv = Mutex.new, ConditionVariable.new
	@mx.synchronize { @cv.wait(@mx) }
end

#sub_each(&block) ⇒ Object



Lines 554–587
# File 'lib/fairy/node/p-group-by.rb', line 554

# K-way merge over @buffers, yielding (key, values) once per run of equal keys.
#
# Each buffer is opened and read with read_line(buf.io). Entries are used as
# [key, values_array]; a nil entry means that buffer is exhausted.
# NOTE(review): the merge assumes every buffer is already sorted by key —
# confirm against store_2ndmemory.
#
# Values of consecutive equal keys are concatenated into the first entry's
# array (that array is mutated). Groups whose values are empty are not
# yielded. Always returns nil.
def sub_each(&block)
	# One head entry per non-empty buffer, ordered by key.
	bufs = @buffers.collect{|buf|
	  buf.open
	  [read_line(buf.io), buf]
	}.reject{|head, _buf| head.nil?}.sort_by{|head, _buf| head[0]}

	key = nil
	values = []
	while (buf_min = bufs.shift)
	  kv, buf = buf_min

	  if key == kv[0]
	    values.concat kv[1]
	  else
	    yield key, values unless values.empty?
	    key = kv[0]
	    values = kv[1]
	  end

	  next unless (line = read_line(buf.io))
	  # Re-insert after the last head whose key is <= the new key. This keeps
	  # bufs sorted, and equal keys stay in arrival order.
	  idx = bufs.rindex{|head, _b| head[0] <= line[0]}
	  idx ? bufs.insert(idx + 1, [line, buf]) : bufs.unshift([line, buf])
	end
	# The final group is yielded as (key, values) like every other group.
	# Yielding only `values` made a |key, values| block destructure the array.
	yield key, values unless values.empty?
	nil  # return nil so that no reference is sent back to the remote caller
end