Method: Fairy::PSort::PPreSort#start_export

Defined in:
lib/fairy/node/p-sort.rb

#start_exportObject



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fairy/node/p-sort.rb', line 49

def start_export
	Log::debug(self, "START_EXPORT")

	start do
	  sample_line_no = @opts[:sampling_max]
	  sample_line_no ||= CONF.SORT_SAMPLING_MAX

	  hash_opt = @opts[:cmp_optimize]
	  hash_opt = CONF.SORT_CMP_OPTIMIZE if hash_opt.nil?
	  
	  if hash_opt
	    @key_proc = eval("proc{#{@block_source.source}}", @context.binding)
	  else
	    @key_proc = BBlock.new(@block_source, @context, self)
	  end
	  
	  buf = []
	  no = 0
	  begin
	    if @pvs
 sampling = false
Log::debugf(self, "%s", @pvs.inspect)
 init_exports
	    elsif self.no == 0
 sampling = true
	    else
 sampling = false
 @pvs = @bjob.get_pvs
Log::debugf(self, "%s", @pvs.inspect)
 init_exports
	    end
 
	    @input.each do |e|
 if sampling
		no += 1
		buf.push e
		if no >= sample_line_no
		  sampling = false
		  @pvs = @bjob.get_pvs(buf)
Log::debugf(self, "%s", @pvs.inspect)
		  init_exports
		  buf.each{|e| hashing(e)}
		end
 else
		hashing(e)
 end
	    end
	    if sampling
 @pvs = @bjob.get_pvs(buf)
Log::debugf(self, "%s", @pvs.inspect)
 init_exports
 buf.each{|e| hashing(e)}
	    end
	  rescue
	    Log::debug_exception(self)
	    raise
	  ensure
	    @exports_queue.push nil
	    @exports.each_pair do |key, export| 
 next unless export
 Log::debug(self, "G0 #{key} => #{@counter[key]}")	    
 export.push END_OF_STREAM
	    end
	  end
	end
end