Class: Wukong::Script
Includes: HadoopCommand, LocalCommand
Defined in: lib/wukong/script.rb
Overview
How to run a Wukong script
your/script.rb --run path/to/input_files path/to/output_dir
All of the file paths are HDFS paths; your script path, of course, is on the local filesystem.
Command-line options
To have your script receive its own command-line options, give them on the command line:
your/script.rb --my_bool_opt --my_val_taking_opt=val \
--run path/to/input_files path/to/output_dir
In this case the options hash for both the mapper and the reducer will contain:
:my_bool_opt => true,
:my_val_taking_opt => 'val'
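The mapping from flags to the options hash can be sketched in plain Ruby. Wukong itself resolves options through its `Settings` object (see the constructor below); `parse_flags` here is a hypothetical, simplified stand-in that only handles the `--flag` and `--flag=value` forms and collects everything else as positional arguments, which is where the script later finds its input and output paths.

```ruby
# Hypothetical, simplified flag parser; NOT Wukong's or Configliere's API.
# "--name" becomes name => true, "--name=value" becomes name => "value",
# and any argument not starting with "--" is kept as a positional argument.
def parse_flags(argv)
  options = {}
  rest    = []
  argv.each do |arg|
    if arg =~ /\A--([^=]+)=(.*)\z/
      options[$1.to_sym] = $2
    elsif arg =~ /\A--(.+)\z/
      options[$1.to_sym] = true
    else
      rest << arg
    end
  end
  [options, rest]
end

# Example: the command line from the text above.
options, rest = parse_flags(%w[--my_bool_opt --my_val_taking_opt=val
                               --run path/to/input_files path/to/output_dir])
# options[:my_bool_opt]       is true
# options[:my_val_taking_opt] is "val"
# options[:run]               is true (a bare --run; see #run_mode below)
# rest                        is ["path/to/input_files", "path/to/output_dir"]
```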
Complicated input paths
To use more than one file as input, you can use the usual *, ? and [] wildcards or give a comma-separated list; see the Hadoop documentation for the exact syntax.
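On Hadoop the expansion of such a path spec is done by Hadoop itself. As a rough local-filesystem analog, a hypothetical helper can split the comma-separated list and expand each piece with Ruby's `Dir.glob`. This is an assumption-laden sketch: it splits on commas first, so brace patterns such as `{a,b}` are not supported.

```ruby
# Hypothetical local analog of a comma-separated, wildcarded input spec.
# Each comma-separated piece is expanded with Dir.glob; results are
# de-duplicated and sorted. Brace patterns like {a,b} are NOT handled,
# because the spec is split on commas before globbing.
def expand_input_spec(spec)
  spec.split(',').flat_map { |pattern| Dir.glob(pattern.strip) }.sort.uniq
end

# Example (no output shown; depends on your files):
#   expand_input_spec('logs/2009-0[1-3]*.tsv,extra/input.tsv')
```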
Run in Elastic MapReduce mode (--run=emr)
Wukong can be used to start scripts on the Amazon cloud. In this mode it:
- copies the script to S3 in two parts, and
- invokes it using the Amazon API.
Run locally (--run=local)
To run your script locally, use --run=local:
your/script.rb --run=local path/to/input_files path/to/output_dir
This pipes the contents of path/to/input_files through your mapper, then through sort, then through your reducer, and stores the results in the given output directory.
All paths refer to the local filesystem: Hadoop is never involved, and in fact doesn't even have to be installed.
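The semantics of that mapper, sort, reducer pipeline can be simulated in memory. This sketch assumes the common streaming convention of tab-separated "key\tvalue" lines and uses a toy word count; `word_count_map`, `word_count_reduce` and `local_pipeline` are hypothetical names, and Ruby's `Array#sort` stands in for the Unix `sort` step.

```ruby
# In-memory simulation of: input | mapper | sort | reducer
# The stages are hypothetical word-count functions exchanging "key\tvalue" lines.
def word_count_map(lines)
  lines.flat_map { |line| line.split.map { |word| "#{word}\t1" } }
end

# The reducer relies on sorted input: all lines for one key are adjacent,
# so it only has to sum each run of equal keys.
def word_count_reduce(sorted_lines)
  sorted_lines
    .map { |line| line.split("\t") }
    .chunk_while { |a, b| a[0] == b[0] }
    .map { |group| "#{group[0][0]}\t#{group.sum { |_, count| count.to_i }}" }
end

def local_pipeline(lines)
  word_count_reduce(word_count_map(lines).sort)
end
```

The sort in the middle is what lets the reducer stay simple: without it, the same key could appear in several non-adjacent runs and be reported more than once.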
How to test your scripts
You can supply the --map argument in place of --run to run the mapper on its own (and, similarly, --reduce to run the reducer on its own):
cat ./local/test/input.tsv | ./examples/word_count.rb --map | more
or, if your test data lives on HDFS,
hdp-cat test/input.tsv | ./examples/word_count.rb --map | more
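The same idea carries over to Ruby-level tests: if a mapper reads from one IO and writes to another, you can feed it a `StringIO` instead of a shell pipe. `WordCountMapper` and its `stream(input, output)` signature are hypothetical; the `#run` source on this page calls Wukong's `stream` with no arguments, so this is a pattern for testing your own per-line logic rather than Wukong's API.

```ruby
require 'stringio'

# Hypothetical mapper whose input and output can be injected, so it can be
# exercised without a shell pipe or a Hadoop cluster. Defaults to stdin/stdout.
class WordCountMapper
  def stream(input = $stdin, output = $stdout)
    input.each_line do |line|
      line.split.each { |word| output.puts "#{word}\t1" }
    end
  end
end

input  = StringIO.new("hello world\nhello\n")
output = StringIO.new
WordCountMapper.new.stream(input, output)
# output.string is now "hello\t1\nworld\t1\nhello\t1\n"
```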
Instance Attribute Summary
- #input_paths ⇒ Object (readonly): Returns the value of attribute input_paths.
- #mapper ⇒ Object (readonly): Returns the value of attribute mapper.
- #options ⇒ Object (readonly): Returns the value of attribute options.
- #output_path ⇒ Object (readonly): Returns the value of attribute output_path.
- #reducer ⇒ Object (readonly): Returns the value of attribute reducer.
Instance Method Summary
- #initialize(mapper, reducer = nil, extra_options = {}) ⇒ Script (constructor): Instantiates the Script with the mapper and reducer classes (each a Wukong::Streamer) it should call back.
- #job_name ⇒ Object: The :job_name option if given, otherwise a name built from the script name and the input and output paths.
- #mapper_commandline(run_option = :local) ⇒ Object: Shell command for the map phase.
- #reducer_commandline(run_option = :local) ⇒ Object: Shell command for the reduce phase.
- #run ⇒ Object: In --run mode, uses the framework (local, hadoop, emr, etc.) to re-launch the script as mapper, reducer, etc.
- #run_mode ⇒ Object: If only --run is given, assumes the default run mode.
- #safely(action, &block) ⇒ Object: Wrapper for dangerous operations to catch errors.
Methods included from LocalCommand
#execute_local_workflow, #local_commandline, #local_mode_sort_commandline
Methods included from HadoopCommand
#execute_hadoop_workflow, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, included, #jobconf
Constructor Details
#initialize(mapper, reducer = nil, extra_options = {}) ⇒ Script
Instantiates the Script with the mapper and reducer classes (each a Wukong::Streamer) it should call back.
Identity or External program as map or reduce
To use the identity reducer (‘cat’), instantiate your Script class with nil as the reducer class. (And similarly to use an identity mapper, supply nil for the mapper class.)
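To use an external program as your reducer (or mapper), override the reduce_command (or map_command) method to return the full command line to call. (The reducer_commandline source shown on this page falls back to the :reduce_command option, and mapper_commandline to :map_command, when no class is given; depending on your Wukong version you may need to set those options instead.)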
require 'wukong'

class MyMapper < Wukong::Streamer::Base
  # ... awesome stuff ...
end

class MyScript < Wukong::Script
  # Prefix each unique line with the count of its occurrences.
  def reduce_command
    '/usr/bin/uniq -c'
  end
end

MyScript.new(MyMapper, nil).run
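Using `uniq -c` as a reducer works because the reducer's input arrives sorted (by Hadoop, or by `sort` in local mode), so identical lines are adjacent and `uniq` only has to count runs. A hypothetical Ruby equivalent of that run counting:

```ruby
# Count runs of identical adjacent lines, like `uniq -c` on sorted input.
# Returns [count, line] pairs instead of uniq's space-padded text output.
def count_runs(sorted_lines)
  sorted_lines.chunk_while { |a, b| a == b }.map { |run| [run.size, run.first] }
end
```

Given unsorted input such as `%w[a a b a]`, it reports `a` twice (`[[2, "a"], [1, "b"], [1, "a"]]`), which is exactly why the sort step between mapper and reducer matters.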
# File 'lib/wukong/script.rb', line 128
def initialize mapper, reducer=nil, extra_options={}
  Settings.resolve!
  @options = Settings
  options.merge! extra_options
  @mapper  = (case mapper  when Class then mapper.new  when nil then nil else mapper  ; end)
  @reducer = (case reducer when Class then reducer.new when nil then nil else reducer ; end)
  @output_path = options.rest.pop
  @input_paths = options.rest.reject(&:blank?)
  if (input_paths.blank? || output_path.blank?) && (not options[:dry_run]) && (not ['map', 'reduce'].include?(run_mode))
    raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}"
  end
end
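Three details of the constructor are worth isolating: a stage may be a class (instantiated), an instance (used as is) or `nil` (no stage); the output path is the last positional argument and everything before it is input; and the paths are only required when the script is launching a job rather than running as a bare mapper or reducer. The sketch below reproduces that logic with hypothetical helper names and plain arrays in place of `Settings`. Since `blank?` is not core Ruby, it is replaced with an explicit empty check.

```ruby
# Hypothetical helpers mirroring the constructor's logic; not Wukong's API.

# A Class is instantiated; an instance is used as is; nil means "no stage".
def resolve_stage(stage)
  case stage
  when Class then stage.new
  when nil   then nil
  else stage
  end
end

# The last positional argument is the output path; the rest are inputs.
# Blank inputs are dropped, as reject(&:blank?) does in the constructor.
def split_paths(rest)
  rest = rest.dup
  output_path = rest.pop
  input_paths = rest.reject { |path| path.nil? || path.strip.empty? }
  [input_paths, output_path]
end

# Paths are only required when launching a job (not in map/reduce mode
# and not on a dry run).
def paths_required?(run_mode, dry_run)
  !dry_run && !%w[map reduce].include?(run_mode)
end
```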
Instance Attribute Details
#input_paths ⇒ Object (readonly)
Returns the value of attribute input_paths.
# File 'lib/wukong/script.rb', line 70
def input_paths
  @input_paths
end
#mapper ⇒ Object (readonly)
Returns the value of attribute mapper.
# File 'lib/wukong/script.rb', line 69
def mapper
  @mapper
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/wukong/script.rb', line 69
def options
  @options
end
#output_path ⇒ Object (readonly)
Returns the value of attribute output_path.
# File 'lib/wukong/script.rb', line 70
def output_path
  @output_path
end
#reducer ⇒ Object (readonly)
Returns the value of attribute reducer.
# File 'lib/wukong/script.rb', line 69
def reducer
  @reducer
end
Instance Method Details
#job_name ⇒ Object
# File 'lib/wukong/script.rb', line 208
def job_name
  options[:job_name] ||
    "#{File.basename(this_script_filename)}---#{input_paths}---#{output_path}".gsub(%r{[^\w/\.\-\+]+}, '')
end
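The default job name can be reproduced outside the class by passing its inputs explicitly. In this hypothetical sketch, `default_job_name` takes the options hash, script filename, input paths and output path as arguments. Note that `input_paths` is an Array, so on Ruby 1.9 and later its interpolated form includes brackets and quotes, which the `gsub` then strips along with any other character outside `\w`, `/`, `.`, `-` and `+`. An explicit `:job_name` is returned unchanged.

```ruby
# Sketch of the default job-name logic with explicit arguments.
# Method-call precedence means gsub applies only to the generated name,
# never to an explicit options[:job_name].
def default_job_name(options, script_filename, input_paths, output_path)
  options[:job_name] ||
    "#{File.basename(script_filename)}---#{input_paths}---#{output_path}"
      .gsub(%r{[^\w/\.\-\+]+}, '')
end
```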
#mapper_commandline(run_option = :local) ⇒ Object
Shell command for the map phase. By default, calls the script in --map mode; if there is no mapper, returns the :map_command option instead. In Hadoop mode, this command is given to the Hadoop streaming command; in local mode, it's given to the system() call.
# File 'lib/wukong/script.rb', line 177
def mapper_commandline(run_option=:local)
  if mapper
    case run_option
    when :local then
      "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
    when :hadoop then
      "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --map " + non_wukong_params
    end
  else
    options[:map_command]
  end
end
#reducer_commandline(run_option = :local) ⇒ Object
Shell command for the reduce phase. By default, calls the script in --reduce mode; if there is no reducer, returns the :reduce_command option instead. In Hadoop mode, this command is given to the Hadoop streaming command; in local mode, it's given to the system() call.
# File 'lib/wukong/script.rb', line 195
def reducer_commandline(run_option=:local)
  if reducer
    case run_option
    when :local then
      "#{ruby_interpreter_path} #{this_script_filename} --reduce " + non_wukong_params
    when :hadoop then
      "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --reduce " + non_wukong_params
    end
  else
    options[:reduce_command]
  end
end
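The map and reduce command builders differ only in the phase flag, so both can be sketched as one hypothetical function. Here `ruby` and `extra_params` stand in for `ruby_interpreter_path` and `non_wukong_params`, whose definitions are not shown on this page. In local mode the script is referenced by its full path; in Hadoop mode only by its basename, presumably because the script is shipped into the task's working directory (an assumption). Any other run option yields `nil`, as in the original `case`.

```ruby
# Hypothetical combined sketch of mapper_commandline / reducer_commandline.
# phase is "map" or "reduce"; fallback_command plays the role of
# options[:map_command] / options[:reduce_command] when there is no stage.
def stage_commandline(stage, phase, run_option, script, fallback_command,
                      ruby: 'ruby', extra_params: '')
  return fallback_command unless stage
  case run_option
  when :local  then "#{ruby} #{script} --#{phase} " + extra_params
  when :hadoop then "#{ruby} #{File.basename(script)} --#{phase} " + extra_params
  end
end
```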
#run ⇒ Object
In --run mode, use the framework (local, hadoop, emr, etc.) to re-launch the script as mapper, reducer, etc. If --map or --reduce is given, dispatch to the mapper or reducer.
# File 'lib/wukong/script.rb', line 146
def run
  case run_mode
  when 'map'              then mapper.stream
  when 'reduce'           then reducer.stream
  when 'local'            then execute_local_workflow
  when 'cassandra'        then execute_hadoop_workflow
  when 'hadoop', 'mapred' then execute_hadoop_workflow
  when 'emr'
    require 'wukong/script/emr_command'
    execute_emr_workflow
  else dump_help
  end
end
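The dispatch in `#run` can also be read as a lookup table from run mode to handler. This hypothetical sketch maps each mode to a method name (`stream_mapper` and `stream_reducer` are invented labels for `mapper.stream` and `reducer.stream`) and falls back to `dump_help` for anything else.

```ruby
# Hypothetical table view of #run's dispatch: run mode => handler name.
RUN_DISPATCH = {
  'map'       => :stream_mapper,
  'reduce'    => :stream_reducer,
  'local'     => :execute_local_workflow,
  'cassandra' => :execute_hadoop_workflow,
  'hadoop'    => :execute_hadoop_workflow,
  'mapred'    => :execute_hadoop_workflow,
  'emr'       => :execute_emr_workflow,
}.freeze

# Unknown modes (including "" from a missing --run) fall back to help.
def dispatch_for(run_mode)
  RUN_DISPATCH.fetch(run_mode, :dump_help)
end
```

A table makes the list of modes easy to inspect. The `case` in the original, however, lets the `emr` branch `require 'wukong/script/emr_command'` lazily, only when that mode is actually used.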
#run_mode ⇒ Object
If only --run is given (with no value), assume the default run mode. Otherwise the mode comes from --map or --reduce, from a script name ending in -mapper.rb or -reducer.rb, or from the value of --run.
# File 'lib/wukong/script.rb', line 161
def run_mode
  case
  when options[:map]           then 'map'
  when options[:reduce]        then 'reduce'
  when ($0 =~ /-mapper\.rb$/)  then 'map'
  when ($0 =~ /-reducer\.rb$/) then 'reduce'
  when (options[:run] == true) then options[:default_run_mode]
  else options[:run].to_s
  end
end
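The resolution order is easier to test when the options hash and program name are passed in rather than read from the instance and `$0`. The sketch below, with the hypothetical name `resolve_run_mode`, applies the same checks in the same order. One consequence: a script named `something-mapper.rb` runs as a mapper with no flags at all, and a missing `--run` yields `""`, which `#run` sends to `dump_help`.

```ruby
# Sketch of run-mode resolution with explicit inputs; `options` is a plain
# Hash and `program` stands in for $0. Checks run in the same order as #run_mode.
def resolve_run_mode(options, program = $0)
  case
  when options[:map]              then 'map'
  when options[:reduce]           then 'reduce'
  when program =~ /-mapper\.rb$/  then 'map'
  when program =~ /-reducer\.rb$/ then 'reduce'
  when options[:run] == true      then options[:default_run_mode]
  else options[:run].to_s
  end
end
```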
#safely(action, &block) ⇒ Object
Wrapper for dangerous operations to catch errors: runs the block and passes any StandardError, together with the action name, to handle_error.
# File 'lib/wukong/script.rb', line 214
def safely action, &block
  begin
    block.call
  rescue StandardError => e ; handle_error(action, e); end
end
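The same wrapper pattern can be sketched on its own. `handle_error` is not shown on this page, so the version below is a hypothetical stand-in that logs to standard error and returns `nil`. Because only `StandardError` is rescued, exceptions such as `Interrupt` (a `SignalException`) still propagate, so Ctrl-C can stop a running job.

```ruby
# Hypothetical error handler; Wukong's own handle_error is not shown here.
def handle_error(action, error)
  warn "Error during #{action}: #{error.class}: #{error.message}"
  nil
end

# Run the block; on a StandardError, report it and return handle_error's value.
def safely(action)
  yield
rescue StandardError => e
  handle_error(action, e)
end
```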