Module: Wukong::HadoopCommand

Included in:
Script
Defined in:
lib/wukong/script/hadoop_command.rb

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object

Standard ClassMethods-on-include idiom: including this module also extends the including class with ClassMethods.



182
183
184
185
186
# File 'lib/wukong/script/hadoop_command.rb', line 182

# Hook run when this module is included into +base+: also extend +base+
# with ClassMethods, so those methods become class-level methods of +base+.
#
# @param base [Module] the class/module doing the include
# @return [Module] +base+
def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#execute_hadoop_workflow ⇒ Object

Assemble the hadoop streaming command line and launch the hadoop runner, which executes the script across all tasktrackers.

FIXME: Add simple logic to ensure the arguments are in the right order, or hadoop will complain; i.e., -D options MUST come before all others.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/wukong/script/hadoop_command.rb', line 59

# Assemble the full hadoop streaming command line and hand it to
# execute_command!.
#
# Pieces are added in this order: runner, streaming jar, -D jobconf options,
# the -D job name, other streaming args, mapper, reducer, input, output,
# the shipped script file, and any -cmdenv variables. nil pieces are dropped.
# The remaining pieces are joined with a backslash-newline continuation, so
# each argument lands on its own line of the logged/executed command.
#
# NOTE(review): hadoop wants generic -D options before streaming options.
# The order below respects that for the options generated here, but nothing
# checks options[:extra_args] (emitted via hadoop_other_args) for -D flags.
#
# @return whatever execute_command! returns
def execute_hadoop_workflow
  # Streaming takes a single comma-separated -input argument.
  joined_inputs = @input_paths.join(',')

  # Evaluation order matters: hadoop_jobconf_options mutates options, so the
  # pieces are gathered in the same sequence as before.
  pieces = [hadoop_runner]
  # Use Settings[:hadoop_home] to set the path your config install.
  pieces << "jar #{options[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar"
  pieces << hadoop_jobconf_options
  pieces << "-D mapred.job.name='#{job_name}'"
  pieces << hadoop_other_args
  pieces << "-mapper  '#{mapper_commandline(:hadoop)}'"
  pieces << "-reducer '#{reducer_commandline(:hadoop)}'"
  pieces << "-input   '#{joined_inputs}'"
  pieces << "-output  '#{output_path}'"
  pieces << "-file    '#{this_script_filename}'"
  pieces << hadoop_recycle_env

  hadoop_commandline = pieces.flatten.compact.join(" \t\\\n  ")
  Log.info "  Launching hadoop!"
  execute_command!(hadoop_commandline)
end

#hadoop_jobconf_options ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/wukong/script/hadoop_command.rb', line 81

# Build the list of "-D name=value" jobconf options for the streaming job.
#
# First normalizes a few shorthand settings in place on +options+ (this
# method mutates options):
# * reuse_jvms: true                     => reuse_jvms = '-1'
# * ignore_exit_status: true             => respect_exit_status = 'false'
# * no reducer, no :reduce_command and no :reduce_tasks => reduce_tasks = 0,
#   so the reduce phase is skipped
#
# Each key is turned into a flag by #jobconf, which yields nil for unset
# options; those nils are dropped. When :partition_fields is set, the
# partitioning keys are listed first. Keys are de-duplicated so that each
# -D flag appears only once on the command line.
#
# @return [Array<String>] the -D options for every option that is set
def hadoop_jobconf_options
  # Fixup these options
  options[:reuse_jvms] = '-1'             if options[:reuse_jvms] == true
  options[:respect_exit_status] = 'false' if options[:ignore_exit_status] == true
  # If no reducer and no reduce_command, then skip the reduce phase
  options[:reduce_tasks] = 0 if !reducer && !options[:reduce_command] && !options[:reduce_tasks]

  jobconf_keys = []
  # Fields hadoop should use to distribute records to reducers: put them at
  # the front of the -D block when partitioning is requested.
  unless options[:partition_fields].blank?
    jobconf_keys += [:partition_fields, :output_field_separator]
  end
  jobconf_keys += [
    :io_sort_mb,               :io_sort_record_percent,
    :map_speculative,          :map_tasks,
    :max_maps_per_cluster,     :max_maps_per_node,
    :max_node_map_tasks,       :max_node_reduce_tasks,
    :max_reduces_per_cluster,  :max_reduces_per_node,
    :max_record_length,        :min_split_size,
    :output_field_separator,   :key_field_separator,
    :partition_fields,         :sort_fields,
    :reduce_tasks,             :respect_exit_status,
    :reuse_jvms,               :timeout,
    :max_tracker_failures,     :max_map_attempts,
    :max_reduce_attempts
  ]
  # The partition keys also appear in the general list above; uniq keeps
  # their first (leading) position and avoids emitting them twice.
  jobconf_keys.uniq.map { |opt| jobconf(opt) }.flatten.compact
end

#hadoop_other_args ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/wukong/script/hadoop_command.rb', line 112

# Collect the non-jobconf streaming arguments.
#
# * options[:extra_args] is always the first element (it may be nil; the
#   caller compacts nils away)
# * split_on_xml_tag: read records with StreamXmlRecordReader bounded by
#   <tag> ... </tag>
# * noempty: add -lazyOutput (don't create reduce file if no records)
# * partition_fields: use KeyFieldBasedPartitioner
#
# @return [Array<String, nil>] the extra argument strings
def hadoop_other_args
  args = [options[:extra_args]]
  xml_tag = options.split_on_xml_tag
  if xml_tag
    args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{xml_tag}>,end=</#{xml_tag}>'}
  end
  args << ' -lazyOutput' if options[:noempty]
  unless options[:partition_fields].blank?
    args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
  end
  args
end

#hadoop_recycle_env ⇒ Object



122
123
124
125
126
# File 'lib/wukong/script/hadoop_command.rb', line 122

# Forward selected environment variables of the launching process (currently
# just RUBYLIB) to the streaming tasks as -cmdenv arguments. Variables that
# are unset are skipped. Presumably this lets the mapper/reducer scripts load
# the same Ruby libraries as the launcher; confirm against deployment setup.
#
# @return [Array<String>] one -cmdenv argument per variable that is set
def hadoop_recycle_env
  %w[RUBYLIB]
    .select { |name| ENV[name] }
    .map { |name| %Q{-cmdenv '#{name}=#{ENV[name]}'} }
end

#hadoop_runner ⇒ Object

The path to the hadoop runner script



129
130
131
# File 'lib/wukong/script/hadoop_command.rb', line 129

# Path to the hadoop runner script. An explicit options[:hadoop_runner]
# wins; otherwise it is "<hadoop_home>/bin/hadoop".
#
# @return [String] the runner path
def hadoop_runner
  explicit_runner = options[:hadoop_runner]
  return explicit_runner if explicit_runner
  options[:hadoop_home] + '/bin/hadoop'
end

#jobconf(option) ⇒ Object

Emit a -D jobconf option for hadoop if the simplified command-line arg is present; if not, return nil, which is elided later.



44
45
46
47
48
49
# File 'lib/wukong/script/hadoop_command.rb', line 44

# Build a single "-D name=value" hadoop option for +option+.
#
# The hadoop property name comes from the option definition's :description,
# and the value is options[option]. This form replaces the older
# "-jobconf name=value" spelling.
#
# @param option [Symbol] the simplified command-line option key
# @return [String, nil] the -D option, or nil when the option is unset
#   (callers compact the nils away)
def jobconf(option)
  value = options[option]
  return unless value
  "-D #{options.definition_of(option, :description)}=#{value}"
end