49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/fairy/node/p-sort.rb', line 49
def start_export
Log::debug(self, "START_EXPORT")
start do
sample_line_no = @opts[:sampling_max]
sample_line_no ||= CONF.SORT_SAMPLING_MAX
hash_opt = @opts[:cmp_optimize]
hash_opt = CONF.SORT_CMP_OPTIMIZE if hash_opt.nil?
if hash_opt
@key_proc = eval("proc{#{@block_source.source}}", @context.binding)
else
@key_proc = BBlock.new(@block_source, @context, self)
end
buf = []
no = 0
begin
if @pvs
sampling = false
Log::debugf(self, "%s", @pvs.inspect)
init_exports
elsif self.no == 0
sampling = true
else
sampling = false
@pvs = @bjob.get_pvs
Log::debugf(self, "%s", @pvs.inspect)
init_exports
end
@input.each do |e|
if sampling
no += 1
buf.push e
if no >= sample_line_no
sampling = false
@pvs = @bjob.get_pvs(buf)
Log::debugf(self, "%s", @pvs.inspect)
init_exports
buf.each{|e| hashing(e)}
end
else
hashing(e)
end
end
if sampling
@pvs = @bjob.get_pvs(buf)
Log::debugf(self, "%s", @pvs.inspect)
init_exports
buf.each{|e| hashing(e)}
end
rescue
Log::debug_exception(self)
raise
ensure
@exports_queue.push nil
@exports.each_pair do |key, export|
next unless export
Log::debug(self, "G0 #{key} => #{@counter[key]}")
export.push END_OF_STREAM
end
end
end
end
|