Class: Wukong::Hadoop::HadoopRunner

Inherits:
Runner (which in turn inherits from Object)
Includes:
Logging, HadoopInvocation, LocalInvocation, MapLogic, Overwritables, ReduceLogic
Defined in:
lib/wukong-hadoop/runner.rb

Overview

The HadoopRunner class examines its arguments and constructs the command lines it executes to produce the desired behavior.

When the mapper and reducer processors are not given explicitly, HadoopRunner inspects its arguments to guess them. It also decides whether to run the job in local or Hadoop mode. Together, these decisions determine the command it ultimately executes.

Instance Method Summary

Methods included from LocalInvocation

#cat_input, #cat_output, #local_commandline, #sort_commandline

Methods included from HadoopInvocation

#hadoop_commandline, #hadoop_files, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, #hadoop_streaming_jar, #input_format, #io_formats, #java_opt, #job_name, #output_format, #parsed_java_opts, #remove_output_path, #ruby_interpreter_path, #use_alternative_gemfile

Methods included from ReduceLogic

#explicit_reduce_command?, #explicit_reduce_processor?, #explicit_reducer?, #map_only?, #reduce?, #reducer_arg, #reducer_commandline, #reducer_name, #reducer_needs_run_arg?

Methods included from MapLogic

#explicit_map_command?, #explicit_map_processor?, #explicit_mapper?, #mapper_arg, #mapper_commandline, #mapper_name, #mapper_needs_run_arg?

Methods included from Overwritables

#input_paths, #output_path, #params_to_pass

Instance Method Details

#command_prefix ⇒ String

The prefix to insert before all invocations of the wu-local runner.

Returns:

  • (String)


# File 'lib/wukong-hadoop/runner.rb', line 171

def command_prefix
  settings[:command_prefix]
end
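The prefix is meant to be prepended to each wu-local invocation (for example, a value such as `bundle exec`). The sketch below illustrates that idea. It assumes a plain settings hash and a hypothetical helper, `wu_local_invocation`; neither is the library's actual command builder.

```ruby
# Hypothetical settings hash standing in for the runner's settings object.
SETTINGS = { command_prefix: 'bundle exec' }

def command_prefix(settings)
  settings[:command_prefix]
end

# Hypothetical helper: join the prefix (when present and non-empty), the
# wu-local executable, and a processor name into one command string.
def wu_local_invocation(settings, processor)
  [command_prefix(settings), 'wu-local', processor]
    .compact
    .reject(&:empty?)
    .join(' ')
end

puts wu_local_invocation(SETTINGS, 'tokenizer')  # prints "bundle exec wu-local tokenizer"
puts wu_local_invocation({}, 'tokenizer')        # prints "wu-local tokenizer"
```

Dropping nil or empty prefixes keeps the unprefixed case free of a stray leading space.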

#file_is_processor?(path) ⇒ true, false

Does the file at the given path contain a processor named after the file itself (its basename without the .rb extension)? Returns false if no path is given.

Parameters:

  • path (String)

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner.rb', line 162

def file_is_processor?(path)
  return false unless path
  processor_registered?(processor_name_from_file(path))
end
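The check combines two steps: derive a name from the file's basename, then look that name up in the processor registry. The standalone sketch below uses a plain Set as a hypothetical stand-in for `Wukong.registry`, so it runs without Wukong loaded.

```ruby
require 'set'

# Hypothetical stand-in for Wukong.registry: the set of registered names.
REGISTERED = Set[:tokenizer, :counter]

def processor_name_from_file(path)
  File.basename(path, '.rb')
end

def processor_registered?(name)
  REGISTERED.include?(name.to_s.to_sym)
end

def file_is_processor?(path)
  return false unless path  # a nil path can never name a processor
  processor_registered?(processor_name_from_file(path))
end

p file_is_processor?('examples/tokenizer.rb')  # => true
p file_is_processor?('examples/splitter.rb')   # => false
p file_is_processor?(nil)                      # => false
```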

#mode ⇒ :hadoop, :local

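What mode is this runner in? The runner is in :local mode only when the mode setting is local; any other value, including no value at all, selects :hadoop.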

Returns:

  • (:hadoop, :local)


# File 'lib/wukong-hadoop/runner.rb', line 124

def mode
  settings[:mode].to_s == 'local' ? :local : :hadoop
end
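Because the setting is compared as a string, both `'local'` and `:local` select local mode, and every other value falls through to Hadoop mode. The sketch below passes the settings in as a plain hash. That is an assumption made so the example runs on its own; the real method reads the runner's own settings.

```ruby
def mode(settings)
  settings[:mode].to_s == 'local' ? :local : :hadoop
end

p mode({ mode: 'local' })   # => :local
p mode({ mode: :local })    # => :local
p mode({ mode: 'hadoop' })  # => :hadoop
p mode({ mode: 'locl' })    # => :hadoop (a typo is not rejected)
p mode({})                  # => :hadoop (nil.to_s is "")
```

One consequence of defaulting to Hadoop is that a mistyped mode is not caught here. Validation then demands explicit input and output paths, as it does for any Hadoop job.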

#non_wukong_hadoop_params_string ⇒ String

Returns the parameters to pass to an invocation of wu-local.

Parameters relevant to Wukong-Hadoop, such as --reduce_tasks, are interpreted by the runner and not passed. All others are passed through as --param=value pairs, with each value shell-escaped.

Returns:

  • (String)


# File 'lib/wukong-hadoop/runner.rb', line 183

def non_wukong_hadoop_params_string
  params_to_pass.reject do |param, val|
    params_to_pass.definition_of(param, :wukong_hadoop)
  end.map do |param,val|
    "--#{param}=#{Shellwords.escape(val.to_s)}"
  end.join(" ")
end
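The filtering and escaping can be reproduced with a plain Hash. In the sketch below, `WUKONG_HADOOP_PARAMS` is a hypothetical hard-coded list that replaces the `definition_of(param, :wukong_hadoop)` lookup used by the real method. `Shellwords.escape` is Ruby's standard-library escaper.

```ruby
require 'set'
require 'shellwords'

# Hypothetical list of parameters Wukong-Hadoop interprets itself.
WUKONG_HADOOP_PARAMS = Set[:reduce_tasks, :input, :output, :mode]

def non_wukong_hadoop_params_string(params)
  params
    .reject { |param, _val| WUKONG_HADOOP_PARAMS.include?(param) }  # drop runner-only params
    .map { |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" }
    .join(' ')
end

params = { reduce_tasks: 10, min_length: 3, label: 'two words' }
puts non_wukong_hadoop_params_string(params)
# prints: --min_length=3 --label=two\ words
```

Escaping each value keeps a value containing spaces together as one shell word when the command line is later run through a shell.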

#processor_name_from_file(path) ⇒ String

Return the guessed name of the processor at the given path: the file's basename with any trailing .rb extension removed.

Parameters:

  • path (String)

Returns:

  • (String)


# File 'lib/wukong-hadoop/runner.rb', line 154

def processor_name_from_file(path)
  File.basename(path, '.rb')
end
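`File.basename(path, '.rb')` strips the directory and a trailing `.rb`, but it leaves any other extension in place. The examples below use the method body exactly as shown and illustrate that behavior.

```ruby
def processor_name_from_file(path)
  File.basename(path, '.rb')
end

p processor_name_from_file('tokenizer.rb')                # => "tokenizer"
p processor_name_from_file('/jobs/wordcount/counter.rb')  # => "counter"
p processor_name_from_file('tokenizer.py')                # => "tokenizer.py"
```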

#processor_registered?(name) ⇒ true, false

Is there a processor registered with the given name?

Parameters:

  • name (#to_s)

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner.rb', line 146

def processor_registered? name
  Wukong.registry.registered?(name.to_s.to_sym)
end
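The `name.to_s.to_sym` conversion lets callers pass a String or a Symbol. The sketch below shows why that matters, assuming a registry keyed by Symbols. `ToyRegistry` is a hypothetical stand-in for `Wukong.registry` and only implements `registered?`.

```ruby
require 'set'

# Hypothetical registry keyed by Symbol, standing in for Wukong.registry.
class ToyRegistry
  def initialize(*names)
    @names = Set.new(names.map(&:to_sym))
  end

  def registered?(name)
    @names.include?(name)
  end
end

REGISTRY = ToyRegistry.new(:tokenizer)

def processor_registered?(name)
  # Normalize Strings, Symbols, or anything with #to_s to a Symbol key.
  REGISTRY.registered?(name.to_s.to_sym)
end

p processor_registered?('tokenizer')  # => true
p processor_registered?(:tokenizer)   # => true
p REGISTRY.registered?('tokenizer')   # => false (raw String key, not normalized)
```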

#run ⇒ Object

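Run this command. In local mode, execute the local command line. In Hadoop mode, first remove the output path if the :rm or :overwrite setting is set, then execute the Hadoop command line.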



# File 'lib/wukong-hadoop/runner.rb', line 109

def run
  if mode == :local
    log.info "Launching local!"
    execute_command!(local_commandline)
  else
    remove_output_path if settings[:rm] || settings[:overwrite]
    hadoop_commandline
    log.info "Launching Hadoop!"
    execute_command!(hadoop_commandline)
  end
end
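The dispatch in `run` can be exercised without Hadoop by recording commands instead of executing them. In the sketch below, `SketchRunner` and its placeholder command strings are hypothetical, and logging is omitted. The real command lines come from LocalInvocation and HadoopInvocation.

```ruby
# Minimal stand-in for the runner: commands are recorded, not executed.
class SketchRunner
  attr_reader :executed

  def initialize(settings)
    @settings = settings
    @executed = []
  end

  def mode
    @settings[:mode].to_s == 'local' ? :local : :hadoop
  end

  def run
    if mode == :local
      execute_command!(local_commandline)
    else
      # Output removal only happens in Hadoop mode.
      remove_output_path if @settings[:rm] || @settings[:overwrite]
      execute_command!(hadoop_commandline)
    end
  end

  private

  # Placeholder strings; not real command lines.
  def local_commandline
    'LOCAL_COMMANDLINE'
  end

  def hadoop_commandline
    'HADOOP_COMMANDLINE'
  end

  def remove_output_path
    @executed << 'remove output path'
  end

  def execute_command!(cmd)
    @executed << cmd
  end
end

hadoop = SketchRunner.new({ mode: 'hadoop', rm: true })
hadoop.run
p hadoop.executed  # => ["remove output path", "HADOOP_COMMANDLINE"]

local = SketchRunner.new({ mode: 'local', rm: true })
local.run
p local.executed   # => ["LOCAL_COMMANDLINE"]
```

As the second case shows, :rm and :overwrite have no effect in local mode.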

#separate_map_and_reduce_args? ⇒ true, false

Were the mapper and reducer named by separate arguments (that is, were exactly two arguments given)?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner.rb', line 138

def separate_map_and_reduce_args?
  args.size == 2
end

#single_job_arg? ⇒ true, false

Were the mapper and/or reducer named by a single argument (that is, was exactly one argument given)?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner.rb', line 131

def single_job_arg?
  args.size == 1
end
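Both predicates only count positional arguments; they do not look at what the arguments contain. The standalone versions below take `args` as a parameter so they can run on their own. That is an assumption, since the real methods read the runner's own `args`. With zero arguments, or with more than two, neither predicate holds, and `validate` rejects the more-than-two case.

```ruby
def single_job_arg?(args)
  args.size == 1
end

def separate_map_and_reduce_args?(args)
  args.size == 2
end

p single_job_arg?(['wordcount.rb'])                        # => true
p separate_map_and_reduce_args?(['tokenizer', 'counter'])  # => true
p single_job_arg?([])                                      # => false
p separate_map_and_reduce_args?(%w[a b c])                 # => false
```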

#validate ⇒ true

Validate that no more than two arguments were given and, in Hadoop mode, that non-empty --input and --output paths were given.

Returns:

  • (true)

Raises:

  • (Wukong::Error)

    if validations fail



# File 'lib/wukong-hadoop/runner.rb', line 100

def validate
  raise Error.new("Cannot provide more than two arguments") if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? || output_path.nil? || output_path.empty?)
    raise Error.new("Explicit --input and --output paths are required to run a job in Hadoop mode.")
  end
  true
end
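The validation rules can be checked in isolation. The sketch below is a hypothetical keyword-argument version of `validate`, and `ValidationError` stands in for `Wukong::Error`. The checks themselves mirror the method above: local mode needs no paths, while Hadoop mode rejects a missing or empty input or output path.

```ruby
class ValidationError < StandardError; end  # stand-in for Wukong::Error

def validate(args:, mode:, input_paths:, output_path:)
  raise ValidationError, 'Cannot provide more than two arguments' if args.length > 2
  if mode == :hadoop && (input_paths.nil? || input_paths.empty? ||
                         output_path.nil? || output_path.empty?)
    raise ValidationError,
          'Explicit --input and --output paths are required to run a job in Hadoop mode.'
  end
  true
end

p validate(args: ['job.rb'], mode: :local, input_paths: nil, output_path: nil)  # => true

begin
  validate(args: ['job.rb'], mode: :hadoop, input_paths: 'in', output_path: '')
rescue ValidationError => e
  puts e.message  # prints the --input/--output message
end
```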