Method: Pmux::StreamingMapper#do_streaming_map_task

Defined in:
lib/pmux/mapper.rb

#do_streaming_map_task ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/pmux/mapper.rb', line 74

# Runs the map phase by piping the input through an external command and
# routing the command's output to the intermediate file(s).
#
# The command is @task['mapper'], defaulting to 'cat'. Its final command
# line is built by fix_cmd_line from the mapper command, @path, the error
# capture path and tmp_dir (presumably wiring stdin/stderr redirection --
# confirm against fix_cmd_line).
#
# Output routing depends on @num_r:
# * @num_r <= 1: every chunk received is appended to "#{@ifbase}-0".
# * otherwise:   every chunk is handed to a TextPartitioner built from
#   @ifbase, @num_r and @task['separator'].
#
# The method only wires up handlers and attaches the pipe to @loop;
# completion is reported later from the pipe's close handler:
# * @on_error is called with a RuntimeError (backtrace ['mapper']) when the
#   error capture file is non-empty; the message is the file's content
#   without its trailing line terminator.
# * @on_success is called with no arguments otherwise.
#
# @return [Object] whatever @loop.attach returns
def do_streaming_map_task
  mapper_cmd = @task['mapper'] || 'cat'
  err_path = "#{tmp_dir}/.err.#{object_id}"
  # Declared up front so the blocks below assign/read these method-level
  # locals instead of creating block-local ones.
  pipeio = nil
  out = nil
  partitioner = nil

  # The command line is identical for both routing modes, so build (and
  # log) it once.
  cmd_line = fix_cmd_line mapper_cmd, @path, nil, err_path, tmp_dir
  Log.debug "pipe: #{cmd_line}"

  if @num_r <= 1
    Dir.chdir(tmp_dir) { pipeio = PipeIO.new cmd_line }
    # File.open rather than Kernel#open: the latter would treat a path
    # beginning with "|" as a command to execute.
    out = File.open("#{@ifbase}-0", 'a')
    pipeio.on_receive { |data| out.write data }
  else # @num_r >= 2
    partitioner = TextPartitioner.new @ifbase, @num_r,
      :separator => @task['separator']
    Dir.chdir(tmp_dir) { pipeio = PipeIO.new cmd_line }
    pipeio.on_receive { |data| partitioner.emit data }
  end

  # Capture the callbacks in locals so the close handler uses the values
  # that were current when the task was started.
  on_success = @on_success
  on_error = @on_error
  pipeio.on_close {
    if out
      # Best-effort close: a failure here must not mask the task result.
      out.close rescue nil
    end
    partitioner.close if partitioner
    #@exitstatus = $?.exitstatus
    if File.size? err_path
      # chomp, not chomp!: the bang variant returns nil when there is no
      # trailing newline, which would discard the mapper's error text.
      err_msg = File.read(err_path).chomp
      e = RuntimeError.new err_msg
      e.set_backtrace ['mapper']
      on_error.call e if on_error
    else
      on_success.call if on_success
    end
  }
  @loop.attach pipeio
end