Module: Wukong::Hadoop::ReduceLogic

Included in:
HadoopRunner
Defined in:
lib/wukong-hadoop/runner/reduce_logic.rb

Overview

Implements the logic for determining the correct reducer commandline from wu-hadoop's arguments, and for deciding whether to run a map-only (no-reduce) job.

Instance Method Details

#explicit_reduce_command? ⇒ true, false

Were we given an explicit reduce command (like 'uniq -c') or are we to introspect and construct the command?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 30

def explicit_reduce_command?
  settings[:reduce_command]
end

#explicit_reduce_processor? ⇒ true, false

Were we given a processor to use as our reducer explicitly by name or are we to introspect to discover the correct processor?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 39

def explicit_reduce_processor?
  settings[:reducer]
end

#explicit_reducer? ⇒ true, false

Were we given an explicit reducer (either as a command or as a processor) or should we introspect to find one?

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 47

def explicit_reducer?
  explicit_reduce_processor? || explicit_reduce_command?
end
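
The three predicates above can be tried in isolation with a minimal sketch. The `FakeRunner` class is a hypothetical stand-in that models only `settings`; it is not part of wukong-hadoop.

```ruby
# Hypothetical stand-in for the runner; only `settings` is modeled.
class FakeRunner
  attr_reader :settings

  def initialize(settings = {})
    @settings = settings
  end

  def explicit_reduce_command?
    settings[:reduce_command]
  end

  def explicit_reduce_processor?
    settings[:reducer]
  end

  def explicit_reducer?
    explicit_reduce_processor? || explicit_reduce_command?
  end
end

with_command = FakeRunner.new(reduce_command: 'uniq -c')
with_nothing = FakeRunner.new

p with_command.explicit_reducer? # prints "uniq -c" (truthy)
p with_nothing.explicit_reducer? # prints nil (falsy)
```

As written, the predicates return the setting value itself, so callers should rely on truthiness rather than on a strict `true` or `false`.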

#map_only? ⇒ true, false

Is this a map-only job?

Returns:

  • (true, false)

See Also:

  • #reduce?


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 83

def map_only?
  (! reduce?)
end

#reduce? ⇒ true, false

Should we perform a reduce or is this a map-only job?

We will reduce if:

  • we were given an explicit --reduce_command
  • we discovered a reducer

We will not reduce if:

  • --reduce_tasks was explicitly set to 0 (this check runs first and overrides the two conditions above)

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 71

def reduce?
  return false if settings[:reduce_tasks] && settings[:reduce_tasks].to_i == 0
  return true  if settings[:reduce_command]
  return true  if reducer_name
  false
end
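
To make the order of checks concrete, here is a standalone sketch of the same decision. The function name `should_reduce?` and the `discovered_reducer` parameter are hypothetical; the latter stands in for whatever `reducer_name` would return.

```ruby
# Hypothetical standalone version of the decision in reduce?.
# `discovered_reducer` stands in for the result of reducer_name.
def should_reduce?(settings, discovered_reducer = nil)
  # An explicit zero for reduce_tasks wins over everything else.
  return false if settings[:reduce_tasks] && settings[:reduce_tasks].to_i == 0
  return true  if settings[:reduce_command]
  return true  if discovered_reducer
  false
end

p should_reduce?({ reduce_tasks: '0', reduce_command: 'uniq -c' }) # prints false
p should_reduce?({ reduce_command: 'uniq -c' })                    # prints true
p should_reduce?({}, 'counter')                                    # prints true
p should_reduce?({})                                               # prints false
```

Because `String#to_i` returns 0 for a non-numeric string, a value such as `'none'` for `reduce_tasks` would also disable the reduce step under this logic.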

#reducer_arg ⇒ String

The argument that we should introspect on to turn into our reducer.

Returns:

  • (String)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 55

def reducer_arg
  args.last
end

#reducer_commandline ⇒ String

Return the actual commandline used by the reducer, whether running in local or Hadoop mode.

You should be able to copy, paste, and run this command unmodified to debug the reducer.

Returns:

  • (String)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 16

def reducer_commandline
  return ''                        unless reduce?
  return settings[:reduce_command] if     explicit_reduce_command?
  arg = (mode == :hadoop ? File.basename(reducer_arg) : reducer_arg)
  [command_prefix, 'wu-local', arg].tap do |cmd|
    cmd << "--run=#{reducer_name}" if reducer_needs_run_arg?
    cmd << non_wukong_hadoop_params_string
  end.compact.map(&:to_s).reject(&:empty?).join(' ')
end
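
The assembly step can be illustrated with a standalone sketch. The function `build_commandline` and its parameters are hypothetical; in the runner the corresponding values come from `mode`, `command_prefix`, `reducer_arg`, `reducer_name`, and `non_wukong_hadoop_params_string`.

```ruby
# Hypothetical standalone version of the commandline assembly.
# Pass run_name as nil when no --run argument is needed.
def build_commandline(mode, prefix, script, run_name, params)
  # In Hadoop mode only the file's basename is used.
  arg = (mode == :hadoop ? File.basename(script) : script)
  [prefix, 'wu-local', arg].tap do |cmd|
    cmd << "--run=#{run_name}" if run_name
    cmd << params
  end.compact.map(&:to_s).reject(&:empty?).join(' ')
end

puts build_commandline(:local, nil, 'jobs/reducer.rb', nil, '')
# prints: wu-local jobs/reducer.rb
puts build_commandline(:hadoop, 'bundle exec', 'jobs/job.rb', 'counter', '--foo=bar')
# prints: bundle exec wu-local job.rb --run=counter --foo=bar
```

The `compact` and `reject(&:empty?)` calls drop a missing prefix and an empty parameter string, so the joined result contains no stray spaces.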

#reducer_name ⇒ String

Return the name of the processor to use as the reducer.

Raises a Wukong::Error if an explicitly given reducer is not a registered processor. Returns nil if no reducer can be guessed.

Most of the logic that examines explicit command line arguments and checks for the existence of named processors or files is here.

Returns:

  • (String, nil)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 109

def reducer_name
  case
  when explicit_reducer?
    if processor_registered?(settings[:reducer])
      settings[:reducer]
    else
      raise Error.new("No such processor: '#{settings[:reducer]}'")
    end
  when single_job_arg? && explicit_mapper? && processor_registered?(reducer_arg)
    reducer_arg
  when separate_map_and_reduce_args? && processor_registered?(reducer_arg)
    reducer_arg
  when separate_map_and_reduce_args? && file_is_processor?(reducer_arg)
    processor_name_from_file(reducer_arg)
  when processor_registered?('reducer')
    'reducer'
  end
end
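
A simplified sketch of the first and last branches only: an explicit name must be registered or an error is raised, and otherwise a processor named 'reducer' is used if one exists. The branches that inspect positional arguments and files are deliberately omitted. `SketchError`, `REGISTERED`, and `resolve_reducer` are hypothetical names, not part of Wukong.

```ruby
# Hypothetical error class and registry standing in for Wukong's own.
class SketchError < StandardError; end

REGISTERED = %w[counter reducer].freeze

# Simplified resolution: explicit name first, then the 'reducer' fallback.
# Returns nil when nothing matches.
def resolve_reducer(explicit_name, registered = REGISTERED)
  if explicit_name
    unless registered.include?(explicit_name)
      raise SketchError, "No such processor: '#{explicit_name}'"
    end
    explicit_name
  elsif registered.include?('reducer')
    'reducer'
  end
end

p resolve_reducer('counter') # prints "counter"
p resolve_reducer(nil)       # prints "reducer"
p resolve_reducer(nil, [])   # prints nil
```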

#reducer_needs_run_arg? ⇒ true, false

Does the reducer commandline need an explicit --run argument?

No --run argument is needed when the processor name matches the reducer argument itself or the script's basename without its .rb extension.

Returns:

  • (true, false)


# File 'lib/wukong-hadoop/runner/reduce_logic.rb', line 93

def reducer_needs_run_arg?
  return false if reducer_arg.to_s == reducer_name.to_s
  return false if File.basename(reducer_arg.to_s, '.rb') == reducer_name
  true
end
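
The comparison can be exercised on its own with a small sketch. The function `needs_run_arg?` is a hypothetical standalone version that takes the reducer argument and processor name as parameters instead of reading them from the runner.

```ruby
# Hypothetical standalone version of reducer_needs_run_arg?.
def needs_run_arg?(reducer_arg, reducer_name)
  # Same string: the argument already names the processor.
  return false if reducer_arg.to_s == reducer_name.to_s
  # Script file whose basename (minus .rb) matches the processor name.
  return false if File.basename(reducer_arg.to_s, '.rb') == reducer_name
  true
end

p needs_run_arg?('counter', 'counter')          # prints false
p needs_run_arg?('/jobs/counter.rb', 'counter') # prints false
p needs_run_arg?('/jobs/job.rb', 'counter')     # prints true
```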