Class: Fbp::Node
- Inherits:
-
Object
- Object
- Fbp::Node
- Defined in:
- lib/fbp.rb
Overview
Description
The Node class defines the base class for all Ruby Fbp Nodes. It defines the basic operations of all Nodes.
Discussion
When creating a new Node subclass is made it will need to at least override the do_node_work method. If the subclass requires an IIP the subclass will also need to override the is_ready_to_run? method.
The Node class basic behavior is to write its incoming IPs to its single output channel.
Node object life cycle
A node is created it is quiescent. This is necessary as most node need other nodes in a network of node before it is useful. Once all of the nodes have been created that are needed for an application, they need to be placed into a network using the register_for_output_from_node method to hve the output of one node become the input to another node. Once the network has been made. The first node in the network needs to be executed. Calling the execute method on the first node will ensure that all nodes in the network will also be executed. When a node is execute, it will be sent an IP => true. This means that when a new node subclass is made it needs to know that it will receive this IP in its def do_node_work method. Once a node is executing, it will block until an IP is push into the input queue of the node unless the node has the : :requires_input option is set to false. If that is the case the node will not block but will send an IP to the node of the form => true for the node to process. The ability not to block is useful for nodes like the Test_file_reader_node node that reads input from a file and creates IPs for down stream nodes to process.
Each subclass of the Node class must override the do_node_work method. If the node has not completed its work then the do_node_work method should return true. If the node has completed its work it should return false. When a node has completed its work, it will be sent an IP of => true. This :completed IP is handled in a special way. It will push this IP into output_queue attribute of a Node object. This allows the wait_until_completed to block until all of the work of a node is completed. The => true IP is also sent to every output channel for a node telling all of the down stream nodes that its upstream node has finished.
Example Usage
# Need to require the Fbp gem
require 'Fbp'
# First Set the number of threads that should be used for this solution
Fbp::num_threads = 5
# Make the thread pool that will be used to run the nodes in the application
Fbp::make_pool
# Make the nodes needed for the application
read_node = Fbp::Test_file_reader_node.new(File.('~/input.txt'))
write_node = Fbp::Text_file_writer_node.new(File.('~/output.txt'))
# Hook up the nodes into the network needed for the application
write_node.register_for_output_from_node(read_node)
# Execute the first node in the network
read_node.execute
# Wait for the network to complete its work by checking to see if the last node
# in the network has completed
write_node.wait_until_completed
# With the work completed shutdown the thread pool
Fbp::shutdown
Direct Known Subclasses
Aggregator_node, Assign_node, Concatenate_node, Counter_node, Decode_node, Encode_node, Flow_node, Selector_node, Sort_node, Test_file_reader_node, Text_file_writer_node
Instance Attribute Summary collapse
-
#executing ⇒ Object
readonly
The executing attribute specifies if a Node is executing.
-
#options ⇒ Object
readonly
The options attribute hold the IIP data for a Node.
-
#output ⇒ Object
The output attribute is an Array that holds all of the output channels for this Node.
Instance Method Summary collapse
-
#clean_option(key) ⇒ Object
:nodoc:.
-
#do_node_work(args) ⇒ Object
The do_node_work method is where the work of a Node is done.
-
#execute ⇒ Object
The execute method will start the execution of this node on one of the threads in the thread pool.
-
#initialize ⇒ Node
constructor
:nodoc:.
-
#is_ready_to_run? ⇒ Boolean
The is_ready_to_run? method is used to ensure that a Node has received its required IIP before it is allowed to process incoming IPs.
-
#merge_options!(options) ⇒ Object
:nodoc:.
-
#register_for_output_from_node(node, output_channel = :output) ⇒ Object
The register_for_output_from_node method is how networks of nodes are created.
-
#set_option(key, value) ⇒ Object
:nodoc:.
-
#stop ⇒ Object
The stop method will send an IP to this node of the form => true.
-
#unregister_for_output_from_node(node, output_channel = :output) ⇒ Object
The unregister_for_output_from_node method will remove this node from an output queue of an up stream node.
-
#wait_until_completed ⇒ Object
The wait_until_completed provides a way to wait until a node has completed its work.
-
#write_to_input(obj) ⇒ Object
The write_to_input method will push an IP onto the input queue of a Node.
-
#write_to_output(result, output_channel = :all) ⇒ Object
The write_to_output will write an IP into the input queue of all of the nodes in the specified output channel.
Constructor Details
#initialize ⇒ Node
:nodoc:
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/fbp.rb', line 184 def initialize() #:nodoc: # Initialize the input and out data types @input_queue = Queue.new @output_queue = Queue.new channel_output = Array.new @output = {:output => channel_output} @mutex = Mutex.new # Provide for parameterization for a node = Hash.new [:output] = :all [:requires_input] = true # Initialize the state variables for a node @executing = false @in_transaction = false @continue_processing = true # Provide for transaction support @transactions_must_queue = true @transaction_queue = Array.new end |
Instance Attribute Details
#executing ⇒ Object (readonly)
The executing attribute specifies if a Node is executing.
173 174 175 |
# File 'lib/fbp.rb', line 173 def executing @executing end |
#options ⇒ Object (readonly)
The options attribute hold the IIP data for a Node.
176 177 178 |
# File 'lib/fbp.rb', line 176 def end |
#output ⇒ Object
The output attribute is an Array that holds all of the output channels for this Node
170 171 172 |
# File 'lib/fbp.rb', line 170 def output @output end |
Instance Method Details
#clean_option(key) ⇒ Object
:nodoc:
350 351 352 353 |
# File 'lib/fbp.rb', line 350 def clean_option(key) #:nodoc: return if key.nil? .delete key end |
#do_node_work(args) ⇒ Object
The do_node_work method is where the work of a Node is done. Each subclass of the Node object will need to override this method to implement the behavior of the node. The base behavior of this method is to write its incoming IPs to it single output channel
215 216 217 218 219 |
# File 'lib/fbp.rb', line 215 def do_node_work(args) write_to_output(args) return false if args.has_key?(:stop) true end |
#execute ⇒ Object
The execute method will start the execution of this node on one of the threads in the thread pool. It will block until data comes into the input queue unless the node has has the :requires_input option it is false. If the :requires_input option is set to false then the input queue will be checked but if there are no IPs to process in the queue an IP will created of the form => true and that will be sent to the node for processing.
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/fbp.rb', line 244 def execute return if @executing || !Fbp.has_pool @output.each do |key, channel| next if channel.nil? channel.each {|n| n.execute if !n.executing} end write_to_input({:start => true}) @executing = true Fbp.schedule do while @continue_processing @mutex.synchronize do if [:requires_input] @continue_processing = should_continue?(@input_queue.pop) else begin ip = @input_queue.pop(true) # non blocking rescue ip = nil end ip = ip.nil? ? {:continue => true} : ip @continue_processing = should_continue?(ip) end end end @executing = false @output.each_key {|channel| write_to_output({:completed => true}, channel)} end end |
#is_ready_to_run? ⇒ Boolean
The is_ready_to_run? method is used to ensure that a Node has received its required IIP before it is allowed to process incoming IPs. Each subclass of the Node class that needs an IIP before it can process IPs needs to override this method and check to see if all of the required options have been received. If all of the required options have been set then this method should return true otherwise it should return false. The default behavior of the Node class is to simply return true.
231 232 233 |
# File 'lib/fbp.rb', line 231 def is_ready_to_run?() true end |
#merge_options!(options) ⇒ Object
:nodoc:
341 342 343 |
# File 'lib/fbp.rb', line 341 def () #:nodoc: .merge!() end |
#register_for_output_from_node(node, output_channel = :output) ⇒ Object
The register_for_output_from_node method is how networks of nodes are created. A down stream node will register with an up stream node for the up stream’s node output on a specific output channel. The default output channel is the :output channel. Calling method will place the calling object into the array of nodes in the upstream node’s output channel. When the up stream node writes out it output, it will write it to all of the input queues of all of the down stream nodes that have registered for the output of the up steam node.
321 322 323 324 325 326 327 328 329 |
# File 'lib/fbp.rb', line 321 def register_for_output_from_node(node, output_channel = :output) return if node.nil? channel = node.output[output_channel] if channel.nil? channel = Array.new node.output[output_channel] = channel end node.output[output_channel] << self if !node.output[output_channel].include? self end |
#set_option(key, value) ⇒ Object
:nodoc:
345 346 347 348 |
# File 'lib/fbp.rb', line 345 def set_option (key, value) #:nodoc: return if key.nil? [key] = value end |
#stop ⇒ Object
The stop method will send an IP to this node of the form => true. The default implementation would be to have the execution of the node stop though subclasses of the Node class could change that behavior
360 361 362 |
# File 'lib/fbp.rb', line 360 def stop write_to_input({:stop => true}) end |
#unregister_for_output_from_node(node, output_channel = :output) ⇒ Object
The unregister_for_output_from_node method will remove this node from an output queue of an up stream node.
335 336 337 338 339 |
# File 'lib/fbp.rb', line 335 def unregister_for_output_from_node(node, output_channel = :output) return if node.nil? channel = node.output[output_channel] node.output[output_channel] = node.output[output_channel] - [self] if !channel.nil? end |
#wait_until_completed ⇒ Object
The wait_until_completed provides a way to wait until a node has completed its work. This is needed as all of the work of the nodes in a network of nodes is done asynchronously. Typically this is called on the last node in a network of nodes to ensure that all processing has completed. This method works by waiting on the output_queue which is only written to when all work of the node has completed. The Queue instance will cause the calling thread to block until an IP has been placed into the output queue.
374 375 376 |
# File 'lib/fbp.rb', line 374 def wait_until_completed @output_queue.pop end |
#write_to_input(obj) ⇒ Object
The write_to_input method will push an IP onto the input queue of a Node.
278 279 280 |
# File 'lib/fbp.rb', line 278 def write_to_input(obj) @input_queue << obj if !obj.nil? end |
#write_to_output(result, output_channel = :all) ⇒ Object
The write_to_output will write an IP into the input queue of all of the nodes in the specified output channel. The default output channel is the :all channel which write the IP to every channel.
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/fbp.rb', line 287 def write_to_output(result, output_channel = :all) return if result.nil? @output_queue << result if result.has_key? :completed # Get the channels that will be written to channels = nil if :all == output_channel channels = @output.keys else channels = [output_channel] end # With the channel set, iterate each node and write to # that nodes output channels.each do |channel_key| next if channel_key.nil? channel_array = @output[channel_key] next if channel_array.nil? channel_array.each do |a_node| next if a_node.nil? a_node.write_to_input(result) end end end |