Class: Wukong::Script

Inherits:
Object
Includes:
HadoopCommand, LocalCommand
Defined in:
lib/wukong/script.rb

Overview

How to run a Wukong script

your/script.rb --run path/to/input_files path/to/output_dir

All of the data paths are HDFS paths; your script path, of course, is on the local filesystem.

Command-line options

If your script needs its own command-line options, just give them on the command line:

your/script.rb --my_bool_opt --my_val_taking_opt=val \
  --run path/to/input_files path/to/output_dir

In this case the options hash for both Mapper and Reducer will contain

:my_bool_opt       => true,
:my_val_taking_opt => 'val'
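The mapping from flags to the options hash can be illustrated with a small standalone parser. This is a hypothetical sketch, not Wukong's own code (Wukong reads options through its Settings object); `sketch_parse_options` is an invented name. A bare `--flag` becomes `true`, `--key=val` becomes the string `"val"`, and everything else is kept as a positional path.

```ruby
# Hypothetical sketch: NOT Wukong's parser (Wukong delegates to Settings).
# "--flag" => true, "--key=val" => "val"; non-option args are positional.
def sketch_parse_options(argv)
  options = {}
  rest    = []
  argv.each do |arg|
    if arg =~ /\A--([^=]+)=(.*)\z/
      options[$1.to_sym] = $2        # value-taking option
    elsif arg =~ /\A--(.+)\z/
      options[$1.to_sym] = true      # boolean flag
    else
      rest << arg                    # input/output paths
    end
  end
  [options, rest]
end

opts, paths = sketch_parse_options(%w[--my_bool_opt --my_val_taking_opt=val --run in/files out/dir])
```

Note that a bare `--run` also comes out as `true` here, which matches how `run_mode` later treats `options[:run] == true`.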

Complicated input paths

To use more than one file as input, you can use the usual *, ? and [] wildcards or give a comma-separated list of paths; see the Hadoop documentation for the exact syntax.

Run in Elastic MapReduce mode (--run=emr)

Wukong can also launch scripts on Amazon's cloud through Elastic MapReduce. It:

  • copies the script to S3 in two parts

  • invokes it using the Amazon API

Run locally (--run=local)

To run your script locally, use --run=local:

your/script.rb --run=local path/to/input_files path/to/output_dir

This pipes the contents of path/to/input_files first through your mapper, then through sort, then through your reducer, and stores the results in the given output directory.

All paths refer to the local filesystem; Hadoop is never involved and doesn't even have to be installed.
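The local data flow (mapper, then sort, then reducer) can be mimicked in-process to see why the sort step matters. This is a hypothetical sketch using word count as the example job; `sketch_map` and `sketch_reduce` are invented names, not Wukong APIs, and Ruby's byte-order `sort` stands in for the Unix `sort` that local mode actually runs.

```ruby
# Hypothetical in-process sketch of local mode: mapper -> sort -> reducer.

# Mapper: emit one "word\t1" record per word.
def sketch_map(lines)
  lines.flat_map { |line| line.split.map { |word| "#{word}\t1" } }
end

# Reducer: input is sorted, so equal keys are adjacent; sum each run.
def sketch_reduce(sorted_records)
  sorted_records
    .map { |rec| rec.split("\t") }
    .chunk_while { |a, b| a[0] == b[0] }
    .map { |group| "#{group[0][0]}\t#{group.sum { |_, count| count.to_i }}" }
end

input  = ["the cat", "the hat"]
output = sketch_reduce(sketch_map(input).sort)
```

The reducer only has to compare neighbouring records because sorting has already grouped identical keys together, which is the same guarantee Hadoop's shuffle gives a streaming reducer.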

How to test your scripts

You can supply the --map argument in place of --run to run the mapper on its own (and similarly, --reduce to run the reducer standalone):

cat ./local/test/input.tsv | ./examples/word_count.rb --map | more

or, if your test data is on HDFS:

hdp-cat test/input.tsv | ./examples/word_count.rb --map | more
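The same idea (feed a mapper some input, look at what it writes) also works inside a Ruby test if the mapper's input and output streams can be swapped out. This is a hypothetical sketch: `sketch_stream_map` is an invented stand-in for a streaming mapper, and `StringIO` plays the roles of stdin and stdout.

```ruby
require 'stringio'

# Hypothetical stand-in for a --map run: read lines from one IO and write
# tab-separated records to another, as a streaming mapper does with
# stdin/stdout. Injecting the IOs makes it testable without a shell.
def sketch_stream_map(input, output)
  input.each_line do |line|
    line.split.each { |word| output.puts "#{word}\t1" }
  end
end

fake_stdin  = StringIO.new("hello world\nhello\n")
fake_stdout = StringIO.new
sketch_stream_map(fake_stdin, fake_stdout)
```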

Direct Known Subclasses

CassandraScript

Instance Attribute Summary

Instance Method Summary

Methods included from LocalCommand

#execute_local_workflow, #local_commandline, #local_mode_sort_commandline

Methods included from HadoopCommand

#execute_hadoop_workflow, #hadoop_jobconf_options, #hadoop_other_args, #hadoop_recycle_env, #hadoop_runner, included, #jobconf

Constructor Details

#initialize(mapper, reducer = nil, extra_options = {}) ⇒ Script

Instantiate the Script with the mapper and the reducer it should call back. Each may be a Wukong::Streamer class (which is instantiated with new) or an already-constructed streamer instance.

Identity or external program as mapper or reducer

To use the identity reducer (‘cat’), instantiate your Script class with nil as the reducer class. (And similarly to use an identity mapper, supply nil for the mapper class.)

To use an external program as your reducer (or mapper), override the reducer_commandline (mapper_commandline) method to return the full command line to call. When no reducer (mapper) is given, the default implementation falls back to the :reduce_command (:map_command) option.

class MyMapper < Wukong::Streamer::Base
  # ... awesome stuff ...
end

class MyScript < Wukong::Script
  # prefix each unique line with the count of its occurrences.
  def reducer_commandline(run_option = :local)
    '/usr/bin/uniq -c'
  end
end
MyScript.new(MyMapper, nil).run


# File 'lib/wukong/script.rb', line 128

def initialize mapper, reducer=nil, extra_options={}
  Settings.resolve!
  @options = Settings
  options.merge! extra_options
  @mapper  = (case mapper  when Class then mapper.new  when nil then nil else mapper  ; end)
  @reducer = (case reducer when Class then reducer.new when nil then nil else reducer ; end)
  @output_path = options.rest.pop
  @input_paths = options.rest.reject(&:blank?)
  if (input_paths.blank? || output_path.blank?) && (not options[:dry_run]) && (not ['map', 'reduce'].include?(run_mode))
    raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}"
  end
end
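The argument handling in the constructor can be restated in plain Ruby to make its rules explicit: a class argument is instantiated, an instance is used as-is, the last positional argument is the output path, and the rest are input paths. This is a hypothetical sketch; `sketch_streamer` and `sketch_split_paths` are invented names, and ActiveSupport's `blank?` is replaced by an explicit nil/empty check so the block runs without dependencies.

```ruby
# Hypothetical restatement of #initialize's argument handling.

# A Class is instantiated, nil stays nil, anything else is used as-is.
def sketch_streamer(arg)
  case arg
  when Class then arg.new
  when nil   then nil
  else            arg
  end
end

# Last positional arg is the output path; the rest (minus blanks) are inputs.
# Missing paths are only an error outside dry runs and map/reduce modes.
def sketch_split_paths(rest, dry_run: false, run_mode: 'local')
  args        = rest.dup                      # don't mutate the caller's array
  output_path = args.pop
  input_paths = args.reject { |path| path.nil? || path.strip.empty? }
  missing     = input_paths.empty? || output_path.nil? || output_path.strip.empty?
  if missing && !dry_run && !%w[map reduce].include?(run_mode)
    raise ArgumentError, "need input path(s) and an output path, got #{rest.inspect}"
  end
  [input_paths, output_path]
end

inputs, output = sketch_split_paths(%w[in/a.tsv in/b.tsv out/dir])
```

The map/reduce exemption is what lets `--map` and `--reduce` runs work with no paths at all: in those modes the script just reads stdin and writes stdout.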

Instance Attribute Details

#input_paths ⇒ Object (readonly)

Returns the value of attribute input_paths.



# File 'lib/wukong/script.rb', line 70

def input_paths
  @input_paths
end

#mapper ⇒ Object (readonly)

Returns the value of attribute mapper.



# File 'lib/wukong/script.rb', line 69

def mapper
  @mapper
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/wukong/script.rb', line 69

def options
  @options
end

#output_path ⇒ Object (readonly)

Returns the value of attribute output_path.



# File 'lib/wukong/script.rb', line 70

def output_path
  @output_path
end

#reducer ⇒ Object (readonly)

Returns the value of attribute reducer.



# File 'lib/wukong/script.rb', line 69

def reducer
  @reducer
end

Instance Method Details

#job_name ⇒ Object



# File 'lib/wukong/script.rb', line 208

def job_name
  options[:job_name] ||
    "#{File.basename(this_script_filename)}---#{input_paths}---#{output_path}".gsub(%r{[^\w/\.\-\+]+}, '')
end
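To see what the default job name looks like, the expression from `job_name` can be evaluated on its own. In this sketch `sketch_job_name` is an invented wrapper and the script and path values are made-up examples. Every run of characters outside `[A-Za-z0-9_/.\-+]` is stripped, so the brackets and quotes that Ruby 1.9+ produces when interpolating the `input_paths` array disappear.

```ruby
# Hypothetical wrapper around the default job-name expression in #job_name.
# An explicit job_name wins; otherwise build one and strip unsafe characters.
def sketch_job_name(script_filename, input_paths, output_path, job_name: nil)
  job_name ||
    "#{File.basename(script_filename)}---#{input_paths}---#{output_path}".gsub(%r{[^\w/\.\-\+]+}, '')
end

name = sketch_job_name('examples/word_count.rb', ['in/a.tsv'], 'out/wc')
```

One consequence worth knowing: with several input paths the `", "` separators are stripped as well, so the paths run together in the name (for example `['in/a', 'in/b']` becomes `in/ain/b`).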

#mapper_commandline(run_option = :local) ⇒ Object

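Shell command for the map phase. By default, it calls this script in --map mode: in Hadoop mode the command is given to the Hadoop streaming command, and in local mode it is given to the system() call. If no mapper was given, the value of the :map_command option is returned instead.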



# File 'lib/wukong/script.rb', line 177

def mapper_commandline(run_option=:local)
  if mapper
    case run_option
    when :local then
      "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
    when :hadoop then
      "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --map " + non_wukong_params
    end
  else
    options[:map_command]
  end
end
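The only difference between the `:local` and `:hadoop` branches above is how the script is referred to: by its full path locally, by its basename under Hadoop (presumably because the script is shipped into each task's working directory). This hypothetical sketch isolates that choice; `sketch_phase_commandline`, the `ruby` interpreter string and the paths are placeholders. Unlike the original, which returns nil for an unrecognised run option, the sketch raises so a typo is caught early.

```ruby
# Hypothetical sketch of how the phase command line varies by run option.
def sketch_phase_commandline(phase, script, run_option, ruby: 'ruby', extra: '')
  script_ref =
    case run_option
    when :local  then script                  # full local path
    when :hadoop then File.basename(script)   # file name only, relative to task dir
    else raise ArgumentError, "unknown run option: #{run_option.inspect}"
    end
  "#{ruby} #{script_ref} --#{phase} " + extra
end

local_cmd  = sketch_phase_commandline('map', '/home/me/wc.rb', :local)
hadoop_cmd = sketch_phase_commandline('map', '/home/me/wc.rb', :hadoop, extra: '--my_bool_opt')
```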

#reducer_commandline(run_option = :local) ⇒ Object

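Shell command for the reduce phase. By default, it calls this script in --reduce mode: in Hadoop mode the command is given to the Hadoop streaming command, and in local mode it is given to the system() call. If no reducer was given, the value of the :reduce_command option is returned instead.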



# File 'lib/wukong/script.rb', line 195

def reducer_commandline(run_option=:local)
  if reducer
    case run_option
    when :local then
      "#{ruby_interpreter_path} #{this_script_filename} --reduce " + non_wukong_params
    when :hadoop then
      "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --reduce " + non_wukong_params
    end
  else
    options[:reduce_command]
  end
end
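The `else` branch is what makes identity and external reducers possible: when the script was built with a nil reducer, the command line is simply whatever was configured under `:reduce_command`, which may itself be nil. This hypothetical sketch (`sketch_reducer_commandline` is an invented name; the option hash and script name are examples) shows the three cases.

```ruby
# Hypothetical sketch of the nil-reducer fallback in reducer_commandline.
def sketch_reducer_commandline(reducer, options, script)
  if reducer
    "ruby #{script} --reduce"     # re-invoke this script as the reducer
  else
    options[:reduce_command]      # external command, or nil if none configured
  end
end

with_streamer = sketch_reducer_commandline(Object.new, {}, 'wc.rb')
with_command  = sketch_reducer_commandline(nil, { reduce_command: '/usr/bin/uniq -c' }, 'wc.rb')
unconfigured  = sketch_reducer_commandline(nil, {}, 'wc.rb')
```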

#run ⇒ Object

In --run mode, use the chosen framework (local, Hadoop, EMR, etc.) to re-launch the script as mapper, reducer, etc.

With --map or --reduce, dispatch directly to the mapper or reducer. Any other run mode falls through to dump_help.



# File 'lib/wukong/script.rb', line 146

def run
  case run_mode
  when 'map'              then mapper.stream
  when 'reduce'           then reducer.stream
  when 'local'            then execute_local_workflow
  when 'cassandra'        then execute_hadoop_workflow
  when 'hadoop', 'mapred' then execute_hadoop_workflow
  when 'emr'
    require 'wukong/script/emr_command'
    execute_emr_workflow
  else                    dump_help
  end
end
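The `case` in `run` amounts to a lookup from run mode to action, with `dump_help` as the default. This hypothetical sketch writes it as a frozen Hash of action names (strings only, nothing is executed); `SKETCH_RUN_DISPATCH` and `sketch_run_action` are invented names. It makes visible that `cassandra`, `hadoop` and `mapred` all share the Hadoop workflow.

```ruby
# Hypothetical lookup-table view of #run's dispatch (names only, not calls).
SKETCH_RUN_DISPATCH = {
  'map'       => 'mapper.stream',
  'reduce'    => 'reducer.stream',
  'local'     => 'execute_local_workflow',
  'cassandra' => 'execute_hadoop_workflow',
  'hadoop'    => 'execute_hadoop_workflow',
  'mapred'    => 'execute_hadoop_workflow',
  'emr'       => 'execute_emr_workflow'     # after require 'wukong/script/emr_command'
}.freeze

# Unknown or empty run modes fall back to printing help.
def sketch_run_action(run_mode)
  SKETCH_RUN_DISPATCH.fetch(run_mode, 'dump_help')
end
```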

#run_mode ⇒ Object

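Determines how the script was invoked. --map or --reduce (or a script filename ending in -mapper.rb or -reducer.rb) selects map or reduce mode; if only --run is given, the :default_run_mode option is used; otherwise the value passed to --run (for example local, hadoop or emr) is used.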



# File 'lib/wukong/script.rb', line 161

def run_mode
  case
  when options[:map]           then 'map'
  when options[:reduce]        then 'reduce'
  when ($0 =~ /-mapper\.rb$/)  then 'map'
  when ($0 =~ /-reducer\.rb$/) then 'reduce'
  when (options[:run] == true) then options[:default_run_mode]
  else                         options[:run].to_s
  end
end
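Because the precedence order is easy to misread, here is the same logic as a standalone function over a plain Hash, with the program name passed in instead of read from `$0`. This is a hypothetical sketch: `sketch_run_mode` is an invented name, the option keys follow the source, and the example values are made up. Note that with neither `--map`, `--reduce` nor `--run`, the result is the empty string, which `run` sends to `dump_help`.

```ruby
# Hypothetical restatement of #run_mode; program_name replaces $0.
def sketch_run_mode(options, program_name)
  case
  when options[:map]                   then 'map'
  when options[:reduce]                then 'reduce'
  when program_name =~ /-mapper\.rb$/  then 'map'
  when program_name =~ /-reducer\.rb$/ then 'reduce'
  when options[:run] == true           then options[:default_run_mode]
  else                                      options[:run].to_s
  end
end

explicit_map  = sketch_run_mode({ map: true }, 'x.rb')
by_filename   = sketch_run_mode({}, 'wc-reducer.rb')
bare_run      = sketch_run_mode({ run: true, default_run_mode: 'hadoop' }, 'x.rb')
named_run     = sketch_run_mode({ run: 'local' }, 'x.rb')
nothing_given = sketch_run_mode({}, 'x.rb')
```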

#safely(action, &block) ⇒ Object

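Wrapper for dangerous operations: runs the block and, if it raises a StandardError, passes the action name and the exception to handle_error instead of letting the error propagate.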



# File 'lib/wukong/script.rb', line 214

def safely action, &block
  begin
    block.call
  rescue StandardError => e ; handle_error(action, e); end
end
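The pattern behind `safely` can be exercised in isolation. In this hypothetical sketch, `SketchRunner` is an invented class and its `handle_error` just records the failure, since the real `handle_error` is not shown on this page. On success `safely` returns the block's value; on a `StandardError` it returns whatever `handle_error` returns.

```ruby
# Hypothetical sketch of the #safely pattern with a recording error handler.
class SketchRunner
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Run the block; route any StandardError to handle_error with a label.
  def safely(action, &block)
    block.call
  rescue StandardError => e
    handle_error(action, e)
  end

  # Stand-in handler: remember what failed and why.
  def handle_error(action, error)
    @errors << [action, error.message]
    nil
  end
end

runner = SketchRunner.new
ok     = runner.safely('add')    { 1 + 1 }
failed = runner.safely('divide') { 1 / 0 }
```

Rescuing `StandardError` rather than `Exception` leaves signals and `exit` (which raise `Interrupt` and `SystemExit`) free to stop the script.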