Class: RFlow::Component
Inherits:
- Object
- RFlow::Component
Defined in:
- lib/rflow/component.rb
- lib/rflow/component/port.rb
Direct Known Subclasses
RFlow::Components::Clock, RFlow::Components::GenerateIntegerSequence, RFlow::Components::Replicate, RFlow::Components::RubyProcFilter
Defined Under Namespace
Modules: ConnectionCollection
Classes: HashPort, HashSubPort, InputPort, OutputPort, Port, PortCollection
Instance Attribute Summary
- #name ⇒ Object
  Returns the value of attribute name.
- #ports ⇒ Object (readonly)
  Returns the value of attribute ports.
- #uuid ⇒ Object
  Returns the value of attribute uuid.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .build(worker, config) ⇒ Object
  Attempt to instantiate a component described by the config specification.
- .define_port(collection, name) ⇒ Object
- .defined_input_ports ⇒ Object
- .defined_output_ports ⇒ Object
- .inherited(subclass) ⇒ Object
  Keep track of available component subclasses.
- .input_port(name) ⇒ Object
  Define an input port with a given name.
- .output_port(name) ⇒ Object
  Define an output port with a given name.
Instance Method Summary
- #cleanup! ⇒ Object
  Method called after all components have been shutdown! and just before the global RFlow exit.
- #configure!(deserialized_configuration) ⇒ Object
  Method that should be overridden by a subclass to provide for component-specific configuration.
- #configure_input_port!(port_name, options = {}) ⇒ Object
- #configure_output_port!(port_name, options = {}) ⇒ Object
- #connect_inputs! ⇒ Object
  Tell the component to establish its input ports’ connections.
- #connect_outputs! ⇒ Object
  Tell the component to establish its output ports’ connections.
- #initialize(args = {}) ⇒ Component (constructor)
  A new instance of Component.
- #input_ports ⇒ Object
  Returns a list of connected input ports.
- #output_ports ⇒ Object
  Returns a list of connected output ports.
- #process_message(input_port, input_port_key, connection, message) ⇒ Object
  Method called when a message is received on an input port.
- #run! ⇒ Object
  Main component running method.
- #shard ⇒ Object
- #shutdown! ⇒ Object
  Method called when RFlow is shutting down.
- #to_s ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ Component
Returns a new instance of Component.
# File 'lib/rflow/component.rb', line 80
def initialize(args = {})
  @name = args[:name]
  @uuid = args[:uuid]
  @worker = args[:worker]
  @ports = PortCollection.new

  self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
  self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
end
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
# File 'lib/rflow/component.rb', line 77
def name
  @name
end
#ports ⇒ Object (readonly)
Returns the value of attribute ports.
# File 'lib/rflow/component.rb', line 78
def ports
  @ports
end
#uuid ⇒ Object
Returns the value of attribute uuid.
# File 'lib/rflow/component.rb', line 77
def uuid
  @uuid
end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
# File 'lib/rflow/component.rb', line 78
def worker
  @worker
end
Class Method Details
.build(worker, config) ⇒ Object
Attempt to instantiate a component described by the config specification. This assumes that the specification of a component is a fully qualified Ruby class that has already been loaded. It first looks for a subclass of RFlow::Component in the available_components hash and, if none is found, constantizes the specification into a class. Future releases will support external (i.e. non-managed) components, but the current implementation only supports Ruby classes.
# File 'lib/rflow/component.rb', line 39
def build(worker, config)
  raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?

  RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
  begin
    component_class = RFlow.configuration.available_components[config.specification]

    if component_class
      RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
    else
      RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
      component_class = config.specification.constantize
    end

    component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component|
      config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid }
      config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid }

      config.input_ports.each do |p|
        p.input_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c)
        end
      end

      config.output_ports.each do |p|
        p.output_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c)
        end
      end
    end
  rescue NameError => e
    raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})"
  rescue Exception => e
    raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
  end
end
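The lookup order described for .build (registry first, constant lookup second) can be illustrated without RFlow. The following is a minimal sketch under stated assumptions: SKETCH_AVAILABLE_COMPONENTS, SketchClock, SketchUnregistered and resolve_component_class are hypothetical names, and Object.const_get stands in for the constantize call in the source so that the sketch has no dependencies.

```ruby
# Hypothetical registry standing in for RFlow.configuration.available_components.
SKETCH_AVAILABLE_COMPONENTS = {}

class SketchClock; end
SKETCH_AVAILABLE_COMPONENTS['SketchClock'] = SketchClock

# Loaded, but deliberately not registered, to exercise the fallback path.
class SketchUnregistered; end

# Resolve a specification string to a class: registry first, then constant lookup.
# Object.const_get is an assumption of this sketch; the source calls constantize.
def resolve_component_class(specification)
  SKETCH_AVAILABLE_COMPONENTS[specification] || Object.const_get(specification)
rescue NameError => e
  # Mirror the source's behavior of re-raising with a more descriptive message.
  raise RuntimeError, "the class '#{specification}' could not be loaded (#{e.message})"
end

registered = resolve_component_class('SketchClock')         # found in the registry
fallback   = resolve_component_class('SketchUnregistered')  # found by constant lookup
```

A specification that names no loaded class ends up in the NameError branch, which is why the source requires component classes to be loaded before .build is called.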
.define_port(collection, name) ⇒ Object
# File 'lib/rflow/component.rb', line 22
def define_port(collection, name)
  collection[name.to_s] = true

  # Create the port accessor method based on the port name
  define_method name.to_s.to_sym do
    ports.by_name[name.to_s]
  end
end
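To show what the port macros do in practice, here is a minimal sketch of the same define_method pattern outside RFlow. SketchComponent and SketchAdder are hypothetical classes, and a plain Hash stands in for PortCollection#by_name; only the macro structure is taken from the source above.

```ruby
# Hypothetical stand-in for RFlow::Component; only the macro pattern mirrors the source.
class SketchComponent
  class << self
    # Per-class registry of declared input port names.
    def defined_input_ports; @defined_input_ports ||= {}; end

    def define_port(collection, name)
      collection[name.to_s] = true
      # Generate an instance method named after the port.
      define_method(name.to_s.to_sym) { ports[name.to_s] }
    end

    def input_port(name); define_port(defined_input_ports, name); end
  end

  attr_reader :ports

  def initialize
    # Assumption: a Hash keyed by port name replaces PortCollection#by_name.
    @ports = self.class.defined_input_ports.keys.each_with_object({}) do |port_name, h|
      h[port_name] = "port:#{port_name}"
    end
  end
end

class SketchAdder < SketchComponent
  input_port :left
  input_port :right
end

adder = SketchAdder.new
left_port = adder.left # accessor generated by input_port :left
```

Because the registry lives in a class-level instance variable, each subclass in this sketch keeps its own list of declared ports and the base class stays empty.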
.defined_input_ports ⇒ Object
# File 'lib/rflow/component.rb', line 19
def defined_input_ports; @defined_input_ports ||= {}; end
.defined_output_ports ⇒ Object
# File 'lib/rflow/component.rb', line 20
def defined_output_ports; @defined_output_ports ||= {}; end
.inherited(subclass) ⇒ Object
Keep track of available component subclasses
# File 'lib/rflow/component.rb', line 9
def inherited(subclass)
  RFlow::Configuration.add_available_component(subclass)
end
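Ruby calls the inherited hook each time a class is subclassed, which is how component classes become discoverable without explicit registration. A minimal sketch of that pattern follows; SketchRegistry, SketchBase and the two subclasses are hypothetical, and keying the registry by class name is an assumption made because .build indexes available_components by the specification string.

```ruby
# Hypothetical registry standing in for RFlow::Configuration.add_available_component.
module SketchRegistry
  def self.available; @available ||= {}; end

  # Assumption: entries are keyed by the fully qualified class name.
  def self.add(klass); available[klass.name] = klass; end
end

class SketchBase
  # Invoked by Ruby whenever a class inherits from SketchBase.
  def self.inherited(subclass)
    super
    SketchRegistry.add(subclass)
  end
end

class SketchFilter < SketchBase; end
class SketchReplicate < SketchBase; end

registered_names = SketchRegistry.available.keys
```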
.input_port(name) ⇒ Object
Define an input port with a given name
# File 'lib/rflow/component.rb', line 14
def input_port(name); define_port(defined_input_ports, name); end
.output_port(name) ⇒ Object
Define an output port with a given name
# File 'lib/rflow/component.rb', line 17
def output_port(name); define_port(defined_output_ports, name); end
Instance Method Details
#cleanup! ⇒ Object
Method called after all components have been shutdown! and just before the global RFlow exit. Subclasses should implement it to clean up any leftover state, e.g. flush file handles.
# File 'lib/rflow/component.rb', line 167
def cleanup!; end
#configure!(deserialized_configuration) ⇒ Object
Method that should be overridden by a subclass to provide for component-specific configuration. The subclass should use the self.configuration attribute (@configuration) to store its particular configuration. The incoming deserialized_configuration parameter comes from the RFlow configuration database and is (most likely) a hash. Don’t assume that the keys are symbols.
# File 'lib/rflow/component.rb', line 148
def configure!(deserialized_configuration); end
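Since the keys of the incoming hash may be strings or symbols, an override can normalize them before storing the result. The sketch below shows one way to do that. SketchTicker, its defaults and its option names are hypothetical; in RFlow the class would subclass RFlow::Component rather than stand alone.

```ruby
# Hypothetical component; in RFlow this would be a subclass of RFlow::Component.
class SketchTicker
  attr_reader :configuration

  # Assumed option names, for illustration only.
  DEFAULTS = { 'interval' => 1.0, 'label' => 'tick' }.freeze

  def configure!(deserialized_configuration)
    # Normalize every key to a String so symbol and string keys behave the same.
    given = (deserialized_configuration || {}).each_with_object({}) do |(key, value), h|
      h[key.to_s] = value
    end
    @configuration = DEFAULTS.merge(given)
  end
end

ticker = SketchTicker.new
ticker.configure!({ interval: 5, 'label' => 'slow' })
```

Normalizing to strings rather than symbols is a design choice of this sketch; either direction works as long as the component reads its configuration consistently.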
#configure_input_port!(port_name, options = {}) ⇒ Object
# File 'lib/rflow/component.rb', line 100
def configure_input_port!(port_name, options = {})
  RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})"
  unless self.class.defined_input_ports.include? port_name
    raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'"
  end
  ports.by_name[port_name].uuid = options[:uuid]
end
#configure_output_port!(port_name, options = {}) ⇒ Object
# File 'lib/rflow/component.rb', line 108
def configure_output_port!(port_name, options = {})
  RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})"
  unless self.class.defined_output_ports.include? port_name
    raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'"
  end
  ports.by_name[port_name].uuid = options[:uuid]
end
#connect_inputs! ⇒ Object
Tell the component to establish its input ports’ connections, i.e. make the connection. Uses the underlying connection object. Also establishes the callbacks for each of the input ports.
# File 'lib/rflow/component.rb', line 119
def connect_inputs!
  input_ports.each {|port| port.recv_callback = method(:process_message) }
  input_ports.each(&:connect!)
end
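The callback wiring in connect_inputs! relies on method(:process_message), which returns a Method object bound to the component instance. A minimal sketch of that mechanism follows. SketchPort, SketchReceiver and the deliver method are hypothetical, and the argument order passed to the callback is assumed to match the process_message signature documented on this page.

```ruby
# Hypothetical port: stores a callback and invokes it when a message arrives.
class SketchPort
  attr_accessor :recv_callback
  attr_reader :name

  def initialize(name); @name = name; end

  # Assumption: the callback receives (port, key, connection, message).
  def deliver(key, connection, message)
    recv_callback.call(self, key, connection, message)
  end
end

# Hypothetical component that records what it receives.
class SketchReceiver
  attr_reader :received, :input_ports

  def initialize
    @received = []
    @input_ports = [SketchPort.new('in')]
  end

  def connect_inputs!
    # method(:process_message) is bound to this instance, so the port can call it later.
    input_ports.each { |port| port.recv_callback = method(:process_message) }
  end

  def process_message(input_port, input_port_key, connection, message)
    @received << [input_port.name, input_port_key, message]
  end
end

receiver = SketchReceiver.new
receiver.connect_inputs!
receiver.input_ports.first.deliver(nil, :sketch_connection, 'hello')
```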
#connect_outputs! ⇒ Object
Tell the component to establish its output ports’ connections, i.e. make the connection. Uses the underlying connection object.
# File 'lib/rflow/component.rb', line 126
def connect_outputs!
  output_ports.each(&:connect!)
end
#input_ports ⇒ Object
Returns a list of connected input ports. Each port will have one or more keys associated with a particular connection.
# File 'lib/rflow/component.rb', line 94
def input_ports; ports.by_type['RFlow::Component::InputPort']; end
#output_ports ⇒ Object
Returns a list of connected output ports. Each port will have one or more keys associated with the particular connection.
# File 'lib/rflow/component.rb', line 98
def output_ports; ports.by_type['RFlow::Component::OutputPort']; end
#process_message(input_port, input_port_key, connection, message) ⇒ Object
Method called when a message is received on an input port. Subclasses should implement it if they want to receive messages.
# File 'lib/rflow/component.rb', line 157
def process_message(input_port, input_port_key, connection, message); end
#run! ⇒ Object
Main component running method. Subclasses should implement it if they want to set up any EventMachine resources (servers, clients, etc.).
# File 'lib/rflow/component.rb', line 153
def run!; end
#shard ⇒ Object
# File 'lib/rflow/component.rb', line 90
def shard; worker.shard if worker; end
#shutdown! ⇒ Object
Method called when RFlow is shutting down. Subclasses should implement it to terminate any servers/clients (or let them finish) and stop sending new data through the flow.
# File 'lib/rflow/component.rb', line 162
def shutdown!; end
#to_s ⇒ Object
# File 'lib/rflow/component.rb', line 130
def to_s
  string = "Component '#{name}' (#{uuid})\n"
  ports.each do |port|
    port.keys.each do |key|
      port[key].each do |connection|
        string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n"
      end
    end
  end
  string
end