Class: RFlow::Component
- Inherits: Object
- Inheritance: Object, RFlow::Component
- Defined in: lib/rflow/component.rb, lib/rflow/component/port.rb
Overview
Parent class for all RFlow components.
Direct Known Subclasses
RFlow::Components::Clock, RFlow::Components::GenerateIntegerSequence, RFlow::Components::Replicate, RFlow::Components::RubyProcFilter
Defined Under Namespace
Classes: HashPort, HashSubPort, InputPort, OutputPort, Port, PortCollection
Instance Attribute Summary
- #name ⇒ String
  The name of the component.
- #ports ⇒ PortCollection (readonly)
  Collection of the component’s input and output ports.
- #shard ⇒ Shard (readonly)
  Reference to the component’s worker process’s Shard.
- #uuid ⇒ String
  The UUID of the component.
- #worker ⇒ Shard::Worker (readonly)
  Reference to the worker process in which this instance of the component is running.
Class Method Summary
- .build(worker, config) ⇒ RFlow::Component
  Attempt to instantiate a component described by the config specification.
- .input_port(name) ⇒ void
  When declaring your component class, defines an input port with a given name.
- .output_port(name) ⇒ void
  When declaring your component class, defines an output port with a given name.
Instance Method Summary
- #cleanup! ⇒ void
  Method called after all components have been shutdown! and just before the global RFlow exit.
- #configure!(deserialized_configuration) ⇒ void
  Method that should be overridden by a subclass to provide for component-specific configuration.
- #initialize(args = {}) ⇒ Component (constructor)
  A new instance of Component.
- #input_ports ⇒ Array<InputPort>
  Returns a list of connected input ports.
- #output_ports ⇒ Array<OutputPort>
  Returns a list of connected output ports.
- #process_message(input_port, input_port_key, connection, message) ⇒ void
  Method called when a message is received on an input port.
- #run! ⇒ void
  Main component running method.
- #shutdown! ⇒ void
  Method called when RFlow is shutting down.
- #to_s ⇒ String
  Pretty-printed version of the component, its ports, their keys, and their connections.
Constructor Details
#initialize(args = {}) ⇒ Component
Returns a new instance of Component.
    # File 'lib/rflow/component.rb', line 110
    def initialize(args = {})
      @name = args[:name]
      @uuid = args[:uuid]
      @worker = args[:worker]
      @ports = PortCollection.new

      self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
      self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
    end
Instance Attribute Details
#name ⇒ String
The name of the component.
    # File 'lib/rflow/component.rb', line 101
    def name
      @name
    end
#ports ⇒ PortCollection (readonly)
Collection of the component’s input and output ports.
    # File 'lib/rflow/component.rb', line 104
    def ports
      @ports
    end
#shard ⇒ Shard (readonly)
Reference to the component’s worker process’s Shard.
    # File 'lib/rflow/component.rb', line 123
    def shard; worker.shard if worker; end
#uuid ⇒ String
The UUID of the component.
    # File 'lib/rflow/component.rb', line 98
    def uuid
      @uuid
    end
#worker ⇒ Shard::Worker (readonly)
Reference to the worker process in which this instance of the component is running.
    # File 'lib/rflow/component.rb', line 107
    def worker
      @worker
    end
Class Method Details
.build(worker, config) ⇒ RFlow::Component
Attempts to instantiate the component described by the config specification. This assumes that the specification names a fully qualified Ruby class that has already been loaded. It first looks the specification up among the subclasses of RFlow::Component (in RFlow::Configuration#available_components) and, if no match is found, constantizes the specification into a class. Future releases will support external (i.e. non-managed) components, but the current implementation supports only Ruby classes.
    # File 'lib/rflow/component.rb', line 58
    def build(worker, config)
      raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?

      RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
      begin
        component_class = RFlow.configuration.available_components[config.specification]

        if component_class
          RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
        else
          RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
          component_class = config.specification.constantize
        end

        component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component|
          config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid }
          config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid }

          config.input_ports.each do |p|
            p.input_connections.each do |c|
              component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c)
            end
          end

          config.output_ports.each do |p|
            p.output_connections.each do |c|
              component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c)
            end
          end
        end
      rescue NameError => e
        raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})"
      rescue Exception => e
        raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
      end
    end
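The two-step class lookup used by .build (registry first, constant lookup second) can be sketched in isolation. This is only an illustration under assumptions: REGISTRY is a hypothetical stand-in for RFlow.configuration.available_components, the Demo classes and resolve_component_class are invented for the example, and Object.const_get is used in place of ActiveSupport's constantize so the snippet runs without any gems.

```ruby
# Hypothetical example classes; they are not part of RFlow.
module Demo
  class Echo; end
  class Upcase; end
end

# Hypothetical stand-in for RFlow.configuration.available_components.
REGISTRY = { 'echo' => Demo::Echo }.freeze

# Resolve a specification string to a class: consult the registry first,
# then fall back to looking the string up as a constant name.
def resolve_component_class(specification, registry = REGISTRY)
  registry[specification] || Object.const_get(specification)
rescue NameError => e
  # Mirror the wrapped error reported when the class cannot be loaded.
  raise RuntimeError, "the class '#{specification}' could not be loaded (#{e.message})"
end

puts resolve_component_class('echo')         # found in the registry
puts resolve_component_class('Demo::Upcase') # found by constant lookup
```

A specification that matches neither the registry nor a loaded constant raises a RuntimeError whose message names the missing class, which is the same shape of failure that .build reports.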
.input_port(name) ⇒ void
This method returns an undefined value.
When declaring your component class, defines an input port with a given name. Will also define a port accessor method named after the port for retrieving it.
    # File 'lib/rflow/component.rb', line 21
    def input_port(name); define_port(defined_input_ports, name); end
.output_port(name) ⇒ void
This method returns an undefined value.
When declaring your component class, defines an output port with a given name. Will also define a port accessor method named after the port for retrieving it.
    # File 'lib/rflow/component.rb', line 29
    def output_port(name); define_port(defined_output_ports, name); end
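To illustrate how class-level declarations like input_port and output_port can both record a port name and define an accessor named after it, here is a minimal self-contained sketch. SketchComponent and Doubler are hypothetical, the body of define_port is an assumption (the page above only shows that it is called), and plain strings stand in for real InputPort and OutputPort objects.

```ruby
# Hypothetical stand-in for the class-level port DSL; not RFlow's actual code.
class SketchComponent
  class << self
    # Per-class registries of declared port names.
    def defined_input_ports;  @defined_input_ports  ||= {}; end
    def defined_output_ports; @defined_output_ports ||= {}; end

    def input_port(name);  define_port(defined_input_ports, name);  end
    def output_port(name); define_port(defined_output_ports, name); end

    private

    # Record the port name and define an instance accessor named after it.
    def define_port(collection, name)
      collection[name.to_s] = true
      define_method(name.to_s.to_sym) { @ports.fetch(name.to_s) }
    end
  end

  def initialize
    # Plain strings stand in for InputPort/OutputPort instances.
    @ports = {}
    self.class.defined_input_ports.each_key  { |n| @ports[n] = "input:#{n}" }
    self.class.defined_output_ports.each_key { |n| @ports[n] = "output:#{n}" }
  end
end

class Doubler < SketchComponent
  input_port :numbers
  output_port :doubled
end

component = Doubler.new
puts component.numbers
puts component.doubled
```

The accessor is what lets code such as component.send(p.name.to_sym) in .build retrieve a port by its declared name.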
Instance Method Details
#cleanup! ⇒ void
This method returns an undefined value.
Method called after all components have been shutdown! and just before the global RFlow exit. Subclasses should implement this to clean up any leftover state, e.g. flush file handles.
    # File 'lib/rflow/component.rb', line 217
    def cleanup!; end
#configure!(deserialized_configuration) ⇒ void
This method returns an undefined value.
Method that should be overridden by a subclass to provide component-specific configuration. The subclass should use the RFlow.configuration attribute (@configuration) to store its particular configuration.
    # File 'lib/rflow/component.rb', line 190
    def configure!(deserialized_configuration); end
#input_ports ⇒ Array<InputPort>
Returns a list of connected input ports. Each port will have one or more keys associated with a particular connection.
    # File 'lib/rflow/component.rb', line 128
    def input_ports; ports.by_type['RFlow::Component::InputPort']; end
#output_ports ⇒ Array<OutputPort>
Returns a list of connected output ports. Each port will have one or more keys associated with the particular connection.
    # File 'lib/rflow/component.rb', line 133
    def output_ports; ports.by_type['RFlow::Component::OutputPort']; end
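Both #input_ports and #output_ports select ports through a by_type lookup keyed by the port's class name as a string. The sketch below shows one way such an index could be kept. It is an assumption for illustration only: the Sketch module and its classes are hypothetical, and RFlow's own PortCollection may be implemented differently.

```ruby
# Hypothetical stand-ins; RFlow's PortCollection may work differently.
module Sketch
  Port = Struct.new(:name)
  class InputPort < Port; end
  class OutputPort < Port; end

  class PortCollection
    attr_reader :by_type

    def initialize
      # A type with no ports yields an empty array rather than nil.
      @by_type = Hash.new { |hash, key| hash[key] = [] }
    end

    # Index each added port under its class name; return self to allow chaining.
    def <<(port)
      by_type[port.class.to_s] << port
      self
    end
  end
end

ports = Sketch::PortCollection.new
ports << Sketch::InputPort.new('in') << Sketch::OutputPort.new('out')
puts ports.by_type['Sketch::InputPort'].map(&:name).inspect
puts ports.by_type['Sketch::OutputPort'].map(&:name).inspect
```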
#process_message(input_port, input_port_key, connection, message) ⇒ void
This method returns an undefined value.
Method called when a message is received on an input port. Subclasses should implement if they want to receive messages.
    # File 'lib/rflow/component.rb', line 205
    def process_message(input_port, input_port_key, connection, message); end
#run! ⇒ void
This method returns an undefined value.
Main component running method. Subclasses should implement this if they want to set up any EventMachine resources (servers, clients, etc.).
    # File 'lib/rflow/component.rb', line 196
    def run!; end
#shutdown! ⇒ void
This method returns an undefined value.
Method called when RFlow is shutting down. Subclasses should implement to terminate any servers/clients (or let them finish) and stop sending new data through the flow.
    # File 'lib/rflow/component.rb', line 211
    def shutdown!; end
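The hooks documented above (configure!, run!, process_message, shutdown!, cleanup!) are all no-ops that a subclass overrides as needed. The following sketch shows a subclass filling in each one. It makes several assumptions: SketchComponent is a hypothetical stand-in for RFlow::Component so the snippet runs without the gem, LineCounter and its 'match' configuration key are invented, the hooks are driven by hand rather than by the framework, and calling configure! before run! is assumed (the descriptions above only state that cleanup! follows shutdown!).

```ruby
# Hypothetical stand-in base class exposing the same no-op hooks.
class SketchComponent
  def configure!(deserialized_configuration); end
  def run!; end
  def process_message(input_port, input_port_key, connection, message); end
  def shutdown!; end
  def cleanup!; end
end

# Counts messages containing a configured substring and records hook calls.
class LineCounter < SketchComponent
  attr_reader :count, :events

  def configure!(deserialized_configuration)
    @configuration = deserialized_configuration
    @count = 0
    @events = [:configured]
  end

  def run!
    @events << :running
  end

  def process_message(input_port, input_port_key, connection, message)
    @count += 1 if message.to_s.include?(@configuration.fetch('match'))
  end

  def shutdown!
    @events << :shutdown
  end

  def cleanup!
    @events << :cleaned_up
  end
end

# Drive the hooks by hand; in a real flow the framework makes these calls.
counter = LineCounter.new
counter.configure!('match' => 'error')
counter.run!
['ok', 'error: disk full', 'error: timeout'].each do |line|
  counter.process_message(:in, nil, nil, line)
end
counter.shutdown!
counter.cleanup!
puts counter.count
puts counter.events.inspect
```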
#to_s ⇒ String
Pretty-printed version of the component, its ports, their keys, and their connections.
    # File 'lib/rflow/component.rb', line 172
    def to_s
      string = "Component '#{name}' (#{uuid})\n"
      ports.each do |port|
        port.keys.each do |key|
          port[key].each do |connection|
            string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n"
          end
        end
      end
      string
    end