Class: Pmux::StreamingReducer

Inherits:
Reducer
  • Object
show all
Includes:
FixCmdLine
Defined in:
lib/pmux/reducer.rb

Instance Attribute Summary

Attributes inherited from Reducer

#exitstatus, #output_path, #tmp_dir

Instance Method Summary collapse

Methods included from FixCmdLine

#fix_cmd_line

Methods inherited from Reducer

#initialize

Constructor Details

This class inherits a constructor from Pmux::Reducer

Instance Method Details

#do_reduce_taskObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pmux/reducer.rb', line 34

def do_reduce_task
  reducer_cmd = @task['reducer'] || 'cat'
  @output_path = "#{@tmp_dir}/r#{@task['pindex']}"
  err_path = "#{@tmp_dir}/.err.#{$$}"
  err_msg = nil
  #cmd_line = fix_cmd_line reducer_cmd,
  #  @paths.join(' '), @output_path, err_path, tmp_dir
  cmd_line = fix_cmd_line reducer_cmd,
    "#{tmp_dir}/t*-#{@task['pindex']}", @output_path, err_path, tmp_dir
  Log.debug "system: #{cmd_line}"
  system cmd_line
  @exitstatus = $?.exitstatus
  if File.size? err_path
    err_msg = File.read(err_path).chomp!
    raise RuntimeError, err_msg
  end
  if @exitstatus > 1
    raise RuntimeError, "failed to execute reducer: #{cmd_line}"
  end
  @output_path
end

#do_streaming_reduce_taskObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/pmux/reducer.rb', line 56

def do_streaming_reduce_task
  reducer_cmd = @task['reducer'] || 'cat'
  @output_path = "#{@tmp_dir}/r#{@task['pindex']}"
  err_path = "#{@tmp_dir}/.rerr.#{$$}"
  err_msg = nil
  #cmd_line = fix_cmd_line reducer_cmd,
  #  @paths.join(' '), nil, err_path, tmp_dir
  cmd_line = fix_cmd_line reducer_cmd,
    "#{tmp_dir}/t*-#{@task['pindex']}", nil, err_path, tmp_dir
  Log.debug "popen: #{cmd_line}"
  pipeio = nil
  Dir.chdir(@tmp_dir) {pipeio = PipeIO.new cmd_line}
  if @on_receive
    pipeio.on_receive &@on_receive
  else
    out = open(@output_path, 'a')
    pipeio.on_receive {|data|
      out.write data
    }
  end
  on_success = @on_success
  on_error = @on_error
  pipeio.on_close {
    if out
      out.close rescue nil
    end
    if File.size? err_path
      err_msg = File.read(err_path).chomp!
      #raise RuntimeError, err_msg
      e = RuntimeError.new err_msg
      e.set_backtrace ['reducer']
      on_error.call e if on_error
    else
      on_success.call self if on_success
    end
  }
  @loop.attach pipeio
end

#on_error(&block) ⇒ Object



30
31
32
# File 'lib/pmux/reducer.rb', line 30

def on_error &block
  @on_error = block
end

#on_receive(&block) ⇒ Object



24
25
26
# File 'lib/pmux/reducer.rb', line 24

def on_receive &block
  @on_receive = block
end

#on_success(&block) ⇒ Object



27
28
29
# File 'lib/pmux/reducer.rb', line 27

def on_success &block
  @on_success = block
end