Class: RFlow::Component

Inherits:
Object
  • Object
show all
Defined in:
lib/rflow/component.rb,
lib/rflow/component/port.rb

Defined Under Namespace

Modules: ConnectionCollection Classes: HashPort, HashSubPort, InputPort, OutputPort, Port, PortCollection

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Component

Returns a new instance of Component.



80
81
82
83
84
85
86
87
88
# File 'lib/rflow/component.rb', line 80

# Builds a component from an options hash and creates one port object for
# every port declared on the component's class.
#
# @param args [Hash] the :name, :uuid and :worker entries are read and
#   stored in the matching instance variables
def initialize(args = {})
  @ports = PortCollection.new
  @worker = args[:worker]
  @uuid = args[:uuid]
  @name = args[:name]

  # Input ports are appended to the collection before output ports.
  declaring_class = self.class
  declaring_class.defined_input_ports.each_key do |port_name|
    ports << InputPort.new(self, name: port_name)
  end
  declaring_class.defined_output_ports.each_key do |port_name|
    ports << OutputPort.new(self, name: port_name)
  end
end

Instance Attribute Details

#nameObject

Returns the value of attribute name.



77
78
79
# File 'lib/rflow/component.rb', line 77

# Reader for the component name held in @name.
def name; @name; end

#portsObject (readonly)

Returns the value of attribute ports.



78
79
80
# File 'lib/rflow/component.rb', line 78

# Reader for the port collection held in @ports.
def ports; @ports; end

#uuidObject

Returns the value of attribute uuid.



77
78
79
# File 'lib/rflow/component.rb', line 77

# Reader for the component identifier held in @uuid.
def uuid; @uuid; end

#workerObject (readonly)

Returns the value of attribute worker.



78
79
80
# File 'lib/rflow/component.rb', line 78

# Reader for the worker held in @worker.
def worker; @worker; end

Class Method Details

.build(worker, config) ⇒ Object

Attempt to instantiate a component described by the config specification. This assumes that the specification of a component is a fully qualified Ruby class that has already been loaded. It will first attempt to find subclasses of RFlow::Component (in the available_components hash) and then attempt to constantize the specification into a different class. Future releases will support external (i.e. non-managed) components, but the current implementation only supports Ruby classes.

Raises:

  • (NotImplementedError)


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rflow/component.rb', line 39

# Instantiates the component described by +config+ and wires up its ports.
#
# The class is looked up in RFlow.configuration.available_components under
# config.specification; when there is no entry, the specification string is
# constantized instead. After construction every configured input/output port
# receives its uuid, and every configured connection is built and attached to
# the matching port.
#
# @param worker passed to the component constructor as :worker
# @param config component configuration; this method calls name,
#   specification, uuid, managed?, input_ports and output_ports on it
# @return the newly built component instance
# @raise [NotImplementedError] when config.managed? is falsy
# @raise [RuntimeError] when the class cannot be resolved, or when
#   construction or wiring raises a StandardError or ScriptError
def build(worker, config)
  description = "component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
  raise NotImplementedError, "Non-managed components not yet implemented for #{description}" unless config.managed?

  RFlow.logger.debug "Instantiating #{description}"
  begin
    component_class = RFlow.configuration.available_components[config.specification]

    if component_class
      RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
    else
      RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
      component_class = config.specification.constantize
    end

    component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component|
      config.input_ports.each { |p| component.configure_input_port! p.name, uuid: p.uuid }
      config.output_ports.each { |p| component.configure_output_port! p.name, uuid: p.uuid }

      config.input_ports.each do |p|
        p.input_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c)
        end
      end

      config.output_ports.each do |p|
        p.output_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c)
        end
      end
    end
  rescue NameError => e
    raise RuntimeError, "Could not instantiate #{description}: the class '#{config.specification}' could not be loaded (#{e.message})"
  rescue StandardError, ScriptError => e
    # Deliberately not `rescue Exception`: Interrupt, SystemExit, NoMemoryError
    # and similar must propagate untouched instead of being re-wrapped as a
    # RuntimeError. ScriptError stays covered so load-time failures such as
    # LoadError or NotImplementedError are still reported with context.
    raise RuntimeError, "Could not instantiate #{description}: #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
  end
end

.define_port(collection, name) ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/rflow/component.rb', line 22

# Records a port declaration and generates its accessor.
#
# The port is registered in +collection+ under its string name, and an
# instance method named after the port is defined that looks the port up
# in ports.by_name.
#
# @param collection [Hash] registry that receives the string name as a key
# @param name [String, Symbol] port name
def define_port(collection, name)
  collection.store(name.to_s, true)

  accessor = name.to_s.to_sym
  define_method(accessor) { ports.by_name[name.to_s] }
end

.defined_input_portsObject



19
# File 'lib/rflow/component.rb', line 19

# Lazily created registry of declared input ports; the same Hash is
# returned on every call.
def defined_input_ports
  @defined_input_ports ||= {}
end

.defined_output_portsObject



20
# File 'lib/rflow/component.rb', line 20

# Lazily created registry of declared output ports; the same Hash is
# returned on every call.
def defined_output_ports
  @defined_output_ports ||= {}
end

.inherited(subclass) ⇒ Object

Keep track of available component subclasses



9
10
11
# File 'lib/rflow/component.rb', line 9

# Subclass hook: hands the new subclass to
# RFlow::Configuration.add_available_component.
#
# @param subclass [Class] the class that was just derived
def inherited(subclass)
  registry = RFlow::Configuration
  registry.add_available_component(subclass)
end

.input_port(name) ⇒ Object

Define an input port with a given name



14
# File 'lib/rflow/component.rb', line 14

# Declares an input port called +name+ by delegating to define_port with
# the input-port registry.
def input_port(name)
  define_port(defined_input_ports, name)
end

.output_port(name) ⇒ Object

Define an output port with a given name



17
# File 'lib/rflow/component.rb', line 17

# Declares an output port called +name+ by delegating to define_port with
# the output-port registry.
def output_port(name)
  define_port(defined_output_ports, name)
end

Instance Method Details

#cleanup!Object

Method called after all components have been shutdown! and just before the global RFlow exit. Subclasses should implement it to clean up any leftover state, e.g. flush file handles.



167
# File 'lib/rflow/component.rb', line 167

# Cleanup hook with an intentionally empty default body; returns nil.
def cleanup!
end

#configure!(deserialized_configuration) ⇒ Object

Method that should be overridden by a subclass to provide for component-specific configuration. The subclass should use the self.configuration attribute (@configuration) to store its particular configuration. The incoming deserialized_configuration parameter is from the RFlow configuration database and is (most likely) a hash. Don’t assume that the keys are symbols



148
# File 'lib/rflow/component.rb', line 148

# Configuration hook with an intentionally empty default body; the
# argument is ignored and nil is returned.
#
# @param deserialized_configuration component-specific configuration data
def configure!(deserialized_configuration)
end

#configure_input_port!(port_name, options = {}) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/rflow/component.rb', line 100

# Assigns a uuid to one of this component's declared input ports.
#
# Port names are registered as strings by define_port, so the name is
# normalized with to_s before the lookup; a Symbol and a String naming the
# same port are therefore treated alike.
#
# @param port_name [String, Symbol] name of a declared input port
# @param options [Hash] only the :uuid entry is read
# @return the value of options[:uuid]
# @raise [ArgumentError] when the port is not declared on the component class
def configure_input_port!(port_name, options = {})
  RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})"
  port_key = port_name.to_s
  unless self.class.defined_input_ports.key?(port_key)
    raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'"
  end
  ports.by_name[port_key].uuid = options[:uuid]
end

#configure_output_port!(port_name, options = {}) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/rflow/component.rb', line 108

# Assigns a uuid to one of this component's declared output ports.
#
# Port names are registered as strings by define_port, so the name is
# normalized with to_s before the lookup; a Symbol and a String naming the
# same port are therefore treated alike.
#
# @param port_name [String, Symbol] name of a declared output port
# @param options [Hash] only the :uuid entry is read
# @return the value of options[:uuid]
# @raise [ArgumentError] when the port is not declared on the component class
def configure_output_port!(port_name, options = {})
  RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})"
  port_key = port_name.to_s
  unless self.class.defined_output_ports.key?(port_key)
    raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'"
  end
  ports.by_name[port_key].uuid = options[:uuid]
end

#connect_inputs!Object

Tell the component to establish its ports’ connections, i.e. make the connection. Uses the underlying connection object. Also establishes the callbacks for each of the input ports



119
120
121
122
# File 'lib/rflow/component.rb', line 119

# Wires every input port to process_message and then connects it.
#
# Runs in two passes: all ports get their receive callback before any port
# is asked to connect.
def connect_inputs!
  input_ports.each do |port|
    port.recv_callback = method(:process_message)
  end
  input_ports.each { |port| port.connect! }
end

#connect_outputs!Object

Tell the component to establish its ports’ connections, i.e. make the connection. Uses the underlying connection object.



126
127
128
# File 'lib/rflow/component.rb', line 126

# Calls connect! on each output port, in order.
def connect_outputs!
  output_ports.each { |port| port.connect! }
end

#input_portsObject

Returns a list of connected input ports. Each port will have one or more keys associated with a particular connection.



94
# File 'lib/rflow/component.rb', line 94

# Input ports of this component, looked up in ports.by_type under the
# InputPort class name.
def input_ports
  ports.by_type['RFlow::Component::InputPort']
end

#output_portsObject

Returns a list of connected output ports. Each port will have one or more keys associated with the particular connection.



98
# File 'lib/rflow/component.rb', line 98

# Output ports of this component, looked up in ports.by_type under the
# OutputPort class name.
def output_ports
  ports.by_type['RFlow::Component::OutputPort']
end

#process_message(input_port, input_port_key, connection, message) ⇒ Object

Method called when a message is received on an input port. Subclasses should implement if they want to receive messages



157
# File 'lib/rflow/component.rb', line 157

# Message hook with an intentionally empty default body; all four
# arguments are ignored and nil is returned.
#
# @param input_port the port the message arrived on
# @param input_port_key the key on that port
# @param connection the connection that delivered the message
# @param message the received message
def process_message(input_port, input_port_key, connection, message)
end

#run!Object

Main component running method. Subclasses should implement it if they want to set up any EventMachine resources (servers, clients, etc.).



153
# File 'lib/rflow/component.rb', line 153

# Run hook with an intentionally empty default body; returns nil.
def run!
end

#shardObject



90
# File 'lib/rflow/component.rb', line 90

# The worker's shard, or nil when the component has no worker.
def shard
  return unless worker

  worker.shard
end

#shutdown!Object

Method called when RFlow is shutting down. Subclasses should implement it to terminate any servers/clients (or let them finish) and stop sending new data through the flow.



162
# File 'lib/rflow/component.rb', line 162

# Shutdown hook with an intentionally empty default body; returns nil.
def shutdown!
end

#to_sObject



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/rflow/component.rb', line 130

# Multi-line description of the component: a header line with the name and
# uuid, followed by one tab-indented line per (port, key, connection)
# combination.
#
# @return [String]
def to_s
  lines = ["Component '#{name}' (#{uuid})\n"]
  ports.each do |port|
    port.keys.each do |key|
      port[key].each do |connection|
        lines << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n"
      end
    end
  end
  lines.join
end