Class: Wukong::Hadoop::HadoopRunner
- Inherits:
- Runner (ancestry: Object, Runner, Wukong::Hadoop::HadoopRunner)
- Includes:
- Logging, HadoopInvocation, LocalInvocation, MapLogic, Overwritables, ReduceLogic
- Defined in:
- lib/wukong-hadoop/runner.rb
Overview
The HadoopRunner class examines its arguments and constructs the command lines it executes to produce the desired behavior.
The runner introspects on its arguments to guess, when they are not given explicitly, which processors to use as the mapper and reducer in a map/reduce job. It also decides whether to run that job in local or Hadoop mode. Together these decisions determine the command it ultimately executes.
Instance Method Summary
- #command_prefix ⇒ String
  The prefix to insert before all invocations of the wu-local runner.
- #file_is_processor?(path) ⇒ true, false
  Does the given path contain a processor named after itself?
- #mode ⇒ :hadoop, :local
  What mode is this runner in?
- #non_wukong_hadoop_params_string ⇒ String
  Returns parameters to pass to an invocation of wu-local.
- #processor_name_from_file(path) ⇒ String
  Return the guessed name of a processor at the given path.
- #processor_registered?(name) ⇒ true, false
  Is there a processor registered with the given name?
- #run ⇒ Object
  Run this command.
- #separate_map_and_reduce_args? ⇒ true, false
  Were mapper and/or reducer named by separate arguments?
- #single_job_arg? ⇒ true, false
  Were mapper and/or reducer named by a single argument?
- #validate ⇒ true
  Validate that no more than two arguments were given and, in Hadoop mode, that explicit --input and --output paths were given.
Methods included from LocalInvocation
#cat_input, #cat_output, #local_commandline, #sort_commandline
Methods included from HadoopInvocation
#hadoop_commandline, #hadoop_files, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, #hadoop_streaming_jar, #input_format, #io_formats, #java_opt, #job_name, #output_format, #parsed_java_opts, #remove_output_path, #ruby_interpreter_path, #use_alternative_gemfile
Methods included from ReduceLogic
#explicit_reduce_command?, #explicit_reduce_processor?, #explicit_reducer?, #map_only?, #reduce?, #reducer_arg, #reducer_commandline, #reducer_name, #reducer_needs_run_arg?
Methods included from MapLogic
#explicit_map_command?, #explicit_map_processor?, #explicit_mapper?, #mapper_arg, #mapper_commandline, #mapper_name, #mapper_needs_run_arg?
Methods included from Overwritables
#input_paths, #output_path, #params_to_pass
Instance Method Details
#command_prefix ⇒ String
The prefix to insert before all invocations of the wu-local runner.
    # File 'lib/wukong-hadoop/runner.rb', line 171
    def command_prefix
      settings[:command_prefix]
    end
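The page does not show how the prefix is applied, so the following is only a sketch of where such a prefix would sit in a command line. The helper `prefixed_wu_local`, the prefix value "bundle exec", and the processor file name are all hypothetical and are not part of wukong-hadoop.

```ruby
# Hypothetical helper: joins an optional prefix (such as the value of
# settings[:command_prefix]) onto a wu-local invocation. Not part of
# wukong-hadoop; shown only to illustrate where a prefix would go.
def prefixed_wu_local(prefix, processor, extra_args = "")
  parts = [prefix, "wu-local", processor, extra_args]
  # Drop nil or blank pieces so a missing prefix leaves no stray spaces.
  parts.compact.map(&:to_s).reject(&:empty?).join(" ")
end

puts prefixed_wu_local("bundle exec", "tokenizer.rb")
puts prefixed_wu_local(nil, "tokenizer.rb", "--limit=5")
```

Dropping blank pieces means an unset prefix (`nil`) produces a clean command rather than one with a leading space.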
#file_is_processor?(path) ⇒ true, false
Does the given path contain a processor named after itself?
    # File 'lib/wukong-hadoop/runner.rb', line 162
    def file_is_processor?(path)
      return false unless path
      processor_registered?(processor_name_from_file(path))
    end
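To show how `file_is_processor?` combines the two helpers documented further down, the following standalone sketch reproduces all three methods. It swaps `Wukong.registry` for a plain `Set` of symbols. That stand-in, and the processor names `tokenizer` and `counter`, are assumptions made so the snippet runs without Wukong.

```ruby
require 'set'

# Stand-in for Wukong.registry: a plain set of registered processor names.
# The real registry object is not reproduced here; this is an assumption
# made so the sketch runs on its own.
REGISTERED = Set[:tokenizer, :counter]

def processor_registered?(name)
  REGISTERED.include?(name.to_s.to_sym)
end

def processor_name_from_file(path)
  File.basename(path, '.rb')
end

# Mirrors the documented logic: a nil path is never a processor; otherwise
# the file's basename must match a registered processor name.
def file_is_processor?(path)
  return false unless path
  processor_registered?(processor_name_from_file(path))
end

p file_is_processor?("examples/tokenizer.rb")  # => true
p file_is_processor?("examples/helpers.rb")    # => false
p file_is_processor?(nil)                      # => false
```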
#mode ⇒ :hadoop, :local
What mode is this runner in?
    # File 'lib/wukong-hadoop/runner.rb', line 124
    def mode
      settings[:mode].to_s == 'local' ? :local : :hadoop
    end
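Because the check is `settings[:mode].to_s == 'local'`, Hadoop is the default for any value other than the exact string or symbol `local`. The sketch below exercises the same expression. The wrapper `mode_for`, which takes the settings hash as an argument, is hypothetical.

```ruby
# Standalone copy of the mode logic with the settings hash passed in
# explicitly (the real method reads the runner's own settings).
def mode_for(settings)
  settings[:mode].to_s == 'local' ? :local : :hadoop
end

p mode_for(mode: 'local')   # => :local
p mode_for(mode: :local)    # => :local
p mode_for(mode: 'hadoop')  # => :hadoop
p mode_for({})              # => :hadoop  (nil.to_s is "", so Hadoop is the default)
p mode_for(mode: 'LOCAL')   # => :hadoop  (the comparison is case-sensitive)
```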
#non_wukong_hadoop_params_string ⇒ String
Returns parameters to pass to an invocation of wu-local.
Parameters relevant to Wukong-Hadoop itself, such as --reduce_tasks, are interpreted by the runner and not passed on. All others are passed through, with each value shell-escaped.
    # File 'lib/wukong-hadoop/runner.rb', line 183
    def non_wukong_hadoop_params_string
      params_to_pass.reject do |param, val|
        params_to_pass.definition_of(param, :wukong_hadoop)
      end.map do |param,val|
        "--#{param}=#{Shellwords.escape(val.to_s)}"
      end.join(" ")
    end
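The filtering and escaping can be seen in isolation with a plain hash. In this sketch a hypothetical set, `WUKONG_HADOOP_PARAMS`, stands in for the `definition_of(param, :wukong_hadoop)` lookup the real code performs. The exact membership of that set is an assumption. `Shellwords.escape` comes from Ruby's standard library.

```ruby
require 'shellwords'
require 'set'

# Hypothetical: the names below stand in for whatever params_to_pass marks
# as belonging to Wukong-Hadoop (the real code asks
# params_to_pass.definition_of(param, :wukong_hadoop)).
WUKONG_HADOOP_PARAMS = Set[:reduce_tasks, :input, :output]

def non_wukong_hadoop_params_string(params)
  params.reject { |param, _val| WUKONG_HADOOP_PARAMS.include?(param) }
        .map   { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
        .join(" ")
end

params = { reduce_tasks: 10, input: "/data/in", limit: 5, label: "two words" }
puts non_wukong_hadoop_params_string(params)
# prints: --limit=5 --label=two\ words
```

Escaping each value keeps arguments containing spaces or shell metacharacters intact when the string is spliced into a wu-local command line.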
#processor_name_from_file(path) ⇒ String
Return the guessed name of a processor at the given path.
    # File 'lib/wukong-hadoop/runner.rb', line 154
    def processor_name_from_file(path)
      File.basename(path, '.rb')
    end
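The guess relies entirely on Ruby's `File.basename(path, suffix)`, which removes the directory part and strips the suffix only when the path ends in exactly that suffix. The sample paths below are made up to show the edge cases.

```ruby
# File.basename(path, '.rb') is what processor_name_from_file uses. The
# second argument strips the suffix only when the path ends in exactly ".rb".
examples = {
  "lib/processors/wordcount.rb" => "wordcount",
  "wordcount"                   => "wordcount",
  "scripts/wordcount.py"        => "wordcount.py",
  "backup/wordcount.rb.bak"     => "wordcount.rb.bak",
}

examples.each do |path, _expected|
  puts "#{path} -> #{File.basename(path, '.rb')}"
end
```

As a result, a non-Ruby file keeps its extension in the guessed name, which will normally not match any registered processor.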
#processor_registered?(name) ⇒ true, false
Is there a processor registered with the given name?
    # File 'lib/wukong-hadoop/runner.rb', line 146
    def processor_registered? name
      Wukong.registry.registered?(name.to_s.to_sym)
    end
#run ⇒ Object
Run this command.
    # File 'lib/wukong-hadoop/runner.rb', line 109
    def run
      if mode == :local
        log.info "Launching local!"
        execute_command!(local_commandline)
      else
        remove_output_path if settings[:rm] || settings[:overwrite]
        hadoop_commandline
        log.info "Launching Hadoop!"
        execute_command!(hadoop_commandline)
      end
    end
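The branching in `#run` can be traced with a small stand-in class that records what it would do instead of executing anything. `SketchRunner`, its `actions` log, and the placeholder command strings are hypothetical. The sketch shows that `--rm`/`--overwrite` only trigger output removal in Hadoop mode.

```ruby
# Minimal sketch of the dispatch in #run. The command strings, the
# recorded "actions" list, and the SketchRunner class are all hypothetical
# stand-ins; the real runner builds commands via local_commandline /
# hadoop_commandline and shells out through execute_command!.
class SketchRunner
  attr_reader :actions

  def initialize(settings)
    @settings = settings
    @actions  = []
  end

  def mode
    @settings[:mode].to_s == 'local' ? :local : :hadoop
  end

  def run
    if mode == :local
      execute_command!("local pipeline")
    else
      # Output is only cleared when --rm or --overwrite was given.
      remove_output_path if @settings[:rm] || @settings[:overwrite]
      execute_command!("hadoop streaming job")
    end
  end

  private

  def remove_output_path
    @actions << :remove_output_path
  end

  def execute_command!(cmd)
    @actions << cmd
  end
end

local = SketchRunner.new(mode: 'local', rm: true)
local.run
p local.actions   # => ["local pipeline"]

hadoop = SketchRunner.new(mode: 'hadoop', overwrite: true)
hadoop.run
p hadoop.actions  # => [:remove_output_path, "hadoop streaming job"]
```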
#separate_map_and_reduce_args? ⇒ true, false
Were mapper and/or reducer named by separate arguments?
    # File 'lib/wukong-hadoop/runner.rb', line 138
    def separate_map_and_reduce_args?
      args.size == 2
    end
#single_job_arg? ⇒ true, false
Were mapper and/or reducer named by a single argument?
    # File 'lib/wukong-hadoop/runner.rb', line 131
    def single_job_arg?
      args.size == 1
    end
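Both predicates look only at how many positional arguments were given. The hypothetical helper `job_arg_style` folds the two checks into one classification. The argument values are invented for illustration.

```ruby
# The two predicates only count positional arguments. This standalone
# helper (hypothetical, not part of wukong-hadoop) labels each case the
# way the method descriptions above describe.
def job_arg_style(args)
  case args.size
  when 1 then :single_job_arg            # one argument names the mapper and/or reducer
  when 2 then :separate_map_and_reduce   # mapper and reducer named separately
  else        :other                     # 0 args, or more than 2 (which #validate rejects)
  end
end

p job_arg_style(["wordcount.rb"])              # => :single_job_arg
p job_arg_style(["tokenizer", "counter"])      # => :separate_map_and_reduce
p job_arg_style([])                            # => :other
```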
#validate ⇒ true
Validate that no more than two arguments were given and, in Hadoop mode, that explicit --input and --output paths were given.
    # File 'lib/wukong-hadoop/runner.rb', line 100
    def validate
      raise Error.new("Cannot provide more than two arguments") if args.length > 2
      if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
        raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
      end
      true
    end
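The two checks can be exercised without a runner by passing the relevant values in directly. In this sketch, `ValidationError` and the keyword-argument function `validate_job` are assumptions that replace Wukong's `Error` class and the runner's accessors. Local mode accepts missing paths, while Hadoop mode rejects an empty output path.

```ruby
# Standalone re-creation of #validate's checks. ValidationError and the
# keyword arguments are assumptions made so the sketch runs without the
# runner; the real method raises Wukong's own Error and reads args,
# mode, input_paths, and output_path from the runner.
class ValidationError < StandardError; end

def validate_job(args:, mode:, input_paths:, output_path:)
  raise ValidationError, "Cannot provide more than two arguments" if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? ||
                         output_path.nil? || output_path.empty?)
    raise ValidationError, "Explicit --input and --output paths are required to run a job in Hadoop mode."
  end
  true
end

# Local mode does not need explicit paths.
p validate_job(args: ["wordcount.rb"], mode: :local, input_paths: nil, output_path: nil)

begin
  validate_job(args: ["wordcount.rb"], mode: :hadoop, input_paths: ["/in"], output_path: "")
rescue ValidationError => e
  puts "rejected: #{e.message}"
end
```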