74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/pmux/mapper.rb', line 74
# Runs the task's mapper command through a PipeIO and routes everything the
# pipe emits into the map output.
#
# The command is taken from @task['mapper'] and defaults to 'cat'.
# With @num_r <= 1 the received data is appended verbatim to "#{@ifbase}-0";
# otherwise it is handed to a TextPartitioner built from @ifbase, @num_r and
# @task['separator'].
#
# When the pipe closes, the output sink is closed and the file at err_path is
# inspected: if it exists and is non-empty, @on_error is called with a
# RuntimeError carrying its contents; otherwise @on_success is called.
# NOTE(review): err_path is only passed to fix_cmd_line here; presumably that
# helper redirects the command's stderr to it -- confirm against fix_cmd_line.
#
# Returns whatever @loop.attach returns.
def do_streaming_map_task
  mapper_cmd = @task['mapper'] || 'cat'
  err_path = "#{tmp_dir}/.err.#{object_id}"
  out = nil
  partitioner = nil
  pipeio = nil

  if @num_r <= 1
    cmd_line = fix_cmd_line mapper_cmd, @path, nil, err_path, tmp_dir
    Log.debug "pipe: #{cmd_line}"
    Dir.chdir(tmp_dir) { pipeio = PipeIO.new cmd_line }
    # File.open rather than Kernel#open: the latter would spawn a subprocess
    # if the path ever started with '|'.
    out = File.open("#{@ifbase}-0", 'a')
    pipeio.on_receive { |data| out.write data }
  else
    partitioner = TextPartitioner.new @ifbase, @num_r,
                                      :separator => @task['separator']
    cmd_line = fix_cmd_line mapper_cmd, @path, nil, err_path, tmp_dir
    Log.debug "pipe: #{cmd_line}"
    Dir.chdir(tmp_dir) { pipeio = PipeIO.new cmd_line }
    pipeio.on_receive { |data| partitioner.emit data }
  end

  # Snapshot the callbacks so the close handler uses the ones that were set
  # when this task was started.
  on_success = @on_success
  on_error = @on_error

  pipeio.on_close {
    if out
      # Best effort: a failure while closing must not mask the task result.
      begin
        out.close
      rescue StandardError
        nil
      end
    end
    partitioner.close if partitioner

    if File.size? err_path
      # Non-destructive chomp: chomp! returns nil when there is no trailing
      # newline, which would silently drop the error message.
      err_msg = File.read(err_path).chomp
      e = RuntimeError.new err_msg
      e.set_backtrace ['mapper']
      on_error.call e if on_error
    else
      on_success.call if on_success
    end
  }
  @loop.attach pipeio
end
|