Module: Fbp

Defined in:
lib/fbp.rb,
lib/fbp/version.rb,
lib/fbp/constants.rb,
lib/fbp/flow-node.rb,
lib/fbp/sort-node.rb,
lib/fbp/assign-node.rb,
lib/fbp/decode-node.rb,
lib/fbp/encode-node.rb,
lib/fbp/counter-node.rb,
lib/fbp/selector-node.rb,
lib/fbp/aggregator-node.rb,
lib/fbp/fpb-thread-pool.rb,
lib/fbp/concatenate-node.rb,
lib/fbp/text_file_reader_node.rb,
lib/fbp/text_file_writer_node.rb

Overview

Description

The Fbp module provides support for Flow Based Programming for the Ruby language.

Flow Based Programming is described in the book Flow-Based Programming, 2nd Edition: A New Approach to Application Development, by J. Paul Morrison (ISBN-10: 1451542321, ISBN-13: 978-1451542325).

Documentation about Flow Based Programming can also be found at www.jpaulmorrison.com/fbp/

Discussion

The main idea behind Flow Based Programming is that data should flow from one asynchronous processing unit to another.

Together, these asynchronous processing units form a network that constitutes an application. One of the many benefits of this programming model is that it makes it much easier to create multi-threaded and multi-process applications. It also fosters the creation of small reusable components that may be used in multiple applications. J. Paul Morrison does a great job of describing the benefits of this programming model, and I highly recommend reading his book on the subject.

Recently, developers have rediscovered Flow Based Programming as a way to manage multi-threaded applications and to deal with the complexity of web development. NoFlo is one such Flow Based Programming system, developed for JavaScript.

Although I have used the precepts of Flow Based Programming over the years, there was no unified way to do Flow Based Programming. Given that Ruby is one of the main languages for web development, the time seemed ripe for creating a Flow Based Programming system for Ruby.

Concepts

One of the basic concepts of FBP is the Information Packet (IP). An IP defines the data that flows between asynchronous processing units. In Ruby Fbp an IP is a hash object, which gives every node a common data type while letting each IP carry whatever keys and values it needs.
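
As a minimal sketch of what IPs might look like, the following builds two hashes with different keys and prints them. The key names (:name, :age, :path, :line) and the helper methods are illustrative assumptions; Fbp does not prescribe them.

```ruby
# Hypothetical IPs: plain Ruby hashes. The keys are made up for illustration.
def sample_ips
  [
    { name: "Ada", age: 36 },
    { path: "/tmp/input.txt", line: "first line of text" }
  ]
end

# A downstream unit can inspect any IP with the ordinary Hash API.
def describe_ip(ip)
  ip.map { |key, value| "#{key}=#{value}" }.join(", ")
end

sample_ips.each { |ip| puts describe_ip(ip) }
```

Because both packets are ordinary hashes, the same describe_ip helper handles them without knowing their shape in advance.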

For the Ruby implementation, each asynchronous unit is implemented by a Node object. Each Node object runs on a thread to ensure asynchronous execution. To keep the number of threads reasonable, Node objects execute on threads from a thread pool managed by the Node_pool class, which in turn uses the Pool class developed by Kim Burgestrand.
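
The idea of running many units on a bounded set of threads can be sketched with a small fixed-size pool. SketchPool and squares_with_pool are hypothetical names; this is not the Node_pool class or the Pool class it wraps, only an illustration of the general technique.

```ruby
require "thread"

# Hypothetical fixed-size pool, not Fbp's Node_pool.
class SketchPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Each worker pulls jobs until it receives the :stop sentinel.
        while (job = @jobs.pop) != :stop
          job.call
        end
      end
    end
  end

  # Queue a block to be run on one of the pool's threads.
  def schedule(&block)
    @jobs.push(block)
  end

  # Send one :stop per worker, then wait for all workers to finish.
  def shutdown
    @workers.size.times { @jobs.push(:stop) }
    @workers.each(&:join)
  end
end

# Run one job per number on the pool and collect the results.
def squares_with_pool(numbers, threads)
  results = Queue.new
  pool = SketchPool.new(threads)
  numbers.each { |n| pool.schedule { results.push(n * n) } }
  pool.shutdown
  Array.new(results.size) { results.pop }.sort
end

p squares_with_pool([1, 2, 3, 4], 2)
```

Four jobs share two threads here, which is the same trade-off the text describes: the number of units can grow without the number of threads growing with it.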

Nodes define an input queue and at least one output channel. The input queue of a node uses a Queue object to manage the IPs that come into the node. While every node runs on a thread, no processing occurs on that thread until an item has been pushed into the node's input queue. This is possible because a Queue blocks the reading thread until an item is available.
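
The blocking behavior described above can be seen with Ruby's standard Queue on its own. The sketch below is not Fbp's Node class; run_blocking_consumer and the :done end-of-stream sentinel are assumptions made for the example.

```ruby
require "thread"

# Standalone sketch of a unit that sleeps on its input queue.
def run_blocking_consumer(ips)
  input_queue = Queue.new
  processed = []

  # Queue#pop blocks the worker thread until an IP is pushed.
  worker = Thread.new do
    while (ip = input_queue.pop) != :done
      processed << ip.merge(seen: true)
    end
  end

  ips.each { |ip| input_queue.push(ip) }
  input_queue.push(:done) # hypothetical end-of-stream sentinel
  worker.join
  processed
end

p run_blocking_consumer([{ id: 1 }, { id: 2 }])
```

The worker thread exists from the start but does no work until the first push, which matches the "no processing until an item arrives" behavior of a node.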

Nodes may need to be parameterized. In J. Paul Morrison's book, the data used for parameterization is called an Initial Information Packet (IIP). In Ruby Fbp an IIP is a hash object. A Node typically takes an IIP as an optional argument when a new instance is created. An IIP may also arrive at a node through its input queue, which means that the parameterization of a node can come from an upstream node.
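
A rough sketch of both parameterization paths follows. PrefixUnit is a hypothetical class, not Fbp's Node, and the convention that an incoming hash with an :iip key carries parameters is an assumption made only for this example; how Fbp distinguishes an IIP from an IP on the input queue is not shown in this document.

```ruby
# Hypothetical unit that prefixes a text field.
class PrefixUnit
  def initialize(iip = {})
    # IIP passed at construction time, merged over defaults.
    @params = { prefix: "" }.merge(iip)
  end

  # Assumption for this sketch: a hash with an :iip key is parameterization;
  # anything else is treated as a regular IP.
  def receive(packet)
    if packet.key?(:iip)
      @params = @params.merge(packet[:iip])
      nil
    else
      packet.merge(text: "#{@params[:prefix]}#{packet[:text]}")
    end
  end
end

unit = PrefixUnit.new({ prefix: "> " })
p unit.receive({ text: "hello" })
unit.receive({ iip: { prefix: "# " } }) # re-parameterized from upstream
p unit.receive({ text: "hello" })
```

The second call to receive changes the unit's behavior without constructing a new instance, which is what allows an upstream unit to supply the parameters.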

By default, Node instances have a single output channel named :output. For many Nodes this is sufficient. Some nodes, however, need multiple output channels. An example is a Selector_node, which takes IPs and compares a value in each IP with a preset value. Whether or not the incoming IP matches the preset value determines which output channel receives it. So while the default is a single output channel, Nodes may have any number of output channels. For Nodes that do have multiple output channels, it is possible to write to every output channel by specifying the output channel :all.
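
The routing idea can be sketched as follows. SketchSelector is a hypothetical stand-in, not Fbp's Selector_node; the channel names :match and :no_match, and modeling channels as arrays, are assumptions. Only the :all channel name comes from the description above.

```ruby
# Hypothetical selector with two named output channels.
class SketchSelector
  attr_reader :channels

  def initialize(key, expected)
    @key = key
    @expected = expected
    @channels = { match: [], no_match: [] }
  end

  # Route an IP to :match or :no_match depending on the comparison.
  def route(ip)
    write(ip[@key] == @expected ? :match : :no_match, ip)
  end

  # Writing to :all copies the IP onto every output channel.
  def write(channel, ip)
    targets = channel == :all ? @channels.keys : [channel]
    targets.each { |name| @channels[name] << ip }
  end
end

selector = SketchSelector.new(:lang, "ruby")
selector.route({ lang: "ruby", id: 1 })
selector.route({ lang: "go", id: 2 })
selector.write(:all, { id: 3 })
p selector.channels
```

After these calls the first IP sits only on :match, the second only on :no_match, and the third on both.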

Current Implementation

The current release must be considered a proof of concept. Only a handful of Nodes have been created, and many more will be needed before significant applications can be built. More nodes will be developed over time, but there are already enough to start exploring Flow Based Programming for Ruby.

Defined Under Namespace

Classes: Aggregator_node, Assign_node, Concatenate_node, Counter_node, Decode_node, Encode_node, Flow_node, Node, Node_pool, Selector_node, Sort_node, Test_file_reader_node, Text_file_writer_node

Constant Summary

VERSION = "0.1.0"
NOT_EQUAL_COMPARE = 0
EQUAL_COMPARE = 1
GREATER_THAN = 2
GREATER_THAN_OR_EQUAL = 3
LESS_THAN = 4
LESS_THAN_OR_EQUAL = 5
CONTAINS = 6
DOES_NOT_CONTAINS = 7
STARTS_WITH = 8
ENDS_WITH = 9
MATCHES = 10
@@node_pool = Node_pool.new
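
One plausible way to use comparison codes like these is a dispatch table from code to predicate. The sketch below copies a subset of the values listed above into a stand-in module, SketchCompare, so that it runs without the gem. The semantics attached to each code are assumptions; how Fbp's nodes actually interpret the constants is not shown in this document.

```ruby
# Stand-in module; constant values copied from the summary, semantics assumed.
module SketchCompare
  NOT_EQUAL_COMPARE = 0
  EQUAL_COMPARE = 1
  GREATER_THAN = 2
  CONTAINS = 6
  STARTS_WITH = 8
  MATCHES = 10

  # Map each comparison code to a predicate over (value, target).
  OPERATIONS = {
    NOT_EQUAL_COMPARE => ->(value, target) { value != target },
    EQUAL_COMPARE     => ->(value, target) { value == target },
    GREATER_THAN      => ->(value, target) { value > target },
    CONTAINS          => ->(value, target) { value.include?(target) },
    STARTS_WITH       => ->(value, target) { value.start_with?(target) },
    MATCHES           => ->(value, target) { !(value =~ target).nil? }
  }.freeze

  # Look up the predicate for +op+ and apply it; raises KeyError if unknown.
  def self.compare(op, value, target)
    OPERATIONS.fetch(op).call(value, target)
  end
end

puts SketchCompare.compare(SketchCompare::STARTS_WITH, "flow-node", "flow")
```

Keeping the codes as integers and the behavior in one table means a new comparison only needs one new entry.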

Class Method Summary

Class Method Details

.has_pool ⇒ Object

# File 'lib/fbp/fpb-thread-pool.rb', line 74

def self.has_pool
  !@@node_pool.pool.nil?
end

.make_flow_description(node) ⇒ Object

# File 'lib/fbp/flow-node.rb', line 55

def self.make_flow_description(node)
  return nil if node.nil?

  flow_desc = []

  # Add the nodes to flow_desc
  add_node_to_flow_desc(node, flow_desc)

  # Make a hash keyed by object_id whose value is the index of that object in the flow_desc array
  lookup_data = {}
  flow_desc.each_with_index do |desc, idx|
    lookup_data[desc[:object_id]] = idx
  end

  set_input_array(node, lookup_data, flow_desc)
  flow_desc
end

.make_pool ⇒ Object

# File 'lib/fbp/fpb-thread-pool.rb', line 65

def self.make_pool
  @@node_pool.make_pool
end

.num_threads ⇒ Object

# File 'lib/fbp/fpb-thread-pool.rb', line 56

def self.num_threads
  @@node_pool.num_threads
end

.num_threads=(num_threads) ⇒ Object

# File 'lib/fbp/fpb-thread-pool.rb', line 48

def self.num_threads=(num_threads)
  @@node_pool.num_threads = num_threads
end

.read_flow_desc_from_file(file_path) ⇒ Object

# File 'lib/fbp/flow-node.rb', line 105

def self.read_flow_desc_from_file(file_path)
  return nil if file_path.nil?

  # Marshal data is binary, so read the file in binary mode.
  encoded_data =
    begin
      File.binread(file_path)
    rescue StandardError
      nil # missing or unreadable file
    end

  return nil if encoded_data.nil?

  # Marshal.load can instantiate arbitrary objects; only load trusted files.
  Marshal.load(encoded_data)
end

.shutdown ⇒ Object

# File 'lib/fbp/fpb-thread-pool.rb', line 83

# Returns the logical negation of the value returned by Node_pool#shutdown.
def self.shutdown
  !@@node_pool.shutdown
end

.write_flow_desc_to_file(flow_desc, file_path) ⇒ Object

# File 'lib/fbp/flow-node.rb', line 80

def self.write_flow_desc_to_file(flow_desc, file_path)
  return false if flow_desc.nil? || file_path.nil?

  flow_data = Marshal.dump(flow_desc)

  begin
    # Marshal output is binary, so write the file in binary mode.
    File.binwrite(file_path, flow_data)
    true
  rescue StandardError
    false
  end
end
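
The two file helpers form a Marshal round trip. A minimal sketch of that round trip, using stand-in methods so it runs without the gem, might look like the following. The names sketch_write, sketch_read and round_trip are hypothetical, and the contents of the sample flow description are made up; only the :object_id key appears in the source above.

```ruby
require "tempfile"

# Stand-in for the write helper: dump to a binary file, report success.
def sketch_write(flow_desc, path)
  File.binwrite(path, Marshal.dump(flow_desc))
  true
rescue SystemCallError
  false
end

# Stand-in for the read helper: nil when the file cannot be read.
def sketch_read(path)
  Marshal.load(File.binread(path)) # only load files you trust
rescue SystemCallError
  nil
end

# Write a description to a temporary file and read it back.
def round_trip(flow_desc)
  Tempfile.create("flow_desc") do |file|
    file.close
    sketch_write(flow_desc, file.path)
    sketch_read(file.path)
  end
end

flow = [{ object_id: 1, class: "Counter_node", inputs: [] }]
puts round_trip(flow) == flow
```

Since Marshal can only serialize plain data, a flow description made of arrays, hashes, strings and numbers survives the round trip unchanged.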