SwineHerd
Swineherd is for running scripts and workflows on filesystems.
Outline
A workflow is built with script objects and run on a filesystem.
Script:
A script has the following attributes:

- source – The source file used. These can be Apache Pig scripts, Wukong scripts, even R scripts. You can add your own scripts by subclassing the Script class.
- input – An array of input paths.
- output – An array of output paths.
- options – A ruby hash of options used as command line args, eg. {:foo => 'bar'}. How these options are mapped to command line arguments is up to the particular script class.
- attributes – A ruby hash of parameters used for variable substitution. Every script is assumed to be (but not required to be) an eruby template.
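For instance, here is a sketch using the script classes from the working example further down (the source file and paths are made up):

require 'swineherd/script' ; include Swineherd::Script

script = WukongScript.new('scripts/my_script.rb')  # source file
script.input  << '/tmp/example/input.tsv'          # input paths
script.output << '/tmp/example/output'             # output paths
script.options = {:foo => 'bar'}                   # mapped to command line args
script.run :hadoop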
Workflow:
A workflow is built using rake task objects that do nothing more than run scripts. A workflow:

- can be described with a directed dependency graph
- has an id which is used to run its tasks idempotently. At the moment it is the responsibility of the running process (or human being) to choose a suitable id.
- manages intermediate outputs by using the next_output and latest_output methods. See the examples dir for usage.
- has a working directory in which all intermediate outputs go
  - these are named according to the rake task that created them
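Put together, a minimal workflow might look like this (a pared-down sketch of the full pagerank example below; the script and paths are placeholders):

flow = Workflow.new('my_flow_id') do
  hdfs = Swineherd::FileSystem.get(:hdfs)
  step = PigScript.new('scripts/my_step.pig')

  # next_output hands out a fresh path in the workflow's working directory;
  # latest_output returns the most recently handed-out one, which is what
  # makes re-running with the same id idempotent.
  task :my_step do
    step.options = {:out => next_output(:my_step)}
    step.run(:hadoop) unless hdfs.exists? latest_output(:my_step)
  end
end

flow.workdir = '/tmp/my_flow'
flow.run(:my_step)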
FileSystem
Workflows are intended to run on filesystems. At the moment, the implemented filesystems are:

- file – Local file system. Only thoroughly tested on Ubuntu Linux.
- hdfs – Hadoop Distributed File System. Uses JRuby and the Apache Hadoop 0.20 API.
- s3 – Uses the right_aws gem for interacting with Amazon Simple Storage Service (S3).
Using the filesystem:
Paths should be absolute.
# get a new instance of local filesystem and write to it
localfs = FileSystem.get(:file)
localfs.open("mylocalfile", 'w') do |f|
  f.write("Writing a string to a local file")
end

# get a new instance of hadoop filesystem and write to it
hadoopfs = FileSystem.get(:hdfs)
hadoopfs.open("myhadoopfile", 'w') do |f|
  f.write("Writing a string to an hdfs file")
end

# get a new instance of s3 filesystem and write to it
access_key_id     = '1234abcd'
secret_access_key = 'foobar1234'
s3fs = FileSystem.get(:s3, access_key_id, secret_access_key)
s3fs.mkpath 'mys3bucket' # bucket must exist
s3fs.open("mys3bucket/mys3file", 'w') do |f|
  f.write("Writing a string to an s3 file")
end
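Since every filesystem implements the same interface, existence checks and cross-filesystem copies read the same way everywhere. A quick sketch using methods that appear in the working example below (the paths are made up):

hdfs    = FileSystem.get(:hdfs)
localfs = FileSystem.get(:file)

# merge hadoop part-xxxxx files, then pull the result down to the local fs
hdfs.merge("/tmp/results", "/tmp/results_merged") unless hdfs.exists?("/tmp/results_merged")
hdfs.copy_to_local("/tmp/results_merged", "/tmp/local_results") unless localfs.exists?("/tmp/local_results")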
Working Example
For the most up-to-date working example, see the examples directory. Here’s a simple example for running pagerank:
#!/usr/bin/env ruby
$LOAD_PATH << '../../lib'
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'
Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"
Settings.resolve!
flow = Workflow.new(Settings.flow_id) do

  # The filesystems we're going to be working with
  hdfs    = Swineherd::FileSystem.get(:hdfs)
  localfs = Swineherd::FileSystem.get(:file)

  # The scripts we're going to use
  initializer = PigScript.new('scripts/pagerank_initialize.pig')
  iterator    = PigScript.new('scripts/pagerank.pig')
  finisher    = WukongScript.new('scripts/cut_off_list.rb')
  plotter     = RScript.new('scripts/histogram.R')

  #
  # Runs simple pig script to initialize pagerank. We must specify the input
  # here as this is the first step in the workflow. The output attribute is to
  # ensure idempotency and the options attribute is the hash that will be
  # converted into command-line args for the pig interpreter.
  #
  task :pagerank_initialize do
    initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => next_output(:pagerank_initialize)}
    initializer.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_initialize)
  end

  #
  # Runs multiple iterations of pagerank with another pig script and manages all
  # the intermediate outputs.
  #
  task :pagerank_iterate => [:pagerank_initialize] do
    iterator.options[:damp]           = '0.85f'
    iterator.options[:curr_iter_file] = latest_output(:pagerank_initialize)
    Settings.iterations.times do
      iterator.options[:next_iter_file] = next_output(:pagerank_iterate)
      iterator.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_iterate)
      iterator.refresh!
      iterator.options[:curr_iter_file] = latest_output(:pagerank_iterate)
    end
  end

  #
  # Here we use a wukong script to cut off the last field (a big pig bag of
  # links). Notice how every wukong script MUST have an input but pig scripts do
  # not.
  #
  task :cut_off_adjacency_list => [:pagerank_iterate] do
    finisher.input  << latest_output(:pagerank_iterate)
    finisher.output << next_output(:cut_off_adjacency_list)
    finisher.run :hadoop unless hdfs.exists? latest_output(:cut_off_adjacency_list)
  end

  #
  # We want to pull down one result file, so merge the part-000.. files into one file
  #
  task :merge_results => [:cut_off_adjacency_list] do
    merged_results = next_output(:merge_results)
    hdfs.merge(latest_output(:cut_off_adjacency_list), merged_results) unless hdfs.exists? merged_results
  end

  #
  # Cat results into a local directory with the same structure,
  # eg. #{work_dir}/#{flow_id}/pull_down_results-0.
  #
  # FIXME: Bridging filesystems is kludgey.
  #
  task :pull_down_results => [:merge_results] do
    local_results = next_output(:pull_down_results)
    hdfs.copy_to_local(latest_output(:merge_results), local_results) unless localfs.exists? local_results
  end

  #
  # Plot 2nd column of the result as a histogram (requires R and
  # ggplot2). Note that the output here is a png file but doesn't have that
  # extension. Ensmarten me as to the right way to handle that?
  #
  task :plot_results => [:pull_down_results] do
    plotter.attributes = {
      :pagerank_data => latest_output(:pull_down_results),
      :plot_file     => next_output(:plot_results), # <-- this will be a png...
      :raw_rank      => "aes(x=d$V2)"
    }
    plotter.run(:local) unless localfs.exists? latest_output(:plot_results)
  end

end
flow.workdir = "/tmp/pagerank_example"
flow.describe
flow.run(:plot_results)
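Since the flow's knobs are defined with Configliere's Settings, they can be passed as command line flags. Assuming the script above is saved as pagerank.rb and Configliere's command-line handling is loaded, an invocation might look like:

$: ./pagerank.rb --flow_id=first_run --iterations=5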
Utils
There’s a fun little program called ‘hdp-tree’ that emphasizes the ease of using the filesystem abstraction:
$: bin/hdp-tree /tmp/my_hdfs_directory
---
/tmp/my_hdfs_directory:
- my_hdfs_directory:
- sub_dir_a: leaf_file_1
- sub_dir_a: leaf_file_2
- sub_dir_a: leaf_file_3
- my_hdfs_directory:
- sub_dir_b: leaf_file_1
- sub_dir_b: leaf_file_2
- sub_dir_b: leaf_file_3
- my_hdfs_directory:
- sub_dir_c: leaf_file_1
- sub_dir_c: leaf_file_2
- sub_dir_c: leaf_file_3
- sub_dir_c:
- sub_sub_dir_a: yet_another_leaf_file
- sub_dir_c: sub_sub_dir_b
- sub_dir_c: sub_sub_dir_c
I know, it’s not as pretty as unix tree, but this IS github…
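At its core such a utility is just a recursive walk over the filesystem abstraction. A minimal sketch of the idea (NB: the entries and directory? methods are hypothetical stand-ins here; substitute whatever listing methods the FileSystem class actually provides):

#!/usr/bin/env ruby
require 'swineherd' ; include Swineherd

# Recursively print a directory as nested, yaml-ish output.
def tree fs, path, depth = 0
  fs.entries(path).each do |entry|  # 'entries' is a hypothetical listing method
    if fs.directory?(entry)         # 'directory?' is hypothetical too
      puts "#{'  ' * depth}- #{File.basename(entry)}:"
      tree fs, entry, depth + 1
    else
      puts "#{'  ' * depth}- #{File.basename(entry)}"
    end
  end
end

tree FileSystem.get(:hdfs), ARGV[0]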
TODO
- the next task in a workflow should NOT run if the previous step failed
  - this is made difficult by the fact that, sometimes?, when a pig script fails it still returns a 0 exit status
  - same for wukong scripts
- add a job object that implements a not_if function; this way a workflow will be constructed of job objects (see the sketch below)
  - a job will do nothing more than execute the ruby code in its (run?) block, unless not_if is true
  - this way we can put script objects inside a job and only run them under certain conditions that the user specifies when they create the job
- implement ftp filesystem interfaces
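A rough sketch of what that job object might look like (nothing here exists in swineherd yet; all names are proposals):

# Hypothetical Job: runs its block unless the not_if guard returns true.
class Job
  def initialize &blk
    @blk = blk
  end

  # Register a guard condition; the job is skipped when it returns true.
  def not_if &guard
    @guard = guard
    self
  end

  def run
    return if @guard && @guard.call
    @blk.call
  end
end

# Usage: only run when the output doesn't already exist.
job = Job.new { puts "running the script" }
job.not_if { File.exist?('/tmp/my_output') }
job.run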