Class: Hjc::HadoopStreaming

Inherits:
Object
  • Object
show all
Defined in:
lib/hjc/hadoop_streaming.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ HadoopStreaming

Returns a new instance of HadoopStreaming.



6
7
8
9
10
11
# File 'lib/hjc/hadoop_streaming.rb', line 6

# Builds a streaming job description with nothing configured yet.
#
# Both boolean switches (@debug, @local) start out false, and the two
# lookup tables (@jobconf, @files) start out as separate empty hashes.
def initialize
  @debug = @local = false
  @jobconf = {}
  @files = {}
end

Instance Attribute Details

#debug ⇒ Object

Returns the value of attribute debug.



4
5
6
# File 'lib/hjc/hadoop_streaming.rb', line 4

# Reader for the debug switch.
#
# @return [Object] whatever is currently stored in @debug
def debug; @debug; end

#input_path ⇒ Object

Returns the value of attribute input_path.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the job's input location.
#
# @return [Object] whatever is currently stored in @input_path
def input_path; @input_path; end

#jobconf ⇒ Object

Returns the value of attribute jobconf.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the job configuration table.
#
# The stored object itself is handed back (no copy is made), so callers
# that mutate the result are mutating this instance's state.
#
# @return [Object] whatever is currently stored in @jobconf
def jobconf; @jobconf; end

#language ⇒ Object

Returns the value of attribute language.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the script language setting.
#
# @return [Object] whatever is currently stored in @language
def language; @language; end

#local ⇒ Object

Returns the value of attribute local.



4
5
6
# File 'lib/hjc/hadoop_streaming.rb', line 4

# Reader for the local-mode switch.
#
# @return [Object] whatever is currently stored in @local
def local; @local; end

#mapper_path ⇒ Object

Returns the value of attribute mapper_path.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the mapper command/path.
#
# @return [Object] whatever is currently stored in @mapper_path
def mapper_path; @mapper_path; end

#output_path ⇒ Object

Returns the value of attribute output_path.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the job's output location.
#
# @return [Object] whatever is currently stored in @output_path
def output_path; @output_path; end

#reducer_path ⇒ Object

Returns the value of attribute reducer_path.



3
4
5
# File 'lib/hjc/hadoop_streaming.rb', line 3

# Reader for the reducer command/path.
#
# @return [Object] whatever is currently stored in @reducer_path
def reducer_path; @reducer_path; end

Instance Method Details

#args ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/hjc/hadoop_streaming.rb', line 44

# Assembles the command-line argument list for the streaming job from the
# current instance state.
#
# Options are emitted in a fixed order:
#   -input / -output / -mapper / -reducer  (each only when its ivar is set)
#   -dfs file:/// and -jt local            (only when @local is truthy)
#   -debug                                 (only when @debug is truthy)
#   -jobconf key=value                     (one pair per @jobconf entry)
#   -file path                             (one pair per @files entry)
#
# When @debug is truthy the space-joined list is also printed to stdout.
#
# @return [Array] a freshly built, flat list of option names and values
def args
  argv = []
  argv.push('-input', @input_path) if @input_path
  argv.push('-output', @output_path) if @output_path
  argv.push('-mapper', @mapper_path) if @mapper_path
  argv.push('-reducer', @reducer_path) if @reducer_path
  # The original author questioned whether '-jt local' has any effect
  # ("no use?"); it is kept so the emitted arguments stay unchanged.
  argv.push('-dfs', 'file:///', '-jt', 'local') if @local
  argv.push('-debug') if @debug

  # Append in place: the previous `+=` rebuilt the whole array per entry.
  @jobconf.each do |key, value|
    argv.push('-jobconf', "#{key}=#{value}")
  end

  # Only the stored file objects matter here; the hash keys are not used.
  @files.each_value do |file|
    argv.push('-file', file.path)
  end

  puts "args: #{argv.join(' ')}" if @debug
  argv
end

#input=(input) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/hjc/hadoop_streaming.rb', line 22

# Stages the given input data for the job.
#
# The data is handed to Util.to_temp_file and the resulting file's path is
# stored in @input_path. Unless @local is truthy, the file is then copied
# with FsShell#put using that same path as both source and destination.
#
# NOTE(review): the original author's notes say the streaming input option
# "seems" to need an exact path and that the path "seems" to live on HDFS
# when not running locally -- confirm against Hadoop before relying on it.
#
# @param input the data passed through to Util.to_temp_file
def input=(input)
  staged = Util.to_temp_file('input', input)
  @input_path = staged.path
  return if @local

  FsShell.new.put(staged.path, staged.path)
end

#mapper=(mapper) ⇒ Object



32
33
34
35
36
# File 'lib/hjc/hadoop_streaming.rb', line 32

# Registers the mapper script for the job.
#
# The script is passed through with_shebang together with @language, then
# written out via Util.to_temp_file with :mod => 0700. The resulting file
# object is remembered under @files['mapper'], and only the base name of
# its path is stored in @mapper_path.
#
# @param mapper the mapper script handed to with_shebang
def mapper=(mapper)
  script = with_shebang(mapper, @language)
  tempfile = Util.to_temp_file('mapper', script, :mod => 0700)
  @files['mapper'] = tempfile
  @mapper_path = File.basename(tempfile.path)
end

#reducer=(reducer) ⇒ Object



38
39
40
41
42
# File 'lib/hjc/hadoop_streaming.rb', line 38

# Registers the reducer script for the job.
#
# The script is passed through with_shebang together with @language, then
# written out via Util.to_temp_file with :mod => 0700. The resulting file
# object is remembered under @files['reducer'], and only the base name of
# its path is stored in @reducer_path.
#
# @param reducer the reducer script handed to with_shebang
def reducer=(reducer)
  script = with_shebang(reducer, @language)
  tempfile = Util.to_temp_file('reducer', script, :mod => 0700)
  @files['reducer'] = tempfile
  @reducer_path = File.basename(tempfile.path)
end

#run ⇒ Object



13
14
15
16
# File 'lib/hjc/hadoop_streaming.rb', line 13

# Launches the job: a new StreamJob is created and its #run is invoked with
# the argument list produced by #args. The value it hands back is kept in
# @ret and is also this method's return value.
#
# @return [Object] whatever StreamJob#run returned
def run
  @ret = StreamJob.new.run(args)
end

#success? ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
# File 'lib/hjc/hadoop_streaming.rb', line 18

# Tells whether the stored job result (@ret) indicates success.
#
# @return [Boolean] true only when @ret equals 0; false for any other
#   value, including when @ret is still nil
def success?
  exit_status = @ret
  exit_status == 0
end