Wukong-Storm

The Hadoop plugin for Wukong lets you run Wukong processors and dataflows as Storm topologies reading data in and out from Kafka.

Before you use Wukong-Storm to develop, test, and write your Hadoop jobs, you might want to read about Wukong, write some simple processors, and read about some of Storm's core concepts.

You might also want to check out some other projects which enrich the Wukong and Hadoop experience:

  • wukong-hadoop: Run Wukong processors and dataflows as mappers and/or reducers within the Hadoop framework. Model jobs locally before you run them.
  • wukong-load: Load the output data from your local Wukong jobs and flows into a variety of different data stores.
  • wukong-deploy: Orchestrate Wukong and other wu-tools together to support an application running on the Infochimps Platform.

Installation & Setup

Wukong-Storm can be installed as a RubyGem:

$ sudo gem install wukong-storm

If you actually want to run your dataflows as functioning Storm topologies reading/writing to/from Kafka, you'll of course need access to Storm and Kafka installations. Ironfan is a great tool for building and managing Storm clusters and other distributed infrastructure quickly and easily.

To run Storm jobs through Wukong-Storm, you'll need to move your your Wukong code to each worker of the Storm cluster, install Wukong-Storm on each, and log in and launch your job fron one of them. Ironfan again helps with configuring this.

Anatomy of a running topology

Storm defines the concept of a topology. A topology contains spouts and bolts. A spout is a source of data. A bolt processes data. Bolts can be connected to each other and to spouts in arbitrary ways.

Tooplogies submitted to Storm's Nimbus but run within a Storm supervisor. Each supervisor can dedicate a certain number of workers to a topology. Within each worker, parallelism controls the number of threads the worker assigns to the topology.

Wukong-Storm runs each Wukong dataflow as a single bolt within a single topology. Data is passed to this bolt over STDIN and collected over STDOUT, similar to the way Hadoop streaming operates.

This topology is hooked up to a storm.kafka.trident.OpaqueTridentKafkaSpout (part of storm-contrib) which reads from a single input topic within Kafka.

Output records are written to a default Kafka topic but this can be overridden on a per-record basis.

Communication protocol

A Wukong dataflow launched within Storm runs as a single bolt (see com.infochimps.wukong.storm.SubprocessFunction). This bolt works by launching an arbitrary command-line and sending it records over STDIN and reading its output over STDOUT. The SubprocessFunction class expects whatever command it launched to obey a protocol under which the output after each input consists of each output record followed by a newline, with the full batch of output records followed by a batch terminator (default: ---) then another newline.

Wukong-Storm comes with a command wu-bolt which works very similarly to wu-local but implements this protocol. Here's an example of using wu-bolt directly with a processor:

$ echo 2 | wu-bolt prime_factorizer.rb
2
---
$ echo 12 | wu-bolt prime_factorizer.rb
2
2
3
---
$ echo 19 | wu-bolt prime_factorizer.rb
---

Notice that in the last example, the presence of the batch delimiter after each input record make it easy to tell the difference between "no output records" and "no output records yet" which, over STDIN/STDOUT, is rather hard to tell otherwise.

Running a dataflow

A simple processor

Assuming you have correctly installed Wukong-Storm, Storm, Kafka, Zookeeper, &c., and you have defined a simple dataflow (or in this case, just a single processor) like this:

# in upcaser.rb
Wukong.processor(:upcaser) do
  def process line
    yield line.upcase
  end
end

Then you can launch it directly into Storm:

$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic

If a topology named upcaser already exists, you'll get an error. Add the --rm flag to first kill the running topology before launching the new one:

$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm

The default amount of time to wait for the topology to die is 300 seconds (5 minutes), just like the storm kill command (which is used under the hood). When debugging a topology in development, it's helpful to add --wait=1 to immediately kill the topology.

See exactly what happened behind the scenes by adding the --dry_run flag which will print commands and not execute them:

$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm --dry_run

A more complicated example

Say you have a dataflow:

# in my_flow.rb
Wukong.dataflow(:my_flow) do
  my_parser | does_something | then_something_else | to_json
end

You can launch it using a different topology name as well as target arbitrary locations for your Zookeeper, Kafka, and Storm servers:

$ wu-storm my_flow.rb --name=my_flow_attempt_3 --zookeeper_hosts=10.121.121.121,10.122.122.122 --kafka_hosts=10.123.123.123 --nimbus_host=10.124.124.124 --input=some_input_topic --output=some_output_topic

Running non-Wukong or non-Ruby code

You can also use Wukong-Storm as a harness to run non-Wukong or non-Ruby code. As long as you can specificy a command-line to run which supports the communication protocol, then you can run it with wu-storm:

$ wu-storm --bolt_command='my_cmd --some-option=value -af -q 3' --input=some_input_topic --output=some_output_topic

Scaling options

Storm provides several options for scaling up or down a topology. Wukong-Storm makes them accessible at launch time via the following options:

  • --workers specify the number of workers (a.k.a. "executors" or "slots") for the topology. Defaults to 1.
  • --input_parallelism specify the number of threads within the spout reading from Kafka within each worker. Defaults to 1.
  • --parallelism specify the number of threads within the bolt running Wukong code within each worker. Defaults to 1.