Class: Fbp::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fbp.rb

Overview

Description

The Node class defines the base class for all Ruby Fbp Nodes. It defines the basic operations of all Nodes.

Discussion

When creating a new Node subclass is made it will need to at least override the do_node_work method. If the subclass requires an IIP the subclass will also need to override the is_ready_to_run? method.

The Node class basic behavior is to write its incoming IPs to its single output channel.

Node object life cycle

A node is created it is quiescent. This is necessary as most node need other nodes in a network of node before it is useful. Once all of the nodes have been created that are needed for an application, they need to be placed into a network using the register_for_output_from_node method to hve the output of one node become the input to another node. Once the network has been made. The first node in the network needs to be executed. Calling the execute method on the first node will ensure that all nodes in the network will also be executed. When a node is execute, it will be sent an IP => true. This means that when a new node subclass is made it needs to know that it will receive this IP in its def do_node_work method. Once a node is executing, it will block until an IP is push into the input queue of the node unless the node has the : :requires_input option is set to false. If that is the case the node will not block but will send an IP to the node of the form => true for the node to process. The ability not to block is useful for nodes like the Test_file_reader_node node that reads input from a file and creates IPs for down stream nodes to process.

Each subclass of the Node class must override the do_node_work method. If the node has not completed its work then the do_node_work method should return true. If the node has completed its work it should return false. When a node has completed its work, it will be sent an IP of => true. This :completed IP is handled in a special way. It will push this IP into output_queue attribute of a Node object. This allows the wait_until_completed to block until all of the work of a node is completed. The => true IP is also sent to every output channel for a node telling all of the down stream nodes that its upstream node has finished.

Example Usage

# Need to require the Fbp gem
require 'Fbp'
# First Set the number of threads that should be used for this solution
Fbp::num_threads = 5
# Make the thread pool that will be used to run the nodes in the application
Fbp::make_pool
# Make the nodes needed for the application
read_node = Fbp::Test_file_reader_node.new(File.expand_path('~/input.txt'))
write_node = Fbp::Text_file_writer_node.new(File.expand_path('~/output.txt'))
# Hook up the nodes into the network needed for the application
write_node.register_for_output_from_node(read_node)
# Execute the first node in the network
read_node.execute
# Wait for the network to complete its work by checking to see if the last node
# in the network has completed
write_node.wait_until_completed
# With the work completed shutdown the thread pool
Fbp::shutdown

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeNode

:nodoc:



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/fbp.rb', line 184

def initialize() #:nodoc:
  # Initialize the input and out data types
  @input_queue = Queue.new
  @output_queue = Queue.new
  
  channel_output = Array.new
  @output = {:output => channel_output}
  
  @mutex = Mutex.new

  # Provide for parameterization for a node
    @options = Hash.new
  @options[:output] = :all
    @options[:requires_input] = true

  # Initialize the state variables for a node
  @executing = false
  @in_transaction = false
  @continue_processing = true

  # Provide for transaction support
  @transactions_must_queue = true
  @transaction_queue = Array.new
end

Instance Attribute Details

#executingObject (readonly)

The executing attribute specifies if a Node is executing.



173
174
175
# File 'lib/fbp.rb', line 173

def executing
  @executing
end

#optionsObject (readonly)

The options attribute hold the IIP data for a Node.



176
177
178
# File 'lib/fbp.rb', line 176

def options
  @options
end

#outputObject

The output attribute is an Array that holds all of the output channels for this Node



170
171
172
# File 'lib/fbp.rb', line 170

def output
  @output
end

Instance Method Details

#clean_option(key) ⇒ Object

:nodoc:



350
351
352
353
# File 'lib/fbp.rb', line 350

def clean_option(key) #:nodoc:
  return if key.nil?
  @options.delete key
end

#do_node_work(args) ⇒ Object

The do_node_work method is where the work of a Node is done. Each subclass of the Node object will need to override this method to implement the behavior of the node. The base behavior of this method is to write its incoming IPs to it single output channel



215
216
217
218
219
# File 'lib/fbp.rb', line 215

def do_node_work(args)
write_to_output(args)
    return false if args.has_key?(:stop)
true
end

#executeObject

The execute method will start the execution of this node on one of the threads in the thread pool. It will block until data comes into the input queue unless the node has has the :requires_input option it is false. If the :requires_input option is set to false then the input queue will be checked but if there are no IPs to process in the queue an IP will created of the form => true and that will be sent to the node for processing.



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/fbp.rb', line 244

def execute
  return if @executing || !Fbp.has_pool
  
  @output.each do |key, channel|
    next if channel.nil?
      channel.each  {|n| n.execute if !n.executing}
  end
  
  write_to_input({:start => true})
  @executing = true
  Fbp.schedule do
    while @continue_processing
      @mutex.synchronize do
          if  @options[:requires_input]
            @continue_processing = should_continue?(@input_queue.pop)
          else
            begin
              ip =  @input_queue.pop(true) # non blocking
            rescue
              ip = nil
            end
            ip = ip.nil? ? {:continue => true} : ip
            @continue_processing = should_continue?(ip)
          end
        end
      end
    @executing = false
    @output.each_key {|channel| write_to_output({:completed => true}, channel)}
  end
end

#is_ready_to_run?Boolean

The is_ready_to_run? method is used to ensure that a Node has received its required IIP before it is allowed to process incoming IPs. Each subclass of the Node class that needs an IIP before it can process IPs needs to override this method and check to see if all of the required options have been received. If all of the required options have been set then this method should return true otherwise it should return false. The default behavior of the Node class is to simply return true.

Returns:

  • (Boolean)


231
232
233
# File 'lib/fbp.rb', line 231

def is_ready_to_run?()
  true
end

#merge_options!(options) ⇒ Object

:nodoc:



341
342
343
# File 'lib/fbp.rb', line 341

def merge_options!(options) #:nodoc:
  @options.merge!(options)
end

#register_for_output_from_node(node, output_channel = :output) ⇒ Object

The register_for_output_from_node method is how networks of nodes are created. A down stream node will register with an up stream node for the up stream’s node output on a specific output channel. The default output channel is the :output channel. Calling method will place the calling object into the array of nodes in the upstream node’s output channel. When the up stream node writes out it output, it will write it to all of the input queues of all of the down stream nodes that have registered for the output of the up steam node.



321
322
323
324
325
326
327
328
329
# File 'lib/fbp.rb', line 321

def register_for_output_from_node(node, output_channel = :output)
  return if node.nil?
  channel = node.output[output_channel]
  if channel.nil?
    channel = Array.new
    node.output[output_channel] = channel
  end
  node.output[output_channel] << self if !node.output[output_channel].include? self
end

#set_option(key, value) ⇒ Object

:nodoc:



345
346
347
348
# File 'lib/fbp.rb', line 345

def set_option (key, value) #:nodoc:
  return if key.nil?
  @options[key] = value
end

#stopObject

The stop method will send an IP to this node of the form => true. The default implementation would be to have the execution of the node stop though subclasses of the Node class could change that behavior



360
361
362
# File 'lib/fbp.rb', line 360

def stop
  write_to_input({:stop => true})
end

#unregister_for_output_from_node(node, output_channel = :output) ⇒ Object

The unregister_for_output_from_node method will remove this node from an output queue of an up stream node.



335
336
337
338
339
# File 'lib/fbp.rb', line 335

def unregister_for_output_from_node(node, output_channel = :output)
  return if node.nil?
  channel = node.output[output_channel]
  node.output[output_channel] = node.output[output_channel] - [self] if !channel.nil?
end

#wait_until_completedObject

The wait_until_completed provides a way to wait until a node has completed its work. This is needed as all of the work of the nodes in a network of nodes is done asynchronously. Typically this is called on the last node in a network of nodes to ensure that all processing has completed. This method works by waiting on the output_queue which is only written to when all work of the node has completed. The Queue instance will cause the calling thread to block until an IP has been placed into the output queue.



374
375
376
# File 'lib/fbp.rb', line 374

def wait_until_completed
  @output_queue.pop
end

#write_to_input(obj) ⇒ Object

The write_to_input method will push an IP onto the input queue of a Node.



278
279
280
# File 'lib/fbp.rb', line 278

def write_to_input(obj)
  @input_queue << obj if !obj.nil?
end

#write_to_output(result, output_channel = :all) ⇒ Object

The write_to_output will write an IP into the input queue of all of the nodes in the specified output channel. The default output channel is the :all channel which write the IP to every channel.



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/fbp.rb', line 287

def write_to_output(result, output_channel = :all)
  return if result.nil?
  @output_queue << result if result.has_key? :completed
  
  # Get the channels that will be written to
  channels = nil
  if :all == output_channel
    channels = @output.keys
  else
    channels = [output_channel]
  end
  
  # With the channel set, iterate each node and write to 
  # that nodes output
  channels.each do |channel_key|
    next if channel_key.nil?
      channel_array = @output[channel_key]
      next if channel_array.nil?
    channel_array.each do |a_node|
      next if a_node.nil?
      a_node.write_to_input(result)
    end
  end
  
end