Class: RFlow::Component

Inherits:
Object
Defined in:
lib/rflow/component.rb,
lib/rflow/component/port.rb

Overview

Parent class for all RFlow components.

Defined Under Namespace

Classes: HashPort, HashSubPort, InputPort, OutputPort, Port, PortCollection

Constructor Details

#initialize(args = {}) ⇒ Component

Returns a new instance of Component.

Parameters:

  • args (Hash) (defaults to: {})

    supported args are :name, :uuid, :worker



# File 'lib/rflow/component.rb', line 110

def initialize(args = {})
  @name = args[:name]
  @uuid = args[:uuid]
  @worker = args[:worker]
  @ports = PortCollection.new

  self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) }
  self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) }
end

Instance Attribute Details

#name ⇒ String

The name of the component.

Returns:

  • (String)


# File 'lib/rflow/component.rb', line 101

def name
  @name
end

#ports ⇒ PortCollection (readonly)

Collection of the component’s input and output ports.

Returns:

  • (PortCollection)

# File 'lib/rflow/component.rb', line 104

def ports
  @ports
end

#shard ⇒ Shard (readonly)

Reference to the Shard of the component’s worker process, or nil if the component has no worker.

Returns:

  • (Shard)

# File 'lib/rflow/component.rb', line 123

def shard; worker.shard if worker; end

#uuid ⇒ String

The UUID of the component.

Returns:

  • (String)


# File 'lib/rflow/component.rb', line 98

def uuid
  @uuid
end

#worker ⇒ Shard::Worker (readonly)

Reference to the worker process in which this instance of the component is running.

Returns:

  • (Shard::Worker)

# File 'lib/rflow/component.rb', line 107

def worker
  @worker
end

Class Method Details

.build(worker, config) ⇒ RFlow::Component

Attempts to instantiate the component described by the config specification. The specification is assumed to name a fully qualified Ruby class that has already been loaded. The method first looks the specification up among the known subclasses of RFlow::Component (in RFlow::Configuration#available_components) and, failing that, constantizes the specification string into a class. Future releases will support external (i.e. non-managed) components; the current implementation supports only Ruby classes.

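Parameters:

  • worker (Shard::Worker)

    the worker process in which the component will run

  • config

    the component’s configuration record, providing name, uuid, specification, managed?, input_ports and output_ports

Returns:

  • (RFlow::Component)

    an instance of the class named by config.specification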

Raises:

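  • (NotImplementedError)

    if the component is not managed

  • (RuntimeError)

    if the class cannot be loaded or the component cannot be instantiated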


# File 'lib/rflow/component.rb', line 58

def build(worker, config)
  raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed?

  RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})"
  begin
    component_class = RFlow.configuration.available_components[config.specification]

    if component_class
      RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']"
    else
      RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'"
      component_class = config.specification.constantize
    end

    component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component|
      config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid }
      config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid }

      config.input_ports.each do |p|
        p.input_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c)
        end
      end

      config.output_ports.each do |p|
        p.output_connections.each do |c|
          component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c)
        end
      end
    end
  rescue NameError => e
    raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})"
  rescue Exception => e
    raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}"
  end
end
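
The lookup order used by build (registry first, then a constant lookup by name) can be sketched in isolation. The following is a minimal illustration, not RFlow's code: SKETCH_REGISTRY stands in for RFlow.configuration.available_components, sketch_resolve and the two classes are hypothetical names, and Object.const_get is used in place of constantize, which appears to come from ActiveSupport.

```ruby
# Hypothetical registry standing in for RFlow.configuration.available_components.
SKETCH_REGISTRY = {}

class SketchRegistered; end   # known to the registry
class SketchLoaded; end       # loaded, but not registered

SKETCH_REGISTRY['SketchRegistered'] = SketchRegistered

# Resolve a class-name string: try the registry first, then fall back to a
# constant lookup. A NameError is re-raised as a RuntimeError with context,
# mirroring the error handling in build.
def sketch_resolve(specification)
  SKETCH_REGISTRY[specification] || Object.const_get(specification)
rescue NameError => e
  raise RuntimeError, "the class '#{specification}' could not be loaded (#{e.message})"
end
```

Calling sketch_resolve('SketchLoaded') takes the fallback path, while a name that matches no loaded constant ends in the RuntimeError branch.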

.input_port(name) ⇒ void

This method returns an undefined value.

Declares an input port with the given name when defining a component class. Also defines an accessor method, named after the port, that returns the port.

Parameters:

  • name (String)


# File 'lib/rflow/component.rb', line 21

def input_port(name); define_port(defined_input_ports, name); end

.output_port(name) ⇒ void

This method returns an undefined value.

Declares an output port with the given name when defining a component class. Also defines an accessor method, named after the port, that returns the port.

Parameters:

  • name (String)


# File 'lib/rflow/component.rb', line 29

def output_port(name); define_port(defined_output_ports, name); end
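
How a class-level declaration can both record a port name and define an accessor for it is easier to see in a small stand-in. The sketch below is an assumption-laden illustration, not RFlow's define_port: PortMacroSketch, SketchUpcaser and the array used to represent a port are all hypothetical.

```ruby
# Stand-in sketch (not RFlow's implementation) of class-level port macros.
class PortMacroSketch
  class << self
    # Per-class registries of declared port names.
    def defined_input_ports;  @defined_input_ports  ||= {}; end
    def defined_output_ports; @defined_output_ports ||= {}; end

    def input_port(name);  define_port(defined_input_ports, name);  end
    def output_port(name); define_port(defined_output_ports, name); end

    private

    # Record the name and define a reader that looks the port up by name.
    def define_port(registry, name)
      registry[name.to_s] = true
      define_method(name.to_s) { @ports.fetch(name.to_s) }
    end
  end

  def initialize
    # A real port is an object; a [kind, name] pair is enough for the sketch.
    @ports = {}
    self.class.defined_input_ports.each_key  { |n| @ports[n] = [:input, n] }
    self.class.defined_output_ports.each_key { |n| @ports[n] = [:output, n] }
  end
end

class SketchUpcaser < PortMacroSketch
  input_port  :raw
  output_port :upcased
end
```

An instance of SketchUpcaser then responds to raw and upcased, while the base class gains no such accessors because the registries and readers are defined per subclass.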

Instance Method Details

#cleanup! ⇒ void

This method returns an undefined value.

Method called after shutdown! has been called on all components and just before the global RFlow exit. Subclasses should implement it to clean up any leftover state, e.g. flush file handles.



# File 'lib/rflow/component.rb', line 217

def cleanup!; end

#configure!(deserialized_configuration) ⇒ void

This method returns an undefined value.

Override in a subclass to provide component-specific configuration. The subclass should use the RFlow.configuration attribute (@configuration) to store its particular configuration.

Parameters:

  • deserialized_configuration (Hash)

    from the RFlow configuration database; most likely a Hash. Don’t assume that the keys are symbols!



# File 'lib/rflow/component.rb', line 190

def configure!(deserialized_configuration); end
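
Because the keys of the deserialized configuration may be strings or symbols, an override can normalize them before use. The sketch below shows one way to do that; SketchConfigured, its DEFAULTS and the option names are hypothetical, and a real component would subclass RFlow::Component rather than stand alone.

```ruby
# Hypothetical component-like class; only the configure! pattern matters here.
class SketchConfigured
  attr_reader :configuration

  # Illustrative option names and defaults, not part of RFlow.
  DEFAULTS = { 'interval' => 1.0, 'prefix' => '' }.freeze

  def configure!(deserialized_configuration)
    # Normalize keys to strings so that :interval and 'interval' both work.
    given = (deserialized_configuration || {}).each_with_object({}) do |(key, value), hash|
      hash[key.to_s] = value
    end
    @configuration = DEFAULTS.merge(given)
  end
end
```

Merging over a frozen defaults hash keeps every option defined even when the stored configuration omits it.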

#input_ports ⇒ Array<InputPort>

Returns a list of connected input ports. Each port will have one or more keys associated with a particular connection.

Returns:

  • (Array<InputPort>)

# File 'lib/rflow/component.rb', line 128

def input_ports; ports.by_type['RFlow::Component::InputPort']; end

#output_ports ⇒ Array<OutputPort>

Returns a list of connected output ports. Each port will have one or more keys associated with the particular connection.

Returns:

  • (Array<OutputPort>)

# File 'lib/rflow/component.rb', line 133

def output_ports; ports.by_type['RFlow::Component::OutputPort']; end

#process_message(input_port, input_port_key, connection, message) ⇒ void

This method returns an undefined value.

Method called when a message is received on an input port. Subclasses should implement if they want to receive messages.

Parameters:

  • input_port (RFlow::Component::InputPort)

    the input port the message was received on

  • input_port_key (String)

    if the message was received on a keyed subport, this is the key

  • connection (RFlow::Connection)

    the connection the message was received on

  • message (RFlow::Message)

    the message itself



# File 'lib/rflow/component.rb', line 205

def process_message(input_port, input_port_key, connection, message); end
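
A subclass override typically inspects the port and key to decide what to do with the message. The following sketch tallies messages per port name and key; it runs without RFlow, so SketchInPort and SketchMessage are hypothetical stand-ins for RFlow::Component::InputPort and RFlow::Message, and SketchCounter is not an RFlow class.

```ruby
# Stand-ins for illustration only; real ports and messages are RFlow objects.
SketchInPort  = Struct.new(:name)
SketchMessage = Struct.new(:data)

class SketchCounter
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Same parameter list as Component#process_message.
  def process_message(input_port, input_port_key, connection, message)
    # Tally messages by the port name and subport key they arrived on.
    @counts[[input_port.name, input_port_key]] += 1
  end
end
```

Keying the tally on both values keeps traffic from different keyed subports of the same port separate.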

#run! ⇒ void

This method returns an undefined value.

Main component running method. Subclasses should implement it if they need to set up any EventMachine resources (servers, clients, etc.).



# File 'lib/rflow/component.rb', line 196

def run!; end

#shutdown! ⇒ void

This method returns an undefined value.

Method called when RFlow is shutting down. Subclasses should implement to terminate any servers/clients (or let them finish) and stop sending new data through the flow.



# File 'lib/rflow/component.rb', line 211

def shutdown!; end
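
The ordering described for shutdown! and cleanup! (cleanup! runs only after every component has been shut down) can be sketched as two passes over the components. This is an illustration under stated assumptions, not RFlow's shutdown code: LifecycleSketch and sketch_stop_all are hypothetical names, and the shared log exists only to make the order visible.

```ruby
# Hypothetical component-like class that records the calls it receives.
class LifecycleSketch
  def initialize(name, log)
    @name = name
    @log = log
  end

  def shutdown!; @log << "#{@name}.shutdown!"; end
  def cleanup!;  @log << "#{@name}.cleanup!";  end
end

# First pass: stop every component. Second pass: clean up leftover state.
def sketch_stop_all(components)
  components.each(&:shutdown!)
  components.each(&:cleanup!)
end
```

With two components a and b, the recorded order is both shutdown! calls followed by both cleanup! calls, so no component cleans up while another may still be sending data.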

#to_s ⇒ String

Pretty-printed version of the component, its ports, their keys, and their connections.

Returns:

  • (String)


# File 'lib/rflow/component.rb', line 172

def to_s
  string = "Component '#{name}' (#{uuid})\n"
  ports.each do |port|
    port.keys.each do |key|
      port[key].each do |connection|
        string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n"
      end
    end
  end
  string
end