Method: Spark::RDD#pipe

Defined in:
lib/spark/rdd.rb

#pipe(*cmds) ⇒ Object

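Return an RDD created by piping elements to a forked external process. As the examples below show, the elements of the resulting RDD are the lines the process writes to its output, returned as Strings.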
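A minimal local sketch of what piping one partition through a single command involves. It assumes that each element is written as one line of standard input and that each output line becomes one String element. `pipe_partition` is a hypothetical helper built on Ruby's standard `Open3.capture2`. It is not taken from ruby-spark and runs on a plain collection, not an RDD.

```ruby
require 'open3'

# Hypothetical helper (not part of ruby-spark): pipe one partition's
# elements through a single shell command and return its output lines.
def pipe_partition(elements, cmd)
  # Assumption: each element is sent as one line of standard input.
  input = elements.map { |e| "#{e}\n" }.join
  output, status = Open3.capture2(cmd, stdin_data: input)
  raise "#{cmd} exited with #{status.exitstatus}" unless status.success?
  # Each line of standard output becomes one String element.
  output.lines.map(&:chomp)
end

p pipe_partition(0..5, 'cat')
# => ["0", "1", "2", "3", "4", "5"]
```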

Cmds:

cmd = [env,] command... [,options]

env: hash
  name => val : set the environment variable
  name => nil : unset the environment variable
command...:
  commandline                 : command line string which is passed to the standard shell
  cmdname, arg1, ...          : command name and one or more arguments (This form does
                                not use the shell. See the Process.spawn documentation for caveats.)
  [cmdname, argv0], arg1, ... : command name, argv[0] and zero or more arguments (no shell)
options: hash

See http://ruby-doc.org/core-2.2.0/Process.html#method-c-spawn
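The `cmd` forms above are the argument forms accepted by `Process.spawn`. Ruby's standard `Open3.capture2` hands its arguments to `Process.spawn` in the same way, so the sketch below uses it to try each form locally, outside Spark. It feeds input through the `stdin_data:` option. The commands (`echo`, `tr`, `cat`) and the `GREETING` variable are illustrative choices. Whether `pipe` forwards every form and option unchanged is an assumption to confirm against the source.

```ruby
require 'open3'

# env + command line string: the shell expands $GREETING from the env hash.
out, = Open3.capture2({ 'GREETING' => 'hello' }, 'echo "$GREETING"')
p out  # => "hello\n"

# name => nil unsets the variable in the child's environment.
out, = Open3.capture2({ 'GREETING' => nil }, 'echo "[$GREETING]"')
p out  # => "[]\n"

# cmdname, arg1, ...: runs tr directly, without a shell.
out, = Open3.capture2('tr', 'a-z', 'A-Z', stdin_data: "spark\n")
p out  # => "SPARK\n"

# [cmdname, argv0], arg1, ...: runs cat, but the process sees argv[0] as "mycat".
out, = Open3.capture2(['cat', 'mycat'], stdin_data: "0\n1\n")
p out  # => "0\n1\n"
```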

Examples:

$sc.parallelize(0..5).pipe('cat').collect
# => ["0", "1", "2", "3", "4", "5"]

rdd = $sc.parallelize(0..5)
rdd = rdd.pipe('cat', "awk '{print $1*10}'")
rdd = rdd.map(lambda{|x| x.to_i + 1})
rdd.collect
# => [1, 11, 21, 31, 41, 51]
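The second example suggests that when several commands are given, each one reads the previous command's output. The hypothetical helper `pipe_chain` reproduces that pipeline on a plain Range under this assumption, using only `Open3.capture2`. It is a sketch of the observable behavior, not ruby-spark's code.

```ruby
require 'open3'

# Hypothetical helper (not part of ruby-spark): feed the elements through
# each command in turn, assuming one command's output is the next one's input.
def pipe_chain(elements, *cmds)
  lines = elements.map(&:to_s)
  cmds.each do |cmd|
    input = lines.map { |l| "#{l}\n" }.join
    output, status = Open3.capture2(cmd, stdin_data: input)
    raise "#{cmd} exited with #{status.exitstatus}" unless status.success?
    lines = output.lines.map(&:chomp)
  end
  lines
end

result = pipe_chain(0..5, 'cat', "awk '{print $1*10}'").map { |x| x.to_i + 1 }
p result
# => [1, 11, 21, 31, 41, 51]
```

Running the commands one after another, each with its full input buffered, keeps the sketch short. A streaming implementation would instead connect the processes with OS pipes.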


# File 'lib/spark/rdd.rb', line 913

def pipe(*cmds)
  new_rdd_from_command(Spark::Command::Pipe, cmds)
end