Module: Wukong::HadoopCommand
- Included in:
- Script
- Defined in:
- lib/wukong/script/hadoop_command.rb
Defined Under Namespace
Modules: ClassMethods
Class Method Summary collapse
-
.included(base) ⇒ Object
Standard ClassMethods-on-include trick.
Instance Method Summary collapse
-
#execute_hadoop_workflow ⇒ Object
Assemble the hadoop command to execute and launch the hadoop runner to execute the script across all tasktrackers.
- #hadoop_jobconf_options ⇒ Object
- #hadoop_other_args ⇒ Object
- #hadoop_recycle_env ⇒ Object
-
#hadoop_runner ⇒ Object
The path to the hadoop runner script.
-
#jobconf(option) ⇒ Object
Emit a -jobconf hadoop option if the simplified command line arg is present; if not, the resulting nil will be elided later.
Class Method Details
.included(base) ⇒ Object
Standard ClassMethods-on-include trick
182 183 184 185 186 |
# File 'lib/wukong/script/hadoop_command.rb', line 182
#
# Hook invoked when this module is mixed into +base+: extends +base+ with
# ClassMethods so those methods become class-level methods of the includer.
#
# @param base [Module] the class or module including this module
def self.included(base)
  base.extend(ClassMethods)
end
Instance Method Details
#execute_hadoop_workflow ⇒ Object
Assemble the hadoop command to execute and launch the hadoop runner to execute the script across all tasktrackers
FIXME: Should add some simple logic to ensure that commands are in the right order, or hadoop will complain; i.e., -D options MUST come before others.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/wukong/script/hadoop_command.rb', line 59
#
# Assembles the hadoop streaming command line and passes it to
# execute_command!.
#
# Every entry of the argument list may be a String, an Array of Strings, or
# nil; the list is flattened, nils are dropped, and the pieces are joined with
# a backslash-newline separator to form one multi-line shell command.
#
# NOTE(review): the extracted listing had lost the `options` receiver and the
# `hadoop_jobconf_options` entry; both are restored here from the method
# summary -- confirm against lib/wukong/script/hadoop_command.rb.
#
# @return [Object] whatever execute_command! returns
def execute_hadoop_workflow
  # Input paths join by ','
  input_paths = @input_paths.join(',')
  #
  # Use Settings[:hadoop_home] to set the path your config install.
  hadoop_commandline = [
    hadoop_runner,
    "jar #{options[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
    # The -D entries stay ahead of the streaming flags: the FIXME on this
    # method notes hadoop complains when -D options do not come first.
    hadoop_jobconf_options,
    "-D mapred.job.name='#{job_name}'",
    hadoop_other_args,
    "-mapper '#{mapper_commandline(:hadoop)}'",
    "-reducer '#{reducer_commandline(:hadoop)}'",
    "-input '#{input_paths}'",
    "-output '#{output_path}'",
    "-file '#{this_script_filename}'",
    hadoop_recycle_env,
  ].flatten.compact.join(" \t\\\n ")
  Log.info " Launching hadoop!"
  execute_command!(hadoop_commandline)
end
#hadoop_jobconf_options ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/wukong/script/hadoop_command.rb', line 81
#
# Collects the `-D` job configuration arguments for every simplified option
# that is set, after normalising a few of them in place on `options`.
#
# Side effect: may assign :reuse_jvms, :respect_exit_status and :reduce_tasks
# on `options`.
#
# NOTE(review): the extracted listing had lost the method name, the local
# accumulator and the `options` receiver; they are restored here from the
# method summary -- confirm against lib/wukong/script/hadoop_command.rb.
#
# @return [Array<String>] the `-D` arguments, without nils or repeats
def hadoop_jobconf_options
  jobconf_options = []
  # Fixup these options. The comparisons are against literal true on purpose:
  # any other value already supplied is passed through untouched.
  options[:reuse_jvms]          = '-1'    if options[:reuse_jvms] == true
  options[:respect_exit_status] = 'false' if options[:ignore_exit_status] == true
  # If no reducer and no reduce_command, then skip the reduce phase
  options[:reduce_tasks] = 0 if !reducer && !options[:reduce_command] && !options[:reduce_tasks]
  # Fields hadoop should use to distribute records to reducers
  unless options[:partition_fields].blank?
    jobconf_options += [
      jobconf(:partition_fields),
      jobconf(:output_field_separator),
    ]
  end
  jobconf_options += [
    :io_sort_mb, :io_sort_record_percent,
    :map_speculative, :map_tasks,
    :max_maps_per_cluster, :max_maps_per_node,
    :max_node_map_tasks, :max_node_reduce_tasks,
    :max_reduces_per_cluster, :max_reduces_per_node,
    :max_record_length, :min_split_size,
    :output_field_separator, :key_field_separator,
    :partition_fields, :sort_fields,
    :reduce_tasks, :respect_exit_status,
    :reuse_jvms, :timeout,
    :max_tracker_failures, :max_map_attempts,
    :max_reduce_attempts
  ].map { |opt| jobconf(opt) }
  # :partition_fields and :output_field_separator appear in both lists above,
  # so drop the repeats; uniq keeps the first occurrence, preserving order.
  jobconf_options.flatten.compact.uniq
end
#hadoop_other_args ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/wukong/script/hadoop_command.rb', line 112
#
# Builds the non-jobconf streaming arguments: the user-supplied :extra_args
# plus flags derived from the XML split tag, :noempty and :partition_fields.
#
# The first element is options[:extra_args] as-is and so may be nil.
#
# NOTE(review): the extracted listing had lost the `options` receiver; it is
# restored here -- confirm against lib/wukong/script/hadoop_command.rb.
#
# @return [Array<String, nil>] extra command-line fragments
def hadoop_other_args
  extra_str_args = [options[:extra_args]]
  if options.split_on_xml_tag
    extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{options.split_on_xml_tag}>,end=</#{options.split_on_xml_tag}>'}
  end
  extra_str_args << ' -lazyOutput' if options[:noempty] # don't create reduce file if no records
  extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless options[:partition_fields].blank?
  extra_str_args
end
#hadoop_recycle_env ⇒ Object
122 123 124 125 126 |
# File 'lib/wukong/script/hadoop_command.rb', line 122
#
# Produces a `-cmdenv` argument for each recycled environment variable
# (currently only RUBYLIB) that is set in this process's environment.
# Variables that are unset are skipped.
#
# @return [Array<String>] zero or more `-cmdenv 'NAME=value'` fragments
def hadoop_recycle_env
  %w[RUBYLIB].map do |var|
    %Q{-cmdenv '#{var}=#{ENV[var]}'} if ENV[var]
  end.compact
end
#hadoop_runner ⇒ Object
The path to the hadoop runner script
129 130 131 |
# File 'lib/wukong/script/hadoop_command.rb', line 129
#
# The path to the hadoop runner script: options[:hadoop_runner] when given,
# otherwise "bin/hadoop" appended to options[:hadoop_home].
#
# NOTE(review): the extracted listing had lost the `options` receiver; it is
# restored here -- confirm against lib/wukong/script/hadoop_command.rb.
#
# @return [String] path of the hadoop executable
def hadoop_runner
  options[:hadoop_runner] || (options[:hadoop_home] + '/bin/hadoop')
end
#jobconf(option) ⇒ Object
Emit a -jobconf hadoop option if the simplified command line arg is present; if not, the resulting nil will be elided later.
44 45 46 47 48 49 |
# File 'lib/wukong/script/hadoop_command.rb', line 44
#
# Emits a `-D name=value` hadoop argument for a simplified option when that
# option has a truthy value; otherwise returns nil, which callers strip out
# with compact.
#
# The name comes from options.definition_of(option, :description) --
# presumably the hadoop property name recorded for the option; verify against
# the option definitions.
#
# NOTE(review): the extracted listing had lost the `options` receiver; it is
# restored here to match the commented-out line kept below -- confirm against
# lib/wukong/script/hadoop_command.rb.
#
# @param option [Symbol] key of the simplified option
# @return [String, nil] the `-D` argument, or nil when the option is unset
def jobconf(option)
  return unless options[option]

  # "-jobconf %s=%s" % [options.definition_of(option, :description), options[option]]
  "-D %s=%s" % [options.definition_of(option, :description), options[option]]
end