Class: Orchestrator::Control
- Inherits:
-
Object
- Object
- Orchestrator::Control
- Includes:
- Singleton
- Defined in:
- lib/orchestrator/control.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
-
#ready ⇒ Object
readonly
Returns the value of attribute ready.
-
#zones ⇒ Object
readonly
Returns the value of attribute zones.
Instance Method Summary collapse
-
#boot(*args) ⇒ Object
Boot the control system, running all defined modules.
-
#initialize ⇒ Control
constructor
1.
-
#load(mod_settings) ⇒ Object
Load the modules on the loop references in round robin This method is thread safe.
-
#loaded?(mod_id) ⇒ Boolean
Checks if a module with the ID specified is loaded.
- #log_unhandled_exception(*args) ⇒ Object
-
#mount ⇒ Object
Start the control reactor.
- #notify_ready ⇒ Object
- #reload(dep_id) ⇒ Object
-
#start(mod_id) ⇒ Object
Starts a module running.
-
#stop(mod_id) ⇒ Object
Stops a module running.
-
#unload(mod_id) ⇒ Object
Stop the module gracefully Then remove it from @loaded.
-
#update(mod_id) ⇒ Object
Unload then Get a fresh version of the settings from the database load the module.
Constructor Details
#initialize ⇒ Control
-
Load the modules allocated to this node
-
Allocate modules to CPUs
-
Modules load dependencies as required
-
Logics are streamed in after devices and services
-
Logic modules will fetch their system when they interact with other modules.
Devices and services do not have a system associated with them
This makes systems very loosely coupled to the modules
which should make distributing the system slightly simpler
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/orchestrator/control.rb', line 23 def initialize # critical sections @critical = ::Mutex.new @loaded = ::ThreadSafe::Cache.new @zones = ::ThreadSafe::Cache.new @loader = DependencyManager.instance @loop = ::Libuv::Loop.default @exceptions = method(:log_unhandled_exception) @ready = false if Rails.env.production? logger = ::Logger.new(::Rails.root.join('log/control.log').to_s, 10, 4194304) else logger = ::Logger.new(STDOUT) end logger.formatter = proc { |severity, datetime, progname, msg| "#{datetime.strftime("%d/%m/%Y @ %I:%M%p")} #{severity}: #{progname} - #{msg}\n" } @logger = ::ActiveSupport::TaggedLogging.new(logger) end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
46 47 48 |
# File 'lib/orchestrator/control.rb', line 46 def logger @logger end |
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
46 47 48 |
# File 'lib/orchestrator/control.rb', line 46 def loop @loop end |
#ready ⇒ Object (readonly)
Returns the value of attribute ready.
46 47 48 |
# File 'lib/orchestrator/control.rb', line 46 def ready @ready end |
#zones ⇒ Object (readonly)
Returns the value of attribute zones.
46 47 48 |
# File 'lib/orchestrator/control.rb', line 46 def zones @zones end |
Instance Method Details
#boot(*args) ⇒ Object
Boot the control system, running all defined modules
83 84 85 86 |
# File 'lib/orchestrator/control.rb', line 83 def boot(*args) # Only boot if running as a server Thread.new &method(:load_all) end |
#load(mod_settings) ⇒ Object
Load the modules on the loop references in round robin This method is thread safe.
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/orchestrator/control.rb', line 90 def load(mod_settings) mod_id = mod_settings.id.to_sym defer = @loop.defer mod = @loaded[mod_id] if mod defer.resolve(mod) else defer.resolve( @loader.load(mod_settings.dependency).then(proc { |klass| # We will always be on the default loop here thread = @selector.next # We'll resolve the promise if the module loads on the deferred thread defer = @loop.defer thread.schedule do defer.resolve(start_module(thread, klass, mod_settings)) end # update the module cache defer.promise.then do |mod_manager| @loaded[mod_id] = mod_manager end defer.promise }, @exceptions) ) end defer.promise end |
#loaded?(mod_id) ⇒ Boolean
Checks if a module with the ID specified is loaded
121 122 123 |
# File 'lib/orchestrator/control.rb', line 121 def loaded?(mod_id) @loaded[mod_id.to_sym] end |
#log_unhandled_exception(*args) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/orchestrator/control.rb', line 201 def log_unhandled_exception(*args) msg = '' err = args[-1] if err && err.respond_to?(:backtrace) msg << "exception: #{err.message} (#{args[0..-2]})" msg << "\n#{err.backtrace.join("\n")}" if err.respond_to?(:backtrace) && err.backtrace else msg << "unhandled exception: #{args}" end @logger.error msg ::Libuv::Q.reject(@loop, msg) end |
#mount ⇒ Object
Start the control reactor
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/orchestrator/control.rb', line 50 def mount return @server.loaded if @server @critical.synchronize { return if @server # Protect against multiple mounts # Cache all the zones in the system ::Orchestrator::Zone.all.each do |zone| @zones[zone.id] = zone end @server = ::SpiderGazelle::Spider.instance @server.loaded.then do # Share threads with SpiderGazelle (one per core) if @server.mode == :thread @threads = @server.threads else # We are either running no_ipc or process (unsupported for control) @threads = Set.new cpus = ::Libuv.cpu_count || 1 cpus.times &method(:start_thread) @loop.signal :INT, method(:kill_workers) end @selector = @threads.cycle end } return @server.loaded end |
#notify_ready ⇒ Object
195 196 197 198 199 |
# File 'lib/orchestrator/control.rb', line 195 def notify_ready # Clear the system cache (in case it has been populated at all) System.clear_cache @ready = true end |
#reload(dep_id) ⇒ Object
189 190 191 192 193 |
# File 'lib/orchestrator/control.rb', line 189 def reload(dep_id) @loop.work do reload_dep(dep_id) end end |
#start(mod_id) ⇒ Object
Starts a module running
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/orchestrator/control.rb', line 126 def start(mod_id) defer = @loop.defer mod = loaded? mod_id if mod mod.thread.schedule do mod.start defer.resolve(true) end else err = Error::ModuleNotFound.new "unable to start module '#{mod_id}', not found" defer.reject(err) @logger.warn err. end defer.promise end |
#stop(mod_id) ⇒ Object
Stops a module running
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/orchestrator/control.rb', line 145 def stop(mod_id) defer = @loop.defer mod = loaded? mod_id if mod mod.thread.schedule do mod.stop defer.resolve(true) end else err = Error::ModuleNotFound.new "unable to stop module '#{mod_id}', not found" defer.reject(err) @logger.warn err. end defer.promise end |
#unload(mod_id) ⇒ Object
Stop the module gracefully Then remove it from @loaded
165 166 167 168 169 170 |
# File 'lib/orchestrator/control.rb', line 165 def unload(mod_id) stop(mod_id).then(proc { @loaded.delete(mod_id.to_sym) true # promise response }) end |
#update(mod_id) ⇒ Object
Unload then Get a fresh version of the settings from the database load the module
175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/orchestrator/control.rb', line 175 def update(mod_id) unload(mod_id).then(proc { # Grab database model in the thread pool res = @loop.work do ::Orchestrator::Module.find(mod_id) end # Load the module if model found res.then(proc { |config| load(config) # Promise chaining to here }) }) end |