The purpose of this gem is to help building complex dataflows and support automating long-running batch processes. It handles parallelizing computation whenever it cans and re-computing dependencies that are not up-to-date.

There are two main concepts in describing a computing graph:

  • data-nodes, which support storing/retrieving data from databases
  • compute-nodes, which supports arbitrary processing, can depend on any number of nodes (compute/data) and can push their results to a data-node if needed

The main use case is to represent data sources with data-nodes and link those to compute-nodes. Upon computing, the node will store the result in another data-node.

The graph's metadata (e.g. nodes' dependencies, properties) is stored in MongoDB. It also uses MongoDB as the default DB for the data-node storage as it allows for quick schema-less prototyping. MySQL and PostgreSQL are also supported (through Sequel).

This repository only includes the most common nodes. Other repos will include custom (application-dependent) nodes.

It has some similarities with the Luigi python module.


Add this line to your application's Gemfile:

gem 'dataflow-rb'

And then execute:

$ bundle

Or install it yourself as:

$ gem install dataflow-rb

You also need to install:

  • mongodb 3.2 (required)
  • postgresql (optional)
  • mysql (optional)


require 'dataflow-rb'

# Create a data node
node1 = Dataflow::Nodes::DataNode.create(db_name: 'test', name: 'data_source1')
node1.add(records: [{id: 1, first_name: 'hello'}])
# => [{"id"=>1, "name"=>"test"}]

node2 = Dataflow::Nodes::DataNode.create(db_name: 'test', name: 'data_source2')
node2.add(records: [{id: 1, last_name: 'world'}])
# => [{"id"=>1, "name"=>"world"}]

# We will keep the results of the computation in this dataset
result_node = Dataflow::Nodes::DataNode.create(db_name: 'test', name: 'result')

# Join the 2 datasets by id:
compute_node = Dataflow::Nodes::JoinNode.create(
  name: 'join',
  dependency_ids: [node1, node2],
  data_node_id: result_node,
  key1: 'id',
  key2: 'id'
# => [{"id"=>1, "first_name"=>"hello", "last_name"=>"world"}]
compute_node.all # this is just a facade for the above
# => [{"id"=>1, "first_name"=>"hello", "last_name"=>"world"}]

# Fetch the data again later:
result_node = Dataflow::Nodes::DataNode.find_by(name: 'result')
# or the short hand:
result_node = Dataflow.data_node('result')
# => [{"id"=>1, "first_name"=>"hello", "last_name"=>"world"}]


