Module: Wukong::Hadoop::HadoopInvocation

Included in:
HadoopRunner
Defined in:
lib/wukong-hadoop/runner/hadoop_invocation.rb

Overview

Provides methods for executing a map/reduce job on a Hadoop cluster via Hadoop Streaming (http://hadoop.apache.org/docs/r0.15.2/streaming.html).

Instance Method Summary collapse

Instance Method Details

#hadoop_commandline ⇒ String

Return the Hadoop command used to launch this job in a Hadoop cluster.

You should be able to copy, paste, and run this command unmodified when debugging.

Returns:

  • (String)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 24

# Assemble the full Hadoop streaming command line for this job.
#
# The pieces are gathered in a fixed order (runner, streaming jar, jobconf
# options, job name, shipped files, other arguments, mapper, reducer, input,
# output, I/O formats, forwarded environment), nested arrays are flattened,
# nil entries are dropped, and the result is joined with a
# backslash-newline continuation so the command can be pasted into a shell.
#
# @return [String] the multi-line command
def hadoop_commandline
  pieces = []
  pieces << hadoop_runner
  pieces << "jar #{hadoop_streaming_jar}"
  pieces << hadoop_jobconf_options
  pieces << "-D mapred.job.name='#{job_name}'"
  pieces << hadoop_files
  pieces << hadoop_other_args
  pieces << "-mapper       '#{mapper_commandline}'"
  pieces << "-reducer      '#{reducer_commandline}'"
  pieces << "-input        '#{input_paths}'"
  pieces << "-output       '#{output_path}'"
  pieces << io_formats
  pieces << hadoop_recycle_env

  # Helpers may return arrays (possibly containing nil); normalize before joining.
  continuation = " \t\\\n  "
  pieces.flatten.compact.join(continuation)
end

#hadoop_files ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 147

# Build the -files / -libjars / -archives options passed to Hadoop.
#
# Any command-line argument ending in ".rb" is added to settings[:files] so
# the script is shipped with the job. The list is created when it is missing
# and a script is only added once, so calling this method repeatedly neither
# raises nor keeps growing settings[:files].
#
# Entries under :files and :jars are expanded with Dir[] and the original
# entry is kept as well (so a non-matching name still appears); entries
# under :archives are passed through untouched.
#
# @return [Array<String>] one option string per non-empty file type
def hadoop_files
  ruby_scripts = args.select { |arg| arg.to_s =~ /\.rb$/ }
  unless ruby_scripts.empty?
    settings[:files] ||= []
    ruby_scripts.each do |script|
      settings[:files] << script unless settings[:files].include?(script)
    end
  end

  [].tap do |files_options|
    {
      :files    => '-files        ',
      :jars     => '-libjars      ',
      :archives => '-archives     '
    }.each_pair do |file_type_name, file_option_name|
      entries = settings[file_type_name]
      next if entries.nil? || entries.empty?

      files = entries.map do |file_name_or_glob|
        # Don't glob on the HDFS
        file_type_name == :archives ? file_name_or_glob : [Dir[file_name_or_glob], file_name_or_glob]
      end.flatten.compact.uniq.join(',')
      files_options << "#{file_option_name}'#{files}'"
    end
  end
end

#hadoop_jobconf_options ⇒ Array<String>

Return an array of jobconf (-D) options that will be passed to Hadoop.

Translates the "friendly" wu-hadoop names into the less-friendly Hadoop names.

Returns:

  • (Array<String>)


104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 104

# Build the jobconf (-D) options passed to Hadoop.
#
# Before building, a few settings are normalized in place:
# * :reuse_jvms of exactly true becomes the string '-1'
# * :ignore_exit_status of exactly true sets :respect_exit_status to 'false'
# * when there is no reduce step, :reduce_tasks defaults to 0
#
# For each known setting, the value of
# settings.definition_of(name, :description) is used as the option name
# handed to java_opt together with the setting's current value.
#
# @return [Array<String>] the options; nil results from java_opt are dropped
def hadoop_jobconf_options
  if settings[:reuse_jvms] == true
    settings[:reuse_jvms] = '-1'
  end
  if settings[:ignore_exit_status] == true
    settings[:respect_exit_status] = 'false'
  end
  # If no reducer and no reduce_command, then skip the reduce phase
  unless reduce?
    settings[:reduce_tasks] ||= 0
  end

  friendly_names = [
    :io_sort_mb,               :io_sort_record_percent,
    :map_speculative,          :map_tasks,
    :max_maps_per_cluster,     :max_maps_per_node,
    :max_node_map_tasks,       :max_node_reduce_tasks,
    :max_reduces_per_cluster,  :max_reduces_per_node,
    :max_record_length,        :min_split_size,
    :output_field_separator,   :key_field_separator,
    :partition_fields,         :sort_fields,
    :reduce_tasks,             :respect_exit_status,
    :reuse_jvms,               :timeout,
    :max_tracker_failures,     :max_map_attempts,
    :max_reduce_attempts,      :reduce_speculative
  ]

  rendered = friendly_names.map do |friendly_name|
    java_opt(settings.definition_of(friendly_name, :description), settings[friendly_name])
  end
  rendered.flatten.compact
end

#hadoop_other_args ⇒ Array<String>

Returns other arguments used by Hadoop streaming, as a list of argument strings.

Returns:

  • (Array<String>)


134
135
136
137
138
139
140
141
142
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 134

# Collect the remaining arguments passed to Hadoop streaming.
#
# Starts from the parsed Java options and appends, when the matching
# setting is present:
# * an -inputreader clause for settings[:split_on_xml_tag]
# * -lazyOutput for settings[:noempty]
# * the KeyFieldBasedPartitioner for a non-blank settings[:partition_fields]
#
# The XML tag is read from settings, the same source the guard tests and
# every other method here uses, rather than from a separate `options`
# object.
#
# @return [Array<String>] argument strings (the caller flattens and joins them)
def hadoop_other_args
  extra_args = parsed_java_opts
  xml_tag    = settings[:split_on_xml_tag]
  if xml_tag
    extra_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{xml_tag}>,end=</#{xml_tag}>'}
  end
  extra_args << ' -lazyOutput' if settings[:noempty]  # don't create reduce file if no records
  extra_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless settings[:partition_fields].blank?
  extra_args
end

#hadoop_recycle_env ⇒ Object

:nodoc:



180
181
182
183
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 180

# :nodoc:
# Build -cmdenv options that forward selected environment variables.
#
# When settings[:gemfile] is set, use_alternative_gemfile is called first.
# BUNDLE_GEMFILE and LANG are then forwarded, each only if it is present in
# ENV.
#
# @return [Array<String>] one -cmdenv option per variable that is set
def hadoop_recycle_env
  use_alternative_gemfile if settings[:gemfile]

  forwarded = []
  %w[BUNDLE_GEMFILE LANG].each do |name|
    value = ENV[name]
    forwarded << %Q{-cmdenv       '#{name}=#{value}'} if value
  end
  forwarded
end

#hadoop_runner ⇒ String

The name of the Hadoop binary to use.

Respects the value of --hadoop_runner if given.

Returns:

  • (String)


84
85
86
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 84

# Name of the Hadoop binary used to launch commands.
#
# @return [String] settings[:hadoop_runner] when set, otherwise 'hadoop'
def hadoop_runner
  configured_runner = settings[:hadoop_runner]
  return configured_runner if configured_runner

  'hadoop'
end

#hadoop_streaming_jar ⇒ String

The path (glob) to the Hadoop streaming jar.

Respects the value of --hadoop_streaming_jar if given. Otherwise uses the default CDH4 location.

Returns:

  • (String)


94
95
96
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 94

# Path (possibly a glob) of the Hadoop streaming jar.
#
# @return [String] settings[:hadoop_streaming_jar] when set, otherwise the
#   built-in default glob
def hadoop_streaming_jar
  configured_jar = settings[:hadoop_streaming_jar]
  return configured_jar if configured_jar

  '/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh*.jar'
end

#input_format ⇒ String

The input format to use.

Respects the value of --input_format.

Returns:

  • (String)


59
60
61
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 59

# The input format configured for this job.
#
# @return [Object, nil] the value of settings[:input_format], or nil when
#   none is set
def input_format
  configured_format = settings[:input_format]
  configured_format
end

#io_formats ⇒ Object

:nodoc:



73
74
75
76
77
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 73

# :nodoc:
# Build the -inputformat / -outputformat options.
#
# @return [Array<String, nil>] a two-element array [input, output]; an entry
#   is nil when the corresponding format is not set
def io_formats
  input_flag  = input_format  ? "-inputformat  '#{input_format}'"  : nil
  output_flag = output_format ? "-outputformat '#{output_format}'" : nil
  [input_flag, output_flag]
end

#java_opt(option, value) ⇒ Object

:nodoc:



193
194
195
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 193

# :nodoc:
# Render a single "-D name=value" option.
#
# The value is converted to a string and shell-escaped. Nothing is rendered
# for a nil or false value.
#
# @param option [Object] the option name (formatted with %s)
# @param value [Object, nil] the option value
# @return [String, nil] the rendered option, or nil when value is nil/false
def java_opt(option, value)
  return unless value

  escaped_value = Shellwords.escape(value.to_s)
  format("-D %s=%s", option, escaped_value)
end

#job_name ⇒ String

The job name that will be passed to Hadoop.

Respects the --job_name option if given, otherwise constructs one from the given processors, input, and output paths.

Returns:

  • (String)


48
49
50
51
52
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 48

# The job name handed to Hadoop.
#
# settings[:job_name] wins when present. Otherwise the name is built from the
# basenames (without ".rb") of the unique, non-nil command-line arguments,
# the input paths and the output path, separated by "---". Every character
# other than word characters, "/", ".", "-" and "+" is then stripped out.
#
# @return [String]
def job_name
  explicit_name = settings[:job_name]
  return explicit_name if explicit_name

  script_names      = args.compact.uniq.map { |path| File.basename(path, '.rb') }
  relevant_filename = script_names.join('-')
  unsanitized       = "#{relevant_filename}---#{input_paths}---#{output_path}"
  unsanitized.gsub(%r{[^\w/\.\-\+]+}, '')
end

#output_format ⇒ String

The output format to use.

Respects the value of --output_format.

Returns:

  • (String)


68
69
70
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 68

# The output format configured for this job.
#
# @return [Object, nil] the value of settings[:output_format], or nil when
#   none is set
def output_format
  configured_format = settings[:output_format]
  configured_format
end

#parsed_java_opts ⇒ Object

:nodoc:



186
187
188
189
190
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 186

# :nodoc:
# Normalize settings[:java_opts] into individual "-D name=value" strings.
#
# Each entry is split on "-D"; blank fragments are discarded and every
# remaining fragment is stripped and re-prefixed with "-D ". The setting is
# coerced with Array(), so a missing (nil) setting yields an empty list and
# a single String is treated as a one-element list instead of raising.
#
# NOTE(review): because entries are split on the literal "-D", a value that
# itself contains "-D" is broken apart — confirm whether that matters.
#
# @return [Array<String>] a new array on every call
def parsed_java_opts
  Array(settings[:java_opts]).map do |raw_opts|
    raw_opts.split('-D').reject { |fragment| fragment.blank? }.map { |fragment| '-D ' + fragment.strip }
  end.flatten
end

#remove_output_path ⇒ Object

Remove the output path.

Will not actually do anything if the --dry_run option is also given.



13
14
15
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 13

# Remove the job's output path.
#
# Builds a "fs -rmr" command for the configured Hadoop runner and passes it
# to execute_command.
#
# @return [Object] whatever execute_command returns
def remove_output_path
  removal_command = "#{hadoop_runner} fs -rmr '#{output_path}'"
  execute_command(removal_command)
end

#ruby_interpreter_path ⇒ Object

:nodoc:



170
171
172
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 170

# :nodoc:
# Symlink-resolved path of the Ruby interpreter described by the build
# configuration (bindir + RUBY_INSTALL_NAME + EXEEXT).
#
# Reads RbConfig::CONFIG instead of the legacy top-level Config constant,
# which was deprecated and later removed from Ruby, so this no longer raises
# NameError on current interpreters.
#
# @return [Pathname]
# @raise [Errno::ENOENT] if the computed path does not exist
def ruby_interpreter_path
  build_config    = RbConfig::CONFIG
  executable_name = build_config['RUBY_INSTALL_NAME'] + build_config['EXEEXT']
  Pathname.new(File.join(build_config['bindir'], executable_name)).realpath
end

#use_alternative_gemfile ⇒ Object

:nodoc:



175
176
177
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 175

# :nodoc:
# Point the BUNDLE_GEMFILE environment variable at settings[:gemfile].
#
# @return [Object] the value that was assigned
def use_alternative_gemfile
  alternative_gemfile = settings[:gemfile]
  ENV['BUNDLE_GEMFILE'] = alternative_gemfile
end