RedStorm v0.2.0 - JRuby on Storm

RedStorm provides the JRuby integration for the Storm distributed realtime computation system.

Changes from 0.1.x

  • This release introduces the simple DSL. Topology, Spout and Bolt classes can inherit from the SimpleTopology, SimpleSpout and SimpleBolt classes, which provide a very clean and concise DSL. See examples/simple.
  • Use the same SimpleTopology class for both a local development cluster and a remote production cluster.
  • The redstorm command has a new syntax.

Dependencies

This has been tested on OSX 10.6.8 and Linux 10.04 using Storm 0.5.4 and JRuby 1.6.5.

Installation

$ gem install redstorm

Usage overview

  • create a new empty project directory.
  • install the RedStorm gem.
  • create a subdirectory which will contain your sources.
  • perform the initial setup as described below to install the dependencies in the target/ subdir of your project directory.
  • run your topology in local mode and/or on a production cluster as described below.

Initial setup

Install RedStorm dependencies; from your project root directory execute:

$ redstorm install

The install command will install all Java jar dependencies in target/dependency using ruby-maven, and generate and compile the Java bindings in target/classes.

DON'T PANIC, it's Maven. The first time you run $ redstorm install, Maven will take a few minutes to resolve dependencies and will then download and install the dependency jar files.

Run in local mode

Create a topology class. The underscored file name (topology_class_file_name.rb) MUST correspond to its CamelCase class name (TopologyClassFileName).

$ redstorm local <path/to/topology_class_file_name.rb>
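
As a minimal sketch of the naming rule above, the hypothetical helper below (expected_class_name is not part of RedStorm) derives the CamelCase class name that a given topology file is expected to define. It assumes a plain split on underscores, so names containing acronyms may not round-trip.

```ruby
# Hypothetical helper, not part of RedStorm: maps an underscored topology
# file name to the CamelCase class name it is expected to define.
def expected_class_name(path)
  base = File.basename(path, ".rb")           # "word_count_topology"
  base.split("_").map(&:capitalize).join      # "WordCountTopology"
end

puts expected_class_name("examples/simple/word_count_topology.rb")
```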

See the examples below to run the examples in local mode or on a production cluster.

Run on production cluster

  • generate target/cluster-topology.jar. This jar file will include your sources directory plus the required dependencies from the target/ directory:
$ redstorm jar <sources_directory>
  • submit the cluster topology jar file to the cluster. Assuming you have the Storm distribution installed and the Storm bin/ directory in your path:
$ storm jar ./target/cluster-topology.jar redstorm.TopologyLauncher cluster <path/to/topology_class_file_name.rb>

Basically, you must follow the Storm instructions to set up a production cluster and submit your topology to the cluster.

Examples

Install the example files in your project. The examples/ dir will be created in your project root dir.

$ redstorm examples

All examples using the simple DSL are located in examples/simple. Examples using the standard Java interface are in examples/native.

Local mode

$ redstorm local examples/simple/exclamation_topology.rb
$ redstorm local examples/simple/exclamation_topology2.rb
$ redstorm local examples/simple/word_count_topology.rb

This next example requires the redis gem and a Redis server running on localhost:6379.

$ redstorm local examples/simple/redis_word_count_topology.rb

Using redis-cli, push words into the test list and watch Storm pick them up.

Production cluster

All examples using the simple DSL can also run on a production cluster. The only native example compatible with a production cluster is the ClusterWordCountTopology.

  • generate the target/cluster-topology.jar and include the examples/ directory.
$ redstorm jar examples
  • submit the cluster topology jar file to the cluster, assuming you have the Storm distribution installed and the Storm bin/ directory in your path:
$ storm jar ./target/cluster-topology.jar redstorm.TopologyLauncher cluster examples/simple/word_count_topology.rb

Basically, you must follow the Storm instructions to set up a production cluster and submit your topology to the cluster.

DSL usage

Your project can be written as a single file containing all spout, bolt and topology classes, or each class can be in its own file; your choice. There are many examples of the simple DSL in examples/simple.

The DSL uses a callback metaphor to attach code to the topology/spout/bolt execution contexts using on_* DSL constructs (e.g. on_submit, on_send, ...). When using on_* you can attach your code in three different ways:

  • using a code block
on_receive(:ack => true, :anchor => true) {|tuple| do_something_with(tuple)}

on_receive :ack => true, :anchor => true do |tuple| 
  do_something_with(tuple)
end
  • defining the corresponding method
on_receive :ack => true, :anchor => true 
def on_receive(tuple)
  do_something_with(tuple)
end
  • defining an arbitrary method
on_receive :my_method, :ack => true, :anchor => true 
def my_method(tuple)
  do_something_with(tuple)
end

The example SplitSentenceBolt shows the three different coding styles.
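
To make the three styles concrete outside of Storm, here is a minimal plain-Ruby sketch of how a class-level on_receive statement could accept a block, fall back to an instance method named on_receive, or dispatch to a named method. TinyBolt, execute and the example bolt classes are hypothetical names; this is not how RedStorm's SimpleBolt is implemented, and the sketch only records the options instead of acting on them.

```ruby
# Hypothetical mini-DSL, NOT RedStorm's SimpleBolt: it only shows how one
# class-level on_receive statement can support the three coding styles.
class TinyBolt
  class << self
    attr_reader :receive_handler, :receive_options

    def on_receive(*args, &block)
      @receive_options = args.last.is_a?(Hash) ? args.pop : {}
      # Block style first, then an explicit method name, else #on_receive.
      @receive_handler = block || args.first || :on_receive
    end
  end

  # Stand-in for Storm calling the bolt for each received tuple.
  def execute(tuple)
    handler = self.class.receive_handler
    if handler.is_a?(Proc)
      instance_exec(tuple, &handler)   # block runs in the bolt instance
    else
      send(handler, tuple)             # method style or named method
    end
  end
end

# 1. code block
class UpcaseBolt < TinyBolt
  on_receive(:ack => true, :anchor => true) { |tuple| tuple.upcase }
end

# 2. method named on_receive
class ReverseBolt < TinyBolt
  on_receive :ack => true, :anchor => true
  def on_receive(tuple)
    tuple.reverse
  end
end

# 3. arbitrary method name
class SplitBolt < TinyBolt
  on_receive :split_words, :ack => true, :anchor => true
  def split_words(tuple)
    tuple.split(" ")
  end
end

p UpcaseBolt.new.execute("storm")      # => "STORM"
p SplitBolt.new.execute("hello world") # => ["hello", "world"]
```

Running the block in the instance with instance_exec is what lets a block body call helper methods such as do_something_with, just like the method-based styles.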

Topology DSL

Normally Storm topology components are assigned and referenced using numeric ids. In the SimpleTopology DSL ids are optional. By default the DSL will use the component class name as an implicit symbolic id and bolt source ids can use these implicit ids. The DSL will automatically resolve and assign numeric ids upon topology submission. If two components are of the same class, creating a conflict, then the id can be explicitly defined using either a numeric value, a symbol or a string. Numeric values will be used as-is at topology submission while symbols and strings will be resolved and assigned a numeric id.
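
The id-resolution rule above can be sketched in plain Ruby. This is an illustration under stated assumptions, not RedStorm's code: components are plain hashes with a :klass name and an optional :id, symbol and string ids are treated as equal, and each symbolic id receives the lowest integer not already taken by an explicit numeric id. resolve_ids and the component class names are hypothetical.

```ruby
# Hypothetical sketch of the id-resolution rule, not RedStorm's code.
# Each component is a Hash with a :klass name and an optional explicit :id.
def resolve_ids(components)
  # Implicit id = class name; symbols and strings are compared as strings.
  ids = components.map do |c|
    id = c[:id] || c[:klass]
    id.is_a?(Integer) ? id : id.to_s
  end

  duplicate = ids.find { |id| ids.count(id) > 1 }
  raise ArgumentError, "conflicting id #{duplicate.inspect}, set an explicit :id" if duplicate

  used = ids.grep(Integer)   # explicit numeric ids are reserved as-is
  next_id = 0
  ids.each_with_object({}) do |id, mapping|
    if id.is_a?(Integer)
      mapping[id] = id
    else
      next_id += 1 while used.include?(next_id)
      mapping[id] = next_id  # symbolic id gets the next free integer
      used << next_id
    end
  end
end

components = [
  { klass: "RandomSentenceSpout" },             # implicit id
  { klass: "SplitSentenceBolt", id: 10 },       # numeric id, used as-is
  { klass: "WordCountBolt", id: :counter_a },   # same class twice, so
  { klass: "WordCountBolt", id: "counter_b" }   # explicit ids are needed
]
p resolve_ids(components)
# RandomSentenceSpout -> 0, 10 -> 10, counter_a -> 1, counter_b -> 2
```

Without the two explicit ids, both WordCountBolt components would resolve to the same implicit id and the sketch raises ArgumentError, which mirrors the conflict case described above.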

require 'red_storm'

class MyTopology < RedStorm::SimpleTopology

  spout spout_class, options 

  bolt bolt_class, options do
    source source_id, grouping
    ...
  end

  configure topology_name do |env|
    config_attribute value
    ...
  end

  on_submit do |env|
    ...
  end
end

spout statement

spout spout_class, options
  • spout_class — spout Ruby class
  • options
    • :id — spout explicit id (default is spout class name)
    • :parallelism — spout parallelism (default is 1)

bolt statement

bolt bolt_class, options do
  source source_id, grouping
  ...
end
  • bolt_class — bolt Ruby class
  • options
    • :id — bolt explicit id (default is bolt class name)
    • :parallelism — bolt parallelism (default is 1)
  • source_id — source id reference. Can be the source class name if unique, or the explicit id if defined.
  • grouping
    • :fields => ["field", ...] — fieldsGrouping using fields on the source_id
    • :shuffle — shuffleGrouping on the source_id
    • :global — globalGrouping on the source_id
    • :none — noneGrouping on the source_id
    • :all — allGrouping on the source_id
    • :direct — directGrouping on the source_id
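
For intuition, the grouping options can be modeled as a function that picks the receiving task ids for one tuple. The sketch below is a simplified model, not Storm's implementation: tasks are assumed to be numbered 0 to num_tasks - 1, the fields hash is Ruby's Array#hash rather than Storm's, and :direct is left out because with direct grouping the emitting component chooses the task. target_tasks is a hypothetical name.

```ruby
# Simplified, hypothetical model of the stream groupings (not Storm's code).
# Returns the task ids (0...num_tasks) that receive a given tuple.
def target_tasks(grouping, tuple, num_tasks, fields: [], rng: Random.new)
  case grouping
  when :fields
    # Same values in the grouping fields => same task.
    key = fields.map { |f| tuple.fetch(f) }
    [key.hash % num_tasks]
  when :shuffle, :none
    # Random task; Storm currently treats none like shuffle.
    [rng.rand(num_tasks)]
  when :global
    [0] # the whole stream goes to the lowest task id
  when :all
    (0...num_tasks).to_a # every task gets a copy
  else
    raise ArgumentError, "not modeled here: #{grouping.inspect}"
  end
end

a = target_tasks(:fields, { "word" => "storm", "count" => 1 }, 4, fields: ["word"])
b = target_tasks(:fields, { "word" => "storm", "count" => 7 }, 4, fields: ["word"])
p a == b                       # => true
p target_tasks(:all, {}, 3)    # => [0, 1, 2]
```

The fields case is why a word count bolt typically uses :fields => ["word"]: every occurrence of a given word lands on the same task, so its counter stays consistent.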

configure statement

configure topology_name do |env|
  config_attribute value
  ...
end

The configure statement is optional.

  • topology_name — alternate topology name (default is topology class name)
  • env — is set to :local or :cluster for you to set environment-specific configurations
  • config_attribute — the Storm Config attribute name. See the Storm documentation for the complete list. The attribute name corresponds to the Java setter method name without the "set" prefix and converted from CamelCase to underscore. Ex.: setMaxTaskParallelism is :max_task_parallelism.
    • :debug
    • :max_task_parallelism
    • :num_workers
    • :max_spout_pending
    • ...
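
The naming rule for config attributes is mechanical, so it can be sketched as two small conversion helpers. Both helpers (config_attribute_for and setter_for) are hypothetical and only illustrate the rule stated above; they are not RedStorm functions.

```ruby
# Hypothetical helpers illustrating the naming rule, not RedStorm functions.

# Java setter name -> DSL attribute, e.g. "setMaxTaskParallelism" -> :max_task_parallelism
def config_attribute_for(setter)
  name = setter.sub(/\Aset/, "")                           # drop the "set" prefix
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase.to_sym   # CamelCase -> underscore
end

# DSL attribute -> Java setter name, e.g. :num_workers -> "setNumWorkers"
def setter_for(attribute)
  "set" + attribute.to_s.split("_").map(&:capitalize).join
end

p config_attribute_for("setMaxTaskParallelism") # => :max_task_parallelism
p setter_for(:max_spout_pending)                # => "setMaxSpoutPending"
```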

on_submit statement

on_submit do |env|
  ...
end

The on_submit statement is optional. Use it to execute code after the topology submission.

  • env — is set to :local or :cluster

For example, you can use on_submit to shut down the LocalCluster after some time. The LocalCluster instance is available using the cluster method.

on_submit do |env|
  if env == :local
    sleep(5)
    cluster.shutdown
  end
end

Examples

Spout DSL

require 'red_storm'

class MySpout < RedStorm::SimpleSpout
  set spout_attribute => value
  ...

  output_fields :field, ...

  on_send options do
    ...
  end

  on_init do
    ...
  end

  on_close do
    ...
  end

  on_ack do |msg_id|
    ...
  end

  on_fail do |msg_id|
    ...
  end
end

set statement

set spout_attribute => value

The set statement is optional. Use it to set spout specific attributes.

  • spout_attribute
    • :is_distributed — set to true for a distributed spout (default is false)

output_fields statement

output_fields :field, ...

Define the output fields for this spout.

  • :field — the field name; can be a symbol or a string.

on_send statement

on_send options do
  ...
end

on_send relates to the Java spout nextTuple method and is called periodically by Storm to allow the spout to output a tuple. When using auto-emit (default), the block return value will be auto-emitted. A single value return will be emitted as a single-field tuple. An array of values [a, b] will be emitted as a multiple-fields tuple. Normally a spout should only output a single tuple per on_send invocation.

  • :options
    • :emit — set to false to disable auto-emit (default is true)
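
The auto-emit rule for spouts can be simulated without Storm. In the sketch below, SketchSpout and FakeSpoutCollector are hypothetical stand-ins: next_tuple plays the role of nextTuple, and the collector just records what would be emitted. Treating a nil return as "nothing to emit" is an assumption of this sketch, not documented RedStorm behavior.

```ruby
# Hypothetical stand-ins, not RedStorm classes: a collector that records
# emitted tuples and a spout that applies the auto-emit rule.
class FakeSpoutCollector
  attr_reader :emitted

  def initialize
    @emitted = []
  end

  def emit(values)
    @emitted << values
  end
end

class SketchSpout
  def initialize(collector, emit: true, &on_send)
    @collector = collector
    @auto_emit = emit
    @on_send = on_send
  end

  # Plays the role of the Java spout nextTuple method: one tuple per call.
  def next_tuple
    value = @on_send.call
    return if !@auto_emit || value.nil?   # nil => nothing to emit (assumption)
    # A single value becomes a single-field tuple, an Array a multiple-fields tuple.
    @collector.emit(value.is_a?(Array) ? value : [value])
  end
end

collector = FakeSpoutCollector.new
sentences = ["the cow", "an apple"]
spout = SketchSpout.new(collector) { sentences.shift }
3.times { spout.next_tuple }
p collector.emitted # => [["the cow"], ["an apple"]]
```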

on_init statement

on_init do
  ...
end

on_init relates to the Java spout open method. When on_init is called, the config, context and collector are set to return the Java spout config Map, TopologyContext and SpoutOutputCollector.

on_close statement

on_close do
  ...
end

on_close relates to the Java spout close method.

on_ack statement

on_ack do |msg_id|
  ...
end

on_ack relates to the Java spout ack method.

on_fail statement

on_fail do |msg_id|
  ...
end

on_fail relates to the Java spout fail method.

Examples

Bolt DSL

require 'red_storm'

class MyBolt < RedStorm::SimpleBolt
  output_fields :field, ...

  on_receive options do
    ...
  end

  on_init do
    ...
  end

  on_close do
    ...
  end
end

on_receive statement

on_receive options do
  ...
end

on_receive relates to the Java bolt execute method and is called by Storm upon tuple reception. When using auto-emit, the block return value will be auto-emitted. A single value return will be emitted as a single-field tuple. An array of values [a, b] will be emitted as a multiple-fields tuple. An array of arrays [[a, b], [c, d]] will be emitted as multiple tuples with multiple fields each. When not using auto-emit, the unanchored_emit(value, ...) and anchored_emit(tuple, value, ...) methods can be used to emit a single tuple. When using auto-anchor (disabled by default), the emitted tuples will be anchored to the received tuple. When using auto-ack (disabled by default), the received tuple will be acked after emitting the return value. When not using auto-ack, the ack(tuple) method can be used to ack the tuple.

Note that setting auto-ack and auto-anchor is possible only when auto-emit is enabled.

  • :options
    • :emit — set to false to disable auto-emit (default is true)
    • :ack — set to true to enable auto-ack (default is false)
    • :anchor — set to true to enable auto-anchor (default is false)
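
The interplay of auto-emit, auto-anchor and auto-ack can be sketched with a fake collector. This is a simplified model, not RedStorm's SimpleBolt: auto_emit and FakeBoltCollector are hypothetical, emit(anchor, values) mirrors the argument order of Storm's anchored OutputCollector emit, and the ack is issued only after all return values are emitted, as described above.

```ruby
# Hypothetical stand-ins, not RedStorm classes. The fake collector records
# calls; emit(anchor, values) mirrors Storm's anchored OutputCollector#emit.
class FakeBoltCollector
  attr_reader :calls

  def initialize
    @calls = []
  end

  def emit(*args)
    @calls << [:emit, *args]
  end

  def ack(tuple)
    @calls << [:ack, tuple]
  end
end

# Applies the auto-emit rule to an on_receive return value.
def auto_emit(collector, input, result, ack: false, anchor: false)
  tuples =
    if result.is_a?(Array) && result.all? { |v| v.is_a?(Array) }
      result            # [[a, b], [c, d]] => several tuples ([] => none)
    elsif result.is_a?(Array)
      [result]          # [a, b] => one multiple-fields tuple
    else
      [[result]]        # a => one single-field tuple
    end
  tuples.each do |values|
    anchor ? collector.emit(input, values) : collector.emit(values)
  end
  collector.ack(input) if ack # ack only after everything is emitted
end

collector = FakeBoltCollector.new
auto_emit(collector, :input_tuple, [["storm", 1], ["ruby", 1]], ack: true, anchor: true)
p collector.calls
# => [[:emit, :input_tuple, ["storm", 1]], [:emit, :input_tuple, ["ruby", 1]], [:ack, :input_tuple]]
```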

on_init statement

on_init do
  ...
end

on_init relates to the Java bolt prepare method. When on_init is called, the config, context and collector are set to return the Java bolt config Map, TopologyContext and OutputCollector.

on_close statement

on_close do
  ...
end

on_close relates to the Java bolt cleanup method.

Examples

Author

Colin Surprenant, @colinsurprenant, http://github.com/colinsurprenant

License

Apache License, Version 2.0. See the LICENSE.md file.