Module: Fbp
- Defined in:
- lib/fbp.rb,
lib/fbp/version.rb,
lib/fbp/constants.rb,
lib/fbp/flow-node.rb,
lib/fbp/sort-node.rb,
lib/fbp/assign-node.rb,
lib/fbp/decode-node.rb,
lib/fbp/encode-node.rb,
lib/fbp/counter-node.rb,
lib/fbp/selector-node.rb,
lib/fbp/aggregator-node.rb,
lib/fbp/fpb-thread-pool.rb,
lib/fbp/concatenate-node.rb,
lib/fbp/text_file_reader_node.rb,
lib/fbp/text_file_writer_node.rb
Overview
Description
The Fbp module provides support for Flow Based Programming for the Ruby language.
Flow Based Programming is described in the Book: Flow-Based Programming, 2nd Edition: A New Approach to Application Development Written by J. Paul Morrison ISBN-10: 1451542321 ISBN-13: 978-1451542325
Documentation about Flow Based Programing can also be found at www.jpaulmorrison.com/fbp/
Discussion
The main idea behind Flow based Programming is that data should flow from one asynchronous processing unit to another asynchronous processing unit.
These asynchronous processing units together would form a network that would constitute an application. One of the many benefits of this programming model is it makes it much easier to create multi-threaded and multi-process applications. It also fosters the creation of small reusable components that maybe used in multiple applications. J. Paul Morrison does a great job of describing the benefits of this programming model and I highly recommend reading his book on the subject.
Recently developers have rediscovered Flow Based Programming as a way to deal with managing multi-threaded applications and to deal with the complexity of web development. NoFlo is a company that developed a Flow Based Programming system for Javascript.
While having used the precepts of Flow Based Programming over the years, there was not a unified way to do Flow Based Programming. Given that Ruby is one of the main development languages for web development, it seemed time was ripe for creating a Flow Based Programming system for Ruby
Concepts
One of the basic concepts of FBP is an Information Packet (IP) An IP defines the data that flows between asynchronous processing units. For Ruby Fbp an IP is a hash object. This allows for a common data type with infinite variations.
For the Ruby implementation, each asynchronous unit is implemented by a Node object. Each Node object runs on a thread to ensure asynchronous execution. To keep the number of threads to a reasonable amount, Node objects, execute on a thread in a thread pool managed by the Node_Pool class which in turn uses the Pool class developed by Kim Burgestrand [email protected].
Nodes define an input queue and at least one output channel. The input queue of a node uses a Queue object to manage the IPs that come into the node. While every node is run on a thread, no processing occurs on that thread until an item has been pushed into the input queue of an node. This is possible because the Queue class will block until it can get an item out of the queue.
Nodes may need to be parameterized . In J. Paul Morrison’s book, the data used to do parameterization is called an Initial Information Packed (IIP). For Ruby an IIP is a hash object. A Node typically takes an IIP as an optional argument when creating a new instance of an object. An IIP may also come to a node the node from its input queue, doing so means that the parameterization of a node can come from an upstream node.
By default Nodes instances have a single output channel named :output. For many Nodes this is sufficient. There are however some nodes that need to have multiple output channels. An example would be a Selector_node. A Selector_node takes IPs and compares a value in the IP with a preset value. Given the selector record the IP either matches a criteria or not. Depending on if the incoming IP matches the preset value or not determines which output channel. So while the default is that this is only a single output channel, Nodes may have any number of output channels. For Nodes that do have a multiple output channels, it is possible to write to every output channel by specifying the output channel :all.
Current Implementation
The current release must be consider a proof of concept. Only a handful of Nodes have been created and many more would need to be created before significant applications can be made. Over time more nodes will be developed but there are enough nodes to start looking at Flow Based Programming for Ruby.
Defined Under Namespace
Classes: Aggregator_node, Assign_node, Concatenate_node, Counter_node, Decode_node, Encode_node, Flow_node, Node, Node_pool, Selector_node, Sort_node, Test_file_reader_node, Text_file_writer_node
Constant Summary collapse
- VERSION =
:nodoc:
"0.1.0"- NOT_EQUAL_COMPARE =
0- EQUAL_COMPARE =
1- GREATER_THAN =
2- GREATER_THAN_OR_EQUAL =
3- LESS_THAN =
4- LESS_THAN_OR_EQUAL =
5- CONTAINS =
6- DOES_NOT_CONTAINS =
7- STARTS_WITH =
8- ENDS_WITH =
9- MATCHES =
10- @@node_pool =
:nodoc:
Node_pool.new
Class Method Summary collapse
- .has_pool ⇒ Object
- .make_flow_description(node) ⇒ Object
- .make_pool ⇒ Object
- .num_threads ⇒ Object
- .num_threads=(num_threads) ⇒ Object
- .read_flow_desc_from_file(file_path) ⇒ Object
- .shutdown ⇒ Object
- .write_flow_desc_to_file(flow_desc, file_path) ⇒ Object
Class Method Details
.has_pool ⇒ Object
74 75 76 |
# File 'lib/fbp/fpb-thread-pool.rb', line 74 def self.has_pool !@@node_pool.pool.nil? end |
.make_flow_description(node) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fbp/flow-node.rb', line 55 def self.make_flow_description(node) return nil if node.nil? flow_desc = Array.new # Add the nodes to flow_desc add_node_to_flow_desc(node, flow_desc) # Make a hash keyed by an object_id with the value of the index of the object in the flow_desc array lookup_data = Hash.new flow_desc.each_index do |idx| lookup_data[flow_desc[idx][:object_id]] = idx end set_input_array(node, lookup_data, flow_desc) flow_desc end |
.make_pool ⇒ Object
65 66 67 |
# File 'lib/fbp/fpb-thread-pool.rb', line 65 def self.make_pool @@node_pool.make_pool end |
.num_threads ⇒ Object
56 57 58 |
# File 'lib/fbp/fpb-thread-pool.rb', line 56 def self.num_threads @@node_pool.num_threads end |
.num_threads=(num_threads) ⇒ Object
48 49 50 |
# File 'lib/fbp/fpb-thread-pool.rb', line 48 def self.num_threads= (num_threads) @@node_pool.num_threads = num_threads end |
.read_flow_desc_from_file(file_path) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fbp/flow-node.rb', line 105 def self.read_flow_desc_from_file(file_path) return nil if file_path.nil? encoded_data = nil begin File.open(file_path, 'r') {|f| encoded_data = f.read()} rescue encoded_data = nil end return nil if encoded_data.nil? Marshal.load(encoded_data) end |
.shutdown ⇒ Object
83 84 85 |
# File 'lib/fbp/fpb-thread-pool.rb', line 83 def self.shutdown !@@node_pool.shutdown end |
.write_flow_desc_to_file(flow_desc, file_path) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fbp/flow-node.rb', line 80 def self.write_flow_desc_to_file(flow_desc, file_path) return false if flow_desc.nil? || file_path.nil? flow_data = Marshal.dump(flow_desc) result = false begin File.open(file_path, 'w') {|f| f.write(flow_data)} result = true rescue result = false end result end |