Module: Wukong::Hadoop::HadoopInvocation
- Included in:
- HadoopRunner
- Defined in:
- lib/wukong-hadoop/runner/hadoop_invocation.rb
Overview
Provides methods for executing a map/reduce job on a Hadoop cluster via Hadoop streaming (http://hadoop.apache.org/docs/r0.15.2/streaming.html).
Instance Method Summary collapse
-
#hadoop_commandline ⇒ String
Return the Hadoop command used to launch this job in a Hadoop cluster.
-
#hadoop_files ⇒ Object
:nodoc:.
-
#hadoop_jobconf_options ⇒ Array<String>
Return an array of jobconf (-D) options that will be passed to Hadoop.
-
#hadoop_other_args ⇒ String
Returns other arguments used by Hadoop streaming.
-
#hadoop_recycle_env ⇒ Object
:nodoc:.
-
#hadoop_runner ⇒ String
The name of the Hadoop binary to use.
-
#hadoop_streaming_jar ⇒ String
The path (glob) to the Hadoop streaming jar.
-
#input_format ⇒ String
The input format to use.
-
#io_formats ⇒ Object
:nodoc:.
-
#java_opt(option, value) ⇒ Object
:nodoc:.
-
#job_name ⇒ String
The job name that will be passed to Hadoop.
-
#output_format ⇒ String
The output format to use.
-
#parsed_java_opts ⇒ Object
:nodoc:.
-
#remove_output_path ⇒ Object
Remove the output path.
-
#ruby_interpreter_path ⇒ Object
:nodoc:.
-
#use_alternative_gemfile ⇒ Object
:nodoc:.
Instance Method Details
#hadoop_commandline ⇒ String
Return the Hadoop command used to launch this job in a Hadoop cluster.
You should be able to copy, paste, and run this command unmodified when debugging.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 24

# Return the Hadoop command used to launch this job in a Hadoop
# cluster.  You should be able to copy, paste, and run this command
# unmodified to debug the job.
#
# NOTE(review): the doc extraction dropped a linked identifier between
# "jar ..." and the job-name option; restored as
# +hadoop_jobconf_options+, matching the jobconf method documented on
# this page.
#
# @return [String]
def hadoop_commandline
  [
    hadoop_runner,
    "jar #{hadoop_streaming_jar}",
    hadoop_jobconf_options,
    "-D mapred.job.name='#{job_name}'",
    hadoop_files,
    hadoop_other_args,
    "-mapper '#{mapper_commandline}'",
    "-reducer '#{reducer_commandline}'",
    "-input '#{input_paths}'",
    "-output '#{output_path}'",
    io_formats,
    hadoop_recycle_env,
  ].flatten.compact.join(" \t\\\n ")
end
#hadoop_files ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 147

# Build the streaming file-distribution options (-files, -libjars,
# -archives) from the corresponding settings.  Any .rb file passed as a
# bare argument is automatically added to settings[:files].
#
# NOTE(review): the doc extraction dropped the +tap+ block variable and
# the receiver of +<<+; restored as +file_options+.
#
# @return [Array<String>] one option string per non-empty file setting
def hadoop_files
  # Ruby scripts given as plain args are shipped to the cluster as well.
  args.find_all { |arg| arg.to_s =~ /\.rb$/ }.each do |arg|
    settings[:files] << arg
  end
  [].tap do |file_options|
    { :files => '-files ', :jars => '-libjars ', :archives => '-archives ' }.each_pair do |file_type_name, file_option_name|
      unless settings[file_type_name].nil? || settings[file_type_name].empty?
        files = settings[file_type_name].map do |file_name_or_glob|
          # Don't glob on the HDFS
          file_type_name == :archives ? file_name_or_glob : [Dir[file_name_or_glob], file_name_or_glob]
        end.flatten.compact.uniq.join(',')
        file_options << "#{file_option_name}'#{files}'"
      end
    end
  end
end
#hadoop_jobconf_options ⇒ Array<String>
Return an array of jobconf (-D) options that will be passed to Hadoop.
Translates the "friendly" wu-hadoop names into the less-friendly Hadoop names.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 104

# Return an array of jobconf (-D) options that will be passed to Hadoop.
# Translates the "friendly" wu-hadoop names into the less-friendly
# Hadoop names.
#
# NOTE(review): the doc extraction dropped the method name and the
# accumulator variable; restored as +hadoop_jobconf_options+ (per this
# page's method summary) and +jobconf_options+.
#
# @return [Array<String>]
def hadoop_jobconf_options
  jobconf_options = []
  settings[:reuse_jvms]          = '-1'    if (settings[:reuse_jvms] == true)
  settings[:respect_exit_status] = 'false' if (settings[:ignore_exit_status] == true)
  # If no reducer and no reduce_command, then skip the reduce phase
  settings[:reduce_tasks] ||= 0 unless reduce?
  jobconf_options += [
    :io_sort_mb, :io_sort_record_percent,
    :map_speculative, :map_tasks,
    :max_maps_per_cluster, :max_maps_per_node,
    :max_node_map_tasks, :max_node_reduce_tasks,
    :max_reduces_per_cluster, :max_reduces_per_node,
    :max_record_length, :min_split_size,
    :output_field_separator, :key_field_separator,
    :partition_fields, :sort_fields,
    :reduce_tasks, :respect_exit_status,
    :reuse_jvms, :timeout,
    :max_tracker_failures, :max_map_attempts,
    :max_reduce_attempts, :reduce_speculative
  ].map do |opt|
    # definition_of maps the friendly name to the Hadoop property name.
    defn = settings.definition_of(opt, :description)
    val  = settings[opt]
    java_opt(defn, val)
  end
  jobconf_options.flatten.compact
end
#hadoop_other_args ⇒ String
Returns other arguments used by Hadoop streaming.
134 135 136 137 138 139 140 141 142 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 134

# Returns other arguments used by Hadoop streaming.
#
# NOTE(review): the doc extraction dropped the receiver of
# +.split_on_xml_tag+; restored as +settings+ — confirm against the
# original source.
#
# @return [Array<String>] extra streaming arguments
def hadoop_other_args
  extra_str_args = parsed_java_opts
  if settings[:split_on_xml_tag]
    extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{settings.split_on_xml_tag}>,end=</#{settings.split_on_xml_tag}>'}
  end
  extra_str_args << ' -lazyOutput' if settings[:noempty] # don't create reduce file if no records
  extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless settings[:partition_fields].blank?
  extra_str_args
end
#hadoop_recycle_env ⇒ Object
:nodoc:
180 181 182 183 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 180

# Build -cmdenv options that forward selected environment variables
# (BUNDLE_GEMFILE, LANG) from the launching shell to the Hadoop tasks.
# Switches BUNDLE_GEMFILE first when a --gemfile setting is present.
#
# @return [Array<String>] one -cmdenv option per variable that is set
def hadoop_recycle_env
  use_alternative_gemfile if settings[:gemfile]
  %w[BUNDLE_GEMFILE LANG].each_with_object([]) do |name, opts|
    value = ENV[name]
    opts << "-cmdenv '#{name}=#{value}'" if value
  end
end
#hadoop_runner ⇒ String
The name of the Hadoop binary to use.
Respects the value of --hadoop_runner if given.
84 85 86 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 84

# The name of the Hadoop binary to use.  Respects the value of
# --hadoop_runner if given; otherwise falls back to 'hadoop' on the PATH.
#
# @return [String]
def hadoop_runner
  runner = settings[:hadoop_runner]
  runner || 'hadoop'
end
#hadoop_streaming_jar ⇒ String
The path (glob) to the Hadoop streaming jar.
Respects the value of --hadoop_streaming_jar if given. Otherwise uses the default CDH4 location.
94 95 96 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 94

# The path (glob) to the Hadoop streaming jar.  Respects the value of
# --hadoop_streaming_jar if given; otherwise uses the default CDH4
# location.
#
# @return [String]
def hadoop_streaming_jar
  jar = settings[:hadoop_streaming_jar]
  jar || '/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh*.jar'
end
#input_format ⇒ String
The input format to use.
Respects the value of --input_format.
59 60 61 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 59

# The input format to use.  Respects the value of --input_format;
# returns nil when none was given.
#
# @return [String, nil]
def input_format
  settings[:input_format]
end
#io_formats ⇒ Object
:nodoc:
73 74 75 76 77 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 73

# Pair of streaming format options derived from --input_format and
# --output_format.  Entries are nil when the corresponding format is
# unset (callers flatten/compact).
#
# @return [Array<String, nil>] [-inputformat option, -outputformat option]
def io_formats
  input_opt  = input_format  ? "-inputformat '#{input_format}'"   : nil
  output_opt = output_format ? "-outputformat '#{output_format}'" : nil
  [input_opt, output_opt]
end
#java_opt(option, value) ⇒ Object
:nodoc:
193 194 195 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 193

# Format a single Hadoop -D property assignment, shell-escaping the
# value.  Returns nil when the value is absent so callers can compact.
#
# @param option [String] the Hadoop property name
# @param value [Object] the property value; stringified and escaped
# @return [String, nil]
def java_opt option, value
  return unless value
  format('-D %s=%s', option, Shellwords.escape(value.to_s))
end
#job_name ⇒ String
The job name that will be passed to Hadoop.
Respects the --job_name option if given, otherwise constructs one from the given processors, input, and output paths.
48 49 50 51 52 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 48

# The job name that will be passed to Hadoop.  Respects the --job_name
# option if given; otherwise constructs one from the given processors,
# input, and output paths, stripping characters Hadoop dislikes.
#
# @return [String]
def job_name
  explicit = settings[:job_name]
  return explicit if explicit
  basenames = args.compact.uniq.map { |path| File.basename(path, '.rb') }
  raw = "#{basenames.join('-')}---#{input_paths}---#{output_path}"
  raw.gsub(%r{[^\w/\.\-\+]+}, '')
end
#output_format ⇒ String
The output format to use.
Respects the value of --output_format.
68 69 70 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 68

# The output format to use.  Respects the value of --output_format;
# returns nil when none was given.
#
# @return [String, nil]
def output_format
  settings[:output_format]
end
#parsed_java_opts ⇒ Object
:nodoc:
186 187 188 189 190 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 186

# Normalize the raw --java_opts strings: each entry may contain several
# space-mashed -Dkey=value pairs; split them apart and re-emit each as a
# separate "-D key=value" option.
#
# @return [Array<String>]
def parsed_java_opts
  settings[:java_opts].flat_map do |raw|
    raw.split('-D').reject(&:blank?).map { |opt| '-D ' + opt.strip }
  end
end
#remove_output_path ⇒ Object
Remove the output path.
Will not actually do anything if the --dry_run option is also given.
13 14 15 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 13

# Remove the output path via `hadoop fs -rmr`.  Will not actually do
# anything if the --dry_run option is also given (execute_command
# handles dry runs).
def remove_output_path
  command = "#{hadoop_runner} fs -rmr '#{output_path}'"
  execute_command(command)
end
#ruby_interpreter_path ⇒ Object
:nodoc:
170 171 172 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 170

# Absolute, symlink-resolved path to the running Ruby interpreter.
#
# Fixed: the bare +Config::CONFIG+ alias was deprecated and removed in
# Ruby 2.0; +RbConfig::CONFIG+ is the supported name (available since
# Ruby 1.8) and carries the same keys.
#
# @return [Pathname]
def ruby_interpreter_path
  config = RbConfig::CONFIG
  Pathname.new(File.join(config['bindir'], config['RUBY_INSTALL_NAME'] + config['EXEEXT'])).realpath
end
#use_alternative_gemfile ⇒ Object
:nodoc:
175 176 177 |
# File 'lib/wukong-hadoop/runner/hadoop_invocation.rb', line 175

# Point Bundler at the Gemfile named by the --gemfile setting, so child
# processes bundle against it instead of the default one.
def use_alternative_gemfile
  ENV['BUNDLE_GEMFILE'] = settings[:gemfile]
end