Class: RFlow::Configuration

Inherits:
Object
Defined in:
lib/rflow/configuration.rb,
lib/rflow/configuration/port.rb,
lib/rflow/configuration/shard.rb,
lib/rflow/configuration/setting.rb,
lib/rflow/configuration/ruby_dsl.rb,
lib/rflow/configuration/component.rb,
lib/rflow/configuration/connection.rb,
lib/rflow/configuration/uuid_keyed.rb

Overview

Contains all the configuration data and methods for RFlow. Interacts directly with the underlying SQLite database and keeps a registry of available data types, extensions, and components. Also includes an external DSL, RubyDSL, that can be used to write config-like files that load the database.

Configuration provides an MVC-like framework for config files, where the models are the Setting, Component, Port, and Connection subclasses, the controllers are things like RubyDSL, and the views are defined relative to the controllers.

Defined Under Namespace

Modules: UUIDKeyed

Classes: BrokeredZMQConnection, Component, ConfigurationItem, Connection, DataExtensionCollection, InputPort, NullConnectionConfiguration, OutputPort, Port, ProcessShard, RubyDSL, Setting, Shard, ThreadShard, ZMQConnection, ZMQStreamer

Constructor Details

#initialize(database_path = nil) ⇒ Configuration

Returns a new instance of Configuration.



# File 'lib/rflow/configuration.rb', line 170

def initialize(database_path = nil)
  # If there is not a config DB path, assume that an AR
  # connection has already been established
  if database_path
    @database_path = database_path
    Configuration.establish_config_database_connection(database_path)
  end

  # Validate the connected database.
  # TODO: make this more complete, i.e. validate the various columns
  begin
    [Setting, Shard, Component, Port, Connection].each(&:first)
  rescue ActiveRecord::StatementInvalid => e
    raise ArgumentError, "Invalid schema in configuration database: #{e.message}"
  end
end
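
The schema check in the constructor relies on a query against each model failing when its table is missing. The pattern can be sketched without ActiveRecord as follows. `SchemaError`, `FakeModel`, and `validate_schema` are hypothetical stand-ins introduced only for illustration and are not part of RFlow.

```ruby
# Stand-in for ActiveRecord::StatementInvalid (hypothetical, illustration only).
class SchemaError < StandardError; end

# Stand-in for an ActiveRecord model; `first` fails when the table is missing.
class FakeModel
  def initialize(name, table_exists)
    @name = name
    @table_exists = table_exists
  end

  def first
    raise SchemaError, "no such table: #{@name}" unless @table_exists
    nil
  end
end

# Touch every model once and convert the low-level error into an
# ArgumentError, mirroring the begin/rescue block in the constructor.
def validate_schema(models)
  models.each(&:first)
  true
rescue SchemaError => e
  raise ArgumentError, "Invalid schema in configuration database: #{e.message}"
end
```

As in the constructor, this only proves that each table can be queried; it does not check individual columns.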

Class Method Details

.add_available_component(component) ⇒ Object

Used when RFlow::Component is subclassed to add another available component to the list.



# File 'lib/rflow/configuration.rb', line 100

def add_available_component(component)
  if available_components.include?(component.name)
    raise ArgumentError, "Component '#{component.name}' already defined"
  end
  available_components[component.name] = component
end
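
One way registration on subclassing could be wired up is Ruby's `inherited` hook. The sketch below is an assumption about the mechanism, not RFlow's actual code; `ComponentRegistrySketch`, `BaseComponentSketch`, and `UppercaseFilterSketch` are hypothetical names.

```ruby
# Hypothetical registry standing in for RFlow::Configuration's class-level hash.
module ComponentRegistrySketch
  def self.available_components
    @available_components ||= {}
  end

  def self.add_available_component(component)
    if available_components.include?(component.name)
      raise ArgumentError, "Component '#{component.name}' already defined"
    end
    available_components[component.name] = component
  end
end

# Hypothetical base class: Ruby calls `inherited` each time a subclass is defined.
class BaseComponentSketch
  def self.inherited(subclass)
    super
    ComponentRegistrySketch.add_available_component(subclass)
  end
end

# Defining the subclass is enough to register it under its class name.
class UppercaseFilterSketch < BaseComponentSketch; end
```

Because the registry is keyed by class name, registering the same class a second time raises ArgumentError.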

.add_available_data_extension(data_type_name, extension) ⇒ Object

Add a data extension to the available_data_extensions class attribute. The extension parameter must be a Ruby Module (anything else raises ArgumentError) that will extend RFlow::Message::Data objects to provide additional methods/capability. Naive, prefix-based inheritance is possible; see available_data_extensions or the DataExtensionCollection class.



# File 'lib/rflow/configuration.rb', line 90

def add_available_data_extension(data_type_name, extension)
  unless extension.is_a? Module
    raise ArgumentError, "Invalid data extension #{extension} for #{data_type_name}.  Only Ruby Modules allowed"
  end

  available_data_extensions.add data_type_name, extension
end
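
The source of DataExtensionCollection is not shown on this page, so the following is only a guess at what "naive, prefix-based inheritance" might look like: an extension registered under a name applies to every data type whose name starts with that name. `PrefixExtensionCollectionSketch`, `extensions_for`, `RawSketch`, and `HttpSketch` are hypothetical.

```ruby
# Hypothetical stand-in for DataExtensionCollection (assumed behavior).
class PrefixExtensionCollectionSketch
  def initialize
    @extensions = [] # ordered list of [name_prefix, module] pairs
  end

  # Same guard as add_available_data_extension: only Modules are accepted.
  def add(data_type_name, extension)
    unless extension.is_a? Module
      raise ArgumentError, "Invalid data extension #{extension} for #{data_type_name}"
    end
    @extensions << [data_type_name.to_s, extension]
    self
  end

  # Return every extension whose registered name is a prefix of data_type_name.
  def extensions_for(data_type_name)
    @extensions.select { |prefix, _| data_type_name.to_s.start_with?(prefix) }
               .map { |_, extension| extension }
  end
end

module RawSketch; def size_in_bytes; to_s.bytesize; end; end
module HttpSketch; def http?; true; end; end

collection = PrefixExtensionCollectionSketch.new
collection.add('RFlow::Message::Data::Raw', RawSketch)
collection.add('RFlow::Message::Data::Raw::HTTP', HttpSketch)
```

Under this assumption, looking up 'RFlow::Message::Data::Raw::HTTP' yields both modules, while 'RFlow::Message::Data::Raw' yields only the first. Note that a Class also passes the `is_a? Module` check, since Class inherits from Module.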

.add_available_data_type(name, serialization_type, schema) ⇒ Object

Add a schema to the available_data_types class attribute. Schemas are indexed first by data type name and then by serialization type. ‘avro’ is currently the only supported serialization_type.

Raises:

  • (ArgumentError)


# File 'lib/rflow/configuration.rb', line 74

def add_available_data_type(name, serialization_type, schema)
  raise ArgumentError, "Data serialization_type must be 'avro' for '#{name}'" unless serialization_type == 'avro'

  if available_data_types[name.to_s].include? serialization_type.to_s
    raise ArgumentError, "Data type '#{name}' already defined for serialization_type '#{serialization_type}'"
  end

  available_data_types[name.to_s][serialization_type.to_s] = schema
end
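
The two-level indexing can be tried in isolation with a stand-in registry. `DataTypeRegistrySketch` and the schema string below are hypothetical; the method bodies follow the listing above.

```ruby
# Hypothetical stand-in for the class-level registry in RFlow::Configuration.
module DataTypeRegistrySketch
  # The default block creates an empty inner hash the first time a name is read.
  def self.available_data_types
    @available_data_types ||= Hash.new { |hash, key| hash[key] = {} }
  end

  def self.add_available_data_type(name, serialization_type, schema)
    unless serialization_type == 'avro'
      raise ArgumentError, "Data serialization_type must be 'avro' for '#{name}'"
    end
    if available_data_types[name.to_s].include? serialization_type.to_s
      raise ArgumentError, "Data type '#{name}' already defined for serialization_type '#{serialization_type}'"
    end
    available_data_types[name.to_s][serialization_type.to_s] = schema
  end
end

# Placeholder Avro schema text; the record name and field are made up.
schema = '{"type": "record", "name": "Example", "fields": [{"name": "value", "type": "string"}]}'
DataTypeRegistrySketch.add_available_data_type(:Example, 'avro', schema)
```

The name may be a Symbol because it is converted with `to_s`, but the serialization type is compared against the String 'avro' before any conversion, so passing `:avro` raises ArgumentError.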

.available_components ⇒ Object

A Hash of defined components keyed by class name, usually populated automatically when a class subclasses RFlow::Component.



# File 'lib/rflow/configuration.rb', line 64

def available_components
  @available_components ||= {}
end

.available_data_extensions ⇒ Object

A DataExtensionCollection that holds the available extensions to be applied to de-serialized data types.



# File 'lib/rflow/configuration.rb', line 58

def available_data_extensions
  @available_data_extensions ||= DataExtensionCollection.new
end

.available_data_types ⇒ Object

A collection of data types (schemas) indexed by their name and their schema type (‘avro’).



# File 'lib/rflow/configuration.rb', line 52

def available_data_types
  @available_data_types ||= Hash.new {|hash, key| hash[key] = {}}
end

.establish_config_database_connection(database_path) ⇒ Object

Connect to the configuration SQLite database, but use the ConfigurationItem class to isolate the connection information from other ActiveRecord apps (e.g. Rails).



# File 'lib/rflow/configuration.rb', line 110

def establish_config_database_connection(database_path)
  RFlow.logger.debug "Establishing connection to config database (#{Dir.getwd}) '#{database_path}'"
  ActiveRecord::Base.logger = RFlow.logger
  ConfigurationItem.establish_connection(:adapter => 'sqlite3', :database => database_path)
end

.initialize_database(database_path, config_file_path = nil) ⇒ Object

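Connect to the configuration database, migrate it to the latest version, and process a config file if provided. While the config file is processed, the working directory is the directory containing the database, so a relative config_file_path is resolved against that directory. Returns a new Configuration for database_path.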



# File 'lib/rflow/configuration.rb', line 134

def initialize_database(database_path, config_file_path = nil)
  RFlow.logger.debug "Initializing config database (#{Dir.getwd}) '#{database_path}'"

  # TODO should not need this line
  ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => database_path)

  establish_config_database_connection database_path
  migrate_database

  working_dir = Dir.getwd
  Dir.chdir File.dirname(database_path)

  if config_file_path
    process_config_file File.expand_path(config_file_path)
  end

  RFlow.logger.debug 'Defaulting non-existing config values'
  merge_defaults!

  Dir.chdir working_dir

  self.new(database_path)
end
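
The listing above saves and restores the working directory by hand, so an exception raised while the config file is processed leaves the process in the database's directory. A possible alternative, shown here only as a sketch, is the block form of `Dir.chdir`, which restores the previous directory when the block exits. `in_database_directory` is a hypothetical helper, not an RFlow method.

```ruby
require 'tmpdir'

# Run a block with the working directory set to the directory containing
# database_path. The block form of Dir.chdir restores the previous directory
# when the block exits, including when it raises.
def in_database_directory(database_path)
  Dir.chdir(File.dirname(File.expand_path(database_path))) { yield }
end

original_dir = Dir.getwd
seen_dir = nil
Dir.mktmpdir do |tmp|
  # 'config.sqlite' is a placeholder file name; no file is actually created.
  in_database_directory(File.join(tmp, 'config.sqlite')) { seen_dir = Dir.getwd }
end
```

After this runs, `Dir.getwd` is back to `original_dir`, while `seen_dir` holds the temporary directory that was current inside the block.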

.merge_defaults! ⇒ Object

Make sure that the configuration has all the necessary values set.



# File 'lib/rflow/configuration.rb', line 159

def merge_defaults!
  Setting::DEFAULTS.each do |name, default_value_or_proc|
    value = default_value_or_proc.is_a?(Proc) ? default_value_or_proc.call() : default_value_or_proc
    setting = Setting.find_or_create_by_name(:name => name, :value => value)
    unless setting.valid?
      raise RuntimeError, setting.errors.map {|_, msg| msg }.join(', ')
    end
  end
end
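
The handling of defaults that are either plain values or Procs can be sketched with a plain Hash in place of the Setting table. `DEFAULTS_SKETCH` and `merge_defaults_sketch!` are hypothetical, and the names and values are illustrative rather than RFlow's actual Setting::DEFAULTS.

```ruby
# Hypothetical defaults table (not RFlow's Setting::DEFAULTS).
DEFAULTS_SKETCH = {
  'example.name'      => 'demo',
  'example.directory' => lambda { File.join('run', 'demo') }
}.freeze

# Resolve each default (calling it if it is a Proc), then store it only when
# the setting is missing, leaving values that are already present untouched.
def merge_defaults_sketch!(settings)
  DEFAULTS_SKETCH.each do |name, default_value_or_proc|
    value = default_value_or_proc.is_a?(Proc) ? default_value_or_proc.call : default_value_or_proc
    settings[name] = value unless settings.key?(name)
  end
  settings
end

settings = merge_defaults_sketch!({ 'example.name' => 'custom' })
```

Here `'example.name'` keeps its existing value and only `'example.directory'` is filled in from the lambda.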

.migrate_database ⇒ Object

Using default ActiveRecord migrations, attempt to migrate the database to the latest version.



# File 'lib/rflow/configuration.rb', line 118

def migrate_database
  RFlow.logger.debug 'Applying default migrations to config database'
  migrations_path = File.join(File.dirname(__FILE__), 'configuration', 'migrations')
  ActiveRecord::Migration.verbose = false
  ActiveRecord::Migrator.migrate migrations_path
end

.process_config_file(path) ⇒ Object

Load the config file, which should load/process/store all the elements. Only run this after the database has been set up.



# File 'lib/rflow/configuration.rb', line 127

def process_config_file(path)
  RFlow.logger.info "Processing config file (#{Dir.getwd}) '#{path}'"
  load path
end

Instance Method Details

#[](name) ⇒ Object



# File 'lib/rflow/configuration.rb', line 211

def [](name); Setting.find_by_name(name).value rescue nil; end
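
The `rescue nil` modifier is what makes a lookup of an unknown setting return nil instead of raising. A small stand-alone sketch of that behavior, using hypothetical in-memory stand-ins (`SettingSketch`, `SettingsSketch`) instead of the ActiveRecord model:

```ruby
SettingSketch = Struct.new(:name, :value)

# Hypothetical in-memory lookup standing in for Setting.find_by_name,
# which returns nil when no row matches.
class SettingsSketch
  def initialize(rows)
    @rows = rows
  end

  def find_by_name(name)
    @rows.find { |row| row.name == name }
  end

  # Same shape as Configuration#[]: calling .value on nil raises NoMethodError,
  # and the rescue modifier turns that (and any other StandardError) into nil.
  def [](name)
    find_by_name(name).value rescue nil
  end
end

config = SettingsSketch.new([SettingSketch.new('example.name', 'demo')])
```

With this setup `config['example.name']` returns the stored value and `config['missing']` returns nil. Because the modifier catches every StandardError, a failing query would also be reported as nil.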

#available_components ⇒ Object



# File 'lib/rflow/configuration.rb', line 218

def available_components; Configuration.available_components; end

#component(uuid) ⇒ Object



# File 'lib/rflow/configuration.rb', line 217

def component(uuid); Component.find_by_uuid uuid; end

#components ⇒ Object



# File 'lib/rflow/configuration.rb', line 216

def components; Component.all; end

#connections ⇒ Object



# File 'lib/rflow/configuration.rb', line 214

def connections; Connection.all; end

#settings ⇒ Object



# File 'lib/rflow/configuration.rb', line 212

def settings; Setting.all; end

#shard(uuid) ⇒ Object



# File 'lib/rflow/configuration.rb', line 215

def shard(uuid); Shard.find_by_uuid uuid; end

#shards ⇒ Object



# File 'lib/rflow/configuration.rb', line 213

def shards; Shard.all; end

#to_s ⇒ Object



# File 'lib/rflow/configuration.rb', line 187

def to_s
  string = "Configuration:\n"

  settings.each do |setting|
    string << "Setting: '#{setting.name}' = '#{setting.value}'\n"
  end

  shards.each do |shard|
    string << "Shard #{shard.name} (#{shard.uuid}), type #{shard.class.name}, count #{shard.count}\n"
    shard.components.each do |component|
      string << "  Component '#{component.name}' as #{component.specification} (#{component.uuid})\n"
      component.output_ports.each do |output_port|
        output_port.output_connections.each do |output_connection|
          input_port = output_connection.input_port
          string << "    OutputPort '#{output_port.name}' key '#{output_connection.output_port_key}' (#{output_port.uuid}) =>\n"
          string << "      Connection '#{output_connection.name}' as #{output_connection.type} (#{output_connection.uuid}) =>\n"
          string << "      InputPort '#{input_port.name}' key '#{output_connection.input_port_key}' (#{input_port.uuid}) Component '#{input_port.component.name}' (#{input_port.component.uuid})\n"
        end
      end
    end
  end
  string
end