Class: RFlow::Configuration

Inherits:
Object
Defined in:
lib/rflow/configuration.rb,
lib/rflow/configuration/port.rb,
lib/rflow/configuration/shard.rb,
lib/rflow/configuration/setting.rb,
lib/rflow/configuration/component.rb,
lib/rflow/configuration/connection.rb,
lib/rflow/configuration/uuid_keyed.rb

Overview

Contains all the configuration data and methods for RFlow. Interacts directly with the underlying SQLite database and keeps a registry of available data types, extensions, and components. Also includes an external DSL, RubyDSL, that can be used to write config-like files that load the database.

Configuration provides an MVC-like framework for config files, where the models are the Setting, Shard, Component, Port, and Connection subclasses, the controllers are things like RubyDSL, and the views are defined relative to the controllers.

Defined Under Namespace

Classes: BrokeredZMQConnection, Component, ConfigurationItem, Connection, DataExtensionCollection, InputPort, OutputPort, Port, ProcessShard, Setting, Shard, ThreadShard, ZMQConnection, ZMQStreamer


Constructor Details

#initialize(database_path = nil) ⇒ Configuration

Returns a new instance of Configuration.



# File 'lib/rflow/configuration.rb', line 183

def initialize(database_path = nil)
  # If there is not a config DB path, assume that an AR
  # connection has already been established
  if database_path
    @database_path = database_path
    Configuration.establish_config_database_connection(database_path)
  end

  # Validate the connected database.
  # TODO: make this more complete, i.e. validate the various columns
  begin
    [Setting, Shard, Component, Port, Connection].each(&:first)
  rescue ActiveRecord::StatementInvalid => e
    raise ArgumentError, "Invalid schema in configuration database: #{e.message}"
  end
end

Class Method Details

.add_available_component(component) ⇒ void

This method returns an undefined value.

Used when RFlow::Component is subclassed to add another available component to the list.



# File 'lib/rflow/configuration.rb', line 108

def add_available_component(component)
  if available_components.include?(component.name)
    raise ArgumentError, "Component '#{component.name}' already defined"
  end
  available_components[component.name] = component
end
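As a rough illustration of how this registry could be filled automatically when a component class is defined, here is a plain-Ruby sketch. `Registry` and `BaseComponent` are hypothetical stand-ins for `RFlow::Configuration` and `RFlow::Component`, and wiring it through Ruby's `inherited` hook is an assumption about the mechanism, not a copy of RFlow's code.

```ruby
# Hypothetical stand-in for the class-level component registry.
module Registry
  def self.available_components
    @available_components ||= {}
  end

  def self.add_available_component(component)
    if available_components.include?(component.name)
      raise ArgumentError, "Component '#{component.name}' already defined"
    end
    available_components[component.name] = component
  end
end

# Hypothetical stand-in for RFlow::Component.
class BaseComponent
  # Ruby calls this hook whenever a subclass of BaseComponent is defined.
  def self.inherited(subclass)
    super
    Registry.add_available_component(subclass)
  end
end

class PassThrough < BaseComponent; end

Registry.available_components.keys  # => ["PassThrough"]
```

The hook runs before the subclass body executes, but for a named `class ... < BaseComponent` declaration the constant is already assigned, so `subclass.name` is available; an anonymous `Class.new(BaseComponent)` would register under `nil`.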

.add_available_data_extension(data_type_name, extension) ⇒ void

This method returns an undefined value.

Add a data extension to the available_data_extensions class attribute. The extension parameter must be a Ruby module (the module itself, not its name) that will extend Message::Data to provide additional methods/capability; anything else raises ArgumentError. Naive, prefix-based inheritance is possible; see available_data_extensions or DataExtensionCollection.



# File 'lib/rflow/configuration.rb', line 97

def add_available_data_extension(data_type_name, extension)
  unless extension.is_a? Module
    raise ArgumentError, "Invalid data extension #{extension} for #{data_type_name}.  Only Ruby Modules allowed"
  end

  available_data_extensions.add data_type_name, extension
end
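The prefix-based inheritance mentioned above might work roughly as follows. This is a sketch under assumptions: `PrefixExtensionCollection`, the example modules, and the shortest-prefix-first ordering are all hypothetical, and the real `DataExtensionCollection` may match or order extensions differently.

```ruby
# Hypothetical sketch of prefix-based extension lookup; RFlow's real
# DataExtensionCollection may order or match extensions differently.
class PrefixExtensionCollection
  def initialize
    # data type name prefix (String) => Array of extension modules
    @extensions_for = Hash.new { |hash, key| hash[key] = [] }
  end

  def add(data_type_name, extension)
    @extensions_for[data_type_name.to_s] << extension
  end

  # All extensions registered under any prefix of +data_type_name+,
  # from the shortest (most general) prefix to the longest.
  def [](data_type_name)
    @extensions_for
      .select { |prefix, _| data_type_name.to_s.start_with?(prefix) }
      .sort_by { |prefix, _| prefix.length }
      .flat_map { |_, extensions| extensions }
  end
end

# Hypothetical extension modules.
module Timestamped
  def timestamped?
    true
  end
end

module RawBytes
  def raw?
    true
  end
end

collection = PrefixExtensionCollection.new
collection.add('RFlow::Message::Data', Timestamped)
collection.add('RFlow::Message::Data::Raw', RawBytes)

data = Object.new
collection['RFlow::Message::Data::Raw'].each { |mod| data.extend(mod) }
data.timestamped?  # => true
data.raw?          # => true
```

A plain `start_with?` test is deliberately simple: it would also let an extension registered for `RFlow::Message::Data` apply to an unrelated name such as `RFlow::Message::DataX`.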

.add_available_data_type(name, serialization_type, schema) ⇒ void

This method returns an undefined value.

Add a schema to the available_data_types class attribute. Schemas are indexed by name and serialization_type; 'avro' is currently the only supported serialization_type.

Raises:

  • (ArgumentError)


# File 'lib/rflow/configuration.rb', line 78

def add_available_data_type(name, serialization_type, schema)
  # TODO: refactor each of these add_available_* into collections to
  # make DRYer.  Also figure out what to do with all the to_syms
  raise ArgumentError, "Data serialization_type must be 'avro' for '#{name}'" unless serialization_type == 'avro'

  if available_data_types[name.to_s].include? serialization_type.to_s
    raise ArgumentError, "Data type '#{name}' already defined for serialization_type '#{serialization_type}'"
  end

  available_data_types[name.to_s][serialization_type.to_s] = schema
end
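The two-level lookup (name, then serialization type) and the duplicate check can be exercised without ActiveRecord or Avro. In this sketch `AVAILABLE_DATA_TYPES` is a hypothetical top-level stand-in for the class attribute, and the schema string is placeholder Avro JSON rather than a real RFlow schema.

```ruby
# Hypothetical top-level stand-in for the available_data_types class
# attribute: name => { serialization_type => schema }.
AVAILABLE_DATA_TYPES = Hash.new { |hash, key| hash[key] = {} }

def add_available_data_type(name, serialization_type, schema)
  unless serialization_type == 'avro'
    raise ArgumentError, "Data serialization_type must be 'avro' for '#{name}'"
  end
  if AVAILABLE_DATA_TYPES[name.to_s].include?(serialization_type.to_s)
    raise ArgumentError, "Data type '#{name}' already defined for serialization_type '#{serialization_type}'"
  end
  AVAILABLE_DATA_TYPES[name.to_s][serialization_type.to_s] = schema
end

# Placeholder Avro record schema (JSON), not a real RFlow schema.
schema = '{"type": "record", "name": "Ping", "fields": [{"name": "seq", "type": "long"}]}'
add_available_data_type('Example::Ping', 'avro', schema)

AVAILABLE_DATA_TYPES['Example::Ping']['avro'] == schema  # => true
```

Because the 'avro' check compares before any `to_s` is applied, passing the Symbol `:avro` is rejected even though the name and serialization type are otherwise normalized to Strings.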

.available_components ⇒ Hash

A Hash of defined components, usually automatically populated when a component subclasses RFlow::Component.

Returns:

  • (Hash)


# File 'lib/rflow/configuration.rb', line 70

def available_components
  @available_components ||= {}
end

.available_data_extensions ⇒ DataExtensionCollection

A DataExtensionCollection to hold available extensions that will be applied to the de-serialized data types.



# File 'lib/rflow/configuration.rb', line 63

def available_data_extensions
  @available_data_extensions ||= DataExtensionCollection.new
end

.available_data_types ⇒ Hash

A collection of data types (schemas) indexed by their name and their schema type (‘avro’).

Returns:

  • (Hash)


# File 'lib/rflow/configuration.rb', line 56

def available_data_types
  @available_data_types ||= Hash.new {|hash, key| hash[key] = {}}
end
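One consequence of the default block: reading an unknown name through this accessor stores an empty Hash for it as a side effect. The plain-Ruby snippet below (hypothetical keys) contrasts indexing with `Hash#fetch` and an explicit default, which does not invoke the default block.

```ruby
types = Hash.new { |hash, key| hash[key] = {} }

# Indexing runs the default block, which stores the new empty Hash.
types['Example::Unknown'].include?('avro')  # => false
types.key?('Example::Unknown')              # => true

# fetch with an explicit default skips the default block entirely.
other = Hash.new { |hash, key| hash[key] = {} }
other.fetch('Example::Unknown', {}).include?('avro')  # => false
other.key?('Example::Unknown')                        # => false
```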

.establish_config_database_connection(database_path) ⇒ void

This method returns an undefined value.

Connect to the configuration SQLite database through ConfigurationItem rather than ActiveRecord::Base, so that the connection information does not interfere with other ActiveRecord apps (e.g. Rails).



# File 'lib/rflow/configuration.rb', line 119

def establish_config_database_connection(database_path)
  RFlow.logger.debug "Establishing connection to config database (#{Dir.getwd}) '#{database_path}'"
  ActiveRecord::Base.logger = RFlow.logger
  ConfigurationItem.establish_connection(:adapter => 'sqlite3', :database => database_path)
end

.initialize_database(database_path, config_file_path = nil) ⇒ void

This method returns an undefined value.

Connect to the configuration database, migrate it to the latest version, and process a config file if provided.



# File 'lib/rflow/configuration.rb', line 146

def initialize_database(database_path, config_file_path = nil)
  RFlow.logger.debug "Initializing config database (#{Dir.getwd}) '#{database_path}'"

  # TODO should not need this line
  ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => database_path)

  establish_config_database_connection database_path
  migrate_database

  working_dir = Dir.getwd
  Dir.chdir File.dirname(database_path)

  if config_file_path
    process_config_file File.expand_path(config_file_path)
  end

  RFlow.logger.debug 'Defaulting non-existing config values'
  merge_defaults!

  Dir.chdir working_dir

  self.new(database_path)
end
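`initialize_database` changes into the database's directory and changes back by hand, so if processing the config file (or merging defaults) raises, the process is left in the database directory. An exception-safe variant can use the block form of `Dir.chdir`, which restores the previous directory when the block exits; `within_database_dir` below is a hypothetical helper, not part of RFlow. Note also that in the listing above `File.expand_path(config_file_path)` runs after the `chdir`, so a relative `config_file_path` is resolved against the database's directory.

```ruby
require 'tmpdir'

# Hypothetical helper (not part of RFlow): runs the block with the working
# directory set to the directory holding +database_path+. The block form of
# Dir.chdir restores the previous directory when the block exits, even on error.
def within_database_dir(database_path)
  Dir.chdir(File.dirname(File.expand_path(database_path))) { yield }
end

original_dir = Dir.pwd
tmp_dir = nil
inner_dir = nil
raised = false

Dir.mktmpdir do |dir|
  tmp_dir = File.realpath(dir)
  db_path = File.join(dir, 'config.sqlite') # the file need not exist
  inner_dir = within_database_dir(db_path) { File.realpath(Dir.pwd) }

  begin
    within_database_dir(db_path) { raise 'simulated config file error' }
  rescue RuntimeError
    raised = true
  end
end

Dir.pwd == original_dir  # => true
```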

.merge_defaults! ⇒ void

This method returns an undefined value.

Make sure that every setting listed in Setting::DEFAULTS has a value, using the default (calling it first if it is a Proc) for any setting that is missing.



# File 'lib/rflow/configuration.rb', line 172

def merge_defaults!
  Setting::DEFAULTS.each do |name, default_value_or_proc|
    value = default_value_or_proc.is_a?(Proc) ? default_value_or_proc.call() : default_value_or_proc
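    # Match on name only, so an existing (possibly non-default) value is kept;
    # the default is applied only when the setting has to be created.
    setting = Setting.find_or_create_by(:name => name) {|s| s.value = value }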
    unless setting.valid?
      raise RuntimeError, setting.errors.map {|_, msg| msg }.join(', ')
    end
  end
end
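The intended behaviour, filling in only the settings that are missing while leaving existing values alone, can be sketched with a plain Hash standing in for the settings table. The `DEFAULTS` keys below are hypothetical, not RFlow's real `Setting::DEFAULTS`. Unlike the listing, which evaluates every default up front, this sketch calls a default Proc only when the setting is actually missing.

```ruby
# Hypothetical defaults: plain values or procs computed on demand.
DEFAULTS = {
  'example.log_level' => 'INFO',
  'example.pid_file'  => -> { File.join(Dir.pwd, 'example.pid') },
}.freeze

# Fill in only the missing settings; values already present are kept.
def merge_defaults(settings)
  DEFAULTS.each do |name, default_value_or_proc|
    next if settings.key?(name)

    settings[name] =
      default_value_or_proc.is_a?(Proc) ? default_value_or_proc.call : default_value_or_proc
  end
  settings
end

settings = merge_defaults({ 'example.log_level' => 'DEBUG' })
settings['example.log_level']  # => "DEBUG"
```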

.migrate_database ⇒ void

This method returns an undefined value.

Run the ActiveRecord migrations bundled with RFlow (in lib/rflow/configuration/migrations) to bring the database schema to the latest version.



# File 'lib/rflow/configuration.rb', line 128

def migrate_database
  RFlow.logger.debug 'Applying default migrations to config database'
  migrations_path = File.join(File.dirname(__FILE__), 'configuration', 'migrations')
  ActiveRecord::Migration.verbose = false
  ActiveRecord::Migrator.migrate migrations_path
end

.process_config_file(path) ⇒ void

This method returns an undefined value.

Load the config file, which should load, process, and store all the elements. Only run this after the database has been set up.



# File 'lib/rflow/configuration.rb', line 138

def process_config_file(path)
  RFlow.logger.info "Processing config file (#{Dir.getwd}) '#{path}'"
  load path
end
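Because the file is evaluated with `Kernel#load`, a config file is ordinary Ruby that calls into the DSL, is re-evaluated on every call, and can run arbitrary code, so it must come from a trusted source. The sketch below uses a hypothetical `MiniDSL` module as a stand-in for RubyDSL and a throwaway file in a temporary directory.

```ruby
require 'tmpdir'

# Hypothetical stand-in for a config DSL such as RubyDSL.
module MiniDSL
  def self.settings
    @settings ||= {}
  end

  def self.setting(name, value)
    settings[name] = value
  end
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'example_config.rb')
  File.write(path, "MiniDSL.setting('example.name', 'demo')\n")
  load File.expand_path(path) # evaluates the file as ordinary Ruby
end

MiniDSL.settings['example.name']  # => "demo"
```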

Instance Method Details

#[](name) ⇒ Object

Retrieve a setting value by name from the SQLite database. Returns nil if the setting does not exist or the lookup raises an error.

Returns:

  • (Object)


# File 'lib/rflow/configuration.rb', line 228

def [](name); Setting.find_by_name(name).value rescue nil; end
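The trailing `rescue nil` turns any StandardError raised during the lookup (a database error, for instance) into `nil`, making it indistinguishable from a missing setting. A narrower alternative, assuming Ruby 2.3+ for the safe-navigation operator `&.`, returns `nil` only when no row is found. `SettingRow`, `ROWS`, `find_by_name`, and `setting_value` are hypothetical stand-ins for the Setting model.

```ruby
# Hypothetical stand-ins for the Setting table and its finder.
SettingRow = Struct.new(:name, :value)
ROWS = [SettingRow.new('example.name', 'demo')].freeze

def find_by_name(name)
  ROWS.find { |row| row.name == name }
end

# nil only when no row matches; any other error propagates to the caller.
def setting_value(name)
  find_by_name(name)&.value
end

setting_value('example.name')  # => "demo"
setting_value('missing')       # => nil
```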

#available_components ⇒ Hash

Retrieve the mapping from component name to Component.

Returns:

  • (Hash)


# File 'lib/rflow/configuration.rb', line 256

def available_components; Configuration.available_components; end

#component(uuid) ⇒ Component

Retrieve a single Component by UUID from the SQLite database.

Returns:

  • (Component)

# File 'lib/rflow/configuration.rb', line 252

def component(uuid); Component.find_by_uuid uuid; end

#components ⇒ Array<Component>

Retrieve all the Components from the SQLite database.

Returns:

  • (Array<Component>)

# File 'lib/rflow/configuration.rb', line 248

def components; Component.all; end

#connections ⇒ Array<Connection>

Retrieve all the Connections from the SQLite database.

Returns:

  • (Array<Connection>)

# File 'lib/rflow/configuration.rb', line 240

def connections; Connection.all; end

#settings ⇒ Array<Setting>

Retrieve all the Settings from the SQLite database.

Returns:

  • (Array<Setting>)

# File 'lib/rflow/configuration.rb', line 232

def settings; Setting.all; end

#shard(uuid) ⇒ Shard

Retrieve a single Shard by UUID from the SQLite database.

Returns:

  • (Shard)

# File 'lib/rflow/configuration.rb', line 244

def shard(uuid); Shard.find_by_uuid uuid; end

#shards ⇒ Array<Shard>

Retrieve all the Shards from the SQLite database.

Returns:

  • (Array<Shard>)

# File 'lib/rflow/configuration.rb', line 236

def shards; Shard.all; end

#to_s ⇒ String

Output the RFlow configuration to a pretty-printed String.

Returns:

  • (String)


# File 'lib/rflow/configuration.rb', line 202

def to_s
  string = "Configuration:\n"

  settings.each do |setting|
    string << "Setting: '#{setting.name}' = '#{setting.value}'\n"
  end

  shards.each do |shard|
    string << "Shard #{shard.name} (#{shard.uuid}), type #{shard.class.name}, count #{shard.count}\n"
    shard.components.each do |component|
      string << "  Component '#{component.name}' as #{component.specification} (#{component.uuid})\n"
      component.output_ports.each do |output_port|
        output_port.output_connections.each do |output_connection|
          input_port = output_connection.input_port
          string << "    OutputPort '#{output_port.name}' key '#{output_connection.output_port_key}' (#{output_port.uuid}) =>\n"
          string << "      Connection '#{output_connection.name}' as #{output_connection.type} (#{output_connection.uuid}) =>\n"
          string << "      InputPort '#{input_port.name}' key '#{output_connection.input_port_key}' (#{input_port.uuid}) Component '#{input_port.component.name}' (#{input_port.component.uuid})\n"
        end
      end
    end
  end
  string
end
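To show the shape of the output without a database, the sketch below renders just the shard and component lines of `to_s` from in-memory `Struct`s. `ProcessShard` here is a hypothetical stand-in Struct, not RFlow's ActiveRecord model, and `ComponentRow`, the names, and the UUIDs are made up for illustration.

```ruby
# Hypothetical in-memory stand-ins for the Shard and Component records.
ProcessShard = Struct.new(:name, :uuid, :count, :components)
ComponentRow = Struct.new(:name, :specification, :uuid)

# Renders only the shard and component lines of Configuration#to_s.
def render(shards)
  string = +"Configuration:\n"
  shards.each do |shard|
    string << "Shard #{shard.name} (#{shard.uuid}), type #{shard.class.name}, count #{shard.count}\n"
    shard.components.each do |component|
      string << "  Component '#{component.name}' as #{component.specification} (#{component.uuid})\n"
    end
  end
  string
end

generator = ComponentRow.new('generator', 'ExampleGenerator', 'uuid-2')
shard = ProcessShard.new('workers', 'uuid-1', 2, [generator])
puts render([shard])
# Configuration:
# Shard workers (uuid-1), type ProcessShard, count 2
#   Component 'generator' as ExampleGenerator (uuid-2)
```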