Class: Cuboid::RPC::Server::Agent
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/cuboid/rpc/server/agent.rb
Overview
Dispatches RPC Instances on demand and allows for extensive process monitoring.
The process goes something like this:
-
A client issues a #spawn call.
-
The Agent spawns and returns Instance info to the client (url, auth token, etc.).
-
The client connects to the Instance using that info.
Once the client finishes using the RPC Instance, it must shut it down; otherwise zombie processes will accumulate and eat away at system resources.
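The lifecycle above can be sketched as follows. This is only an illustration under stated assumptions: `FakeAgentClient`, `FakeInstance` and `with_instance` are hypothetical stand-ins (the real RPC client API is not shown on this page), and the `'url'` and `'token'` keys are assumed names for the "url, auth token" info mentioned above. The point it shows is wrapping Instance usage in `ensure`, so the Instance is shut down even when the client code fails.

```ruby
# Hypothetical stand-in for a client connected to an Agent.
class FakeAgentClient
  def initialize
    @next_pid = 1000
  end

  # Mimics #spawn by returning connection info for a new Instance.
  # The 'url' and 'token' key names are assumptions.
  def spawn(options = {})
    @next_pid += 1
    {
      'url'   => "127.0.0.1:#{@next_pid}",
      'token' => 'secret',
      'pid'   => @next_pid,
      'owner' => options[:owner]
    }
  end
end

# Hypothetical stand-in for a connected Instance.
class FakeInstance
  attr_reader :info

  def initialize(info)
    @info  = info
    @alive = true
  end

  def alive?
    @alive
  end

  def shutdown
    @alive = false
  end
end

# Spawns an Instance, yields it, and always shuts it down afterwards.
def with_instance(agent, options = {})
  info = agent.spawn(options)
  raise 'No Instance available' if info.nil?

  instance = FakeInstance.new(info)
  yield instance
ensure
  # Runs on success and on error, so no Instance is left behind.
  instance&.shutdown
end
```

A caller would then write `with_instance(agent, owner: 'me') { |instance| ... }` and never call `shutdown` by hand.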
Defined Under Namespace
Constant Summary
- SERVICE_NAMESPACE =
Service
- PREFERENCE_STRATEGIES =
Cuboid::OptionGroups::Agent::STRATEGIES
Instance Method Summary
-
#alive? ⇒ TrueClass
True.
-
#finished_instances ⇒ Array<Hash>
Returns info for all finished (dead) instances.
-
#initialize(options = Options.instance) ⇒ Agent
constructor
A new instance of Agent.
-
#instance(pid) ⇒ Hash
Returns proc info for a given pid.
-
#instances ⇒ Array<Hash>
Returns info for all instances.
-
#log ⇒ String
Contents of the log file.
- #pid ⇒ Object
-
#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
Depending on strategy and availability, the URL of the preferred Agent or nil.
-
#running_instances ⇒ Array<Hash>
Returns info for all running (alive) instances.
- #services ⇒ Object
-
#spawn(options = {}, &block) ⇒ Hash?
Dispatches an Instance.
-
#statistics ⇒ Hash
Returns server stats regarding the instances and pool.
-
#utilization ⇒ Float
Workload score for this Agent, calculated using System#utilization.
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize(options = Options.instance) ⇒ Agent
Returns a new instance of Agent.
# File 'lib/cuboid/rpc/server/agent.rb', line 36
def initialize( options = Options.instance )
    @options = options

    @options.snapshot.path ||= @options.paths.snapshots

    @server = Base.new( @options.rpc. )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    Options.agent.url = @url = @server.url

    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'agent', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @instances = []

    Cuboid::Application.application.agent_services.each do |name, service|
        @server.add_handler( name.to_s, service.new( @options, self ) )
    end

    @node = Node.new( @options, @server, @logfile )
    @server.add_handler( 'node', @node )

    run
end
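The async check registered in the constructor relies on Ruby's `Method#parameters`. The sketch below reproduces that check on its own; the `async?` helper and the `Demo` class are hypothetical names used only for illustration.

```ruby
# Same test as the constructor's async check: a method counts as
# async when :block shows up anywhere in its parameter list.
def async?(method)
  method.parameters.flatten.include?(:block)
end

# Hypothetical handler class, for illustration only.
class Demo
  def sync_call(value); end

  def async_call(value, &block); end

  # A plain parameter that happens to be named `block`.
  def misleading(block); end
end

sync_result       = async?(Demo.instance_method(:sync_call))
async_result      = async?(Demo.instance_method(:async_call))
misleading_result = async?(Demo.instance_method(:misleading))
```

Because the parameter list is flattened, the check matches on names as well as kinds: a regular argument named `block` is also reported as async, so handler methods are best written with `&block` only where a callback is really expected.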
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
# File 'lib/cuboid/rpc/server/agent.rb', line 78
def alive?
    @server.alive?
end
#finished_instances ⇒ Array<Hash>
Returns info for all finished (dead) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 242
def finished_instances
    instances.reject { |i| i['alive'] }
end
#instance(pid) ⇒ Hash
Returns process info for the given pid, or nil if the Agent tracks no instance with that pid.
# File 'lib/cuboid/rpc/server/agent.rb', line 207
def instance( pid )
    @instances.each do |i|
        next if i['pid'] != pid
        i = i.dup

        now = Time.now

        i['now']   = now.to_s
        i['age']   = now - Time.parse( i['birthdate'] )
        i['alive'] = Cuboid::Processes::Manager.alive?( pid )

        return i
    end

    nil
end
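The per-call annotation done by `#instance` can be tried in isolation. In this minimal sketch, `annotate` is a hypothetical helper and the liveness flag is passed in by the caller instead of being queried from `Cuboid::Processes::Manager`; the record format (a hash with a parseable `'birthdate'` string) follows the code above.

```ruby
require 'time' # provides Time.parse

# Returns a copy of the stored record with 'now', 'age' (in seconds)
# and 'alive' filled in. The stored record itself is left untouched.
def annotate(record, now: Time.now, alive: true)
  info = record.dup

  info['now']   = now.to_s
  info['age']   = now - Time.parse(info['birthdate'])
  info['alive'] = alive

  info
end

record = { 'pid' => 4242, 'birthdate' => '2024-01-01 00:00:00 UTC' }
info   = annotate(record, now: Time.utc(2024, 1, 1, 0, 1, 30), alive: false)

info['age'] # => 90.0
```

Working on a duplicate means the age and liveness are recomputed on every call, and stale values are never stored with the record.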
#instances ⇒ Array<Hash>
Returns info for all instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 226
def instances
    @instances.map { |i| instance( i['pid'] ) }.compact
end
#log ⇒ String
Returns the contents of the log file.
# File 'lib/cuboid/rpc/server/agent.rb', line 272
def log
    IO.read prep_logging
end
#pid ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 277
def pid
    Process.pid
end
#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
Depending on strategy and availability, passes one of the following to the block:
-
URL of the preferred Agent. If this Agent is not a grid member, its own URL is used.
-
`nil` if all nodes are at max utilization or on error.
-
Raises `ArgumentError` – On invalid `strategy`.
# File 'lib/cuboid/rpc/server/agent.rb', line 93
def preferred( strategy = Cuboid::Options.agent.strategy, &block )
    strategy = strategy.to_sym
    if !PREFERENCE_STRATEGIES.include? strategy
        block.call :error_unknown_strategy
        raise ArgumentError, "Unknown strategy: #{strategy}"
    end

    if !@node.grid_member?
        block.call( self.utilization == 1 ? nil : @url )
        return
    end

    pick_utilization = proc do |url, utilization|
        (utilization == 1 || utilization.rpc_exception?) ?
            nil : [url, utilization]
    end

    adjust_score_by_strategy = proc do |score|
        case strategy
            when :horizontal
                score

            when :vertical
                -score
        end
    end

    each = proc do |peer, iter|
        connect_to_peer( peer ).utilization do |utilization|
            iter.return pick_utilization.call( peer, utilization )
        end
    end

    after = proc do |nodes|
        nodes << pick_utilization.call( @url, self.utilization )
        nodes.compact!

        # All nodes are at max utilization, pass.
        if nodes.empty?
            block.call
            next
        end

        block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
    end

    Arachni::Reactor.global.create_iterator( @node.peers ).map( each, after )
end
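The selection rule used by `#preferred` can be sketched without any RPC. The version below is synchronous and takes the utilization scores as a plain hash. `pick_preferred` and the `STRATEGIES` list are hypothetical: the two strategy names are taken from the `case` statement above, while the actual contents of `PREFERENCE_STRATEGIES` are not shown on this page. A `nil` score stands in for a peer whose utilization call failed.

```ruby
# Assumed strategy list, based on the two branches handled above.
STRATEGIES = [:horizontal, :vertical].freeze

# utilizations: Hash of Agent URL => utilization score (0.0..1.0),
# or nil for a peer that could not be queried.
def pick_preferred(utilizations, strategy)
  strategy = strategy.to_sym
  unless STRATEGIES.include?(strategy)
    raise ArgumentError, "Unknown strategy: #{strategy}"
  end

  # Drop unreachable nodes and nodes at max utilization.
  candidates = utilizations.reject { |_, score| score.nil? || score == 1 }
  return nil if candidates.empty?

  # :horizontal favours the least loaded node (spread the load),
  # :vertical favours the most loaded node that still has room.
  candidates.min_by { |_, score| strategy == :horizontal ? score : -score }.first
end

nodes = { 'agent-a:7331' => 0.2, 'agent-b:7331' => 0.7, 'agent-c:7331' => 1.0 }

pick_preferred(nodes, :horizontal) # => "agent-a:7331"
pick_preferred(nodes, :vertical)   # => "agent-b:7331"
```

Negating the score for `:vertical` reuses the same ascending ordering for both strategies, which mirrors `adjust_score_by_strategy` in the method above.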
#running_instances ⇒ Array<Hash>
Returns info for all running (alive) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 234
def running_instances
    instances.select { |i| i['alive'] }
end
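`#running_instances` and `#finished_instances` split the same list on the `'alive'` flag. A caller that already holds the output of `#instances` could get both halves in one pass, as in this small sketch with made-up sample data:

```ruby
# Sample data in the shape returned by #instances (values are made up).
instances = [
  { 'pid' => 101, 'alive' => true  },
  { 'pid' => 102, 'alive' => false },
  { 'pid' => 103, 'alive' => true  }
]

# partition returns [matching, non-matching] in a single pass.
running, finished = instances.partition { |i| i['alive'] }

running.map  { |i| i['pid'] } # => [101, 103]
finished.map { |i| i['pid'] } # => [102]
```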
#services ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 72
def services
    Cuboid::Application.application.agent_services.keys
end
#spawn(options = {}, &block) ⇒ Hash?
Dispatches an Instance.
# File 'lib/cuboid/rpc/server/agent.rb', line 158
def spawn( options = {}, &block )
    options      = options.my_symbolize_keys
    strategy     = options.delete(:strategy)
    owner        = options[:owner]
    helpers      = options[:helpers] || {}
    load_balance = options[:load_balance].nil? ? true : options[:load_balance]

    if load_balance && @node.grid_member?
        preferred *[strategy].compact do |url|
            if !url
                block.call
                next
            end

            if url == :error_unknown_strategy
                block.call :error_unknown_strategy
                next
            end

            connect_to_peer( url ).spawn( options.merge(
                helpers:      helpers.merge( via: @url ),
                load_balance: false
            ),
                &block
            )
        end
        return
    end

    if System.max_utilization?
        block.call
        return
    end

    spawn_instance do |info|
        info['owner']   = owner
        info['helpers'] = helpers

        @instances << info

        block.call info
    end
end
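The option handling at the top of `#spawn`, and the options forwarded to a peer, can be sketched on their own. Both helpers below are hypothetical, and `transform_keys` is used as a plain-Ruby stand-in for `my_symbolize_keys`, which is assumed here to symbolize the top-level keys.

```ruby
# Normalizes user-supplied options the way #spawn reads them:
# helpers default to {} and load balancing defaults to true.
def normalize_spawn_options(options = {})
  options = options.transform_keys(&:to_sym)

  {
    strategy:     options[:strategy],
    owner:        options[:owner],
    helpers:      options[:helpers] || {},
    load_balance: options[:load_balance].nil? ? true : options[:load_balance]
  }
end

# Builds the options sent to the chosen peer: the strategy is dropped,
# the forwarding Agent's URL is recorded under helpers[:via], and load
# balancing is turned off so the peer spawns locally.
def forwarded_options(normalized, via_url)
  normalized.reject { |key, _| key == :strategy }.merge(
    helpers:      normalized[:helpers].merge(via: via_url),
    load_balance: false
  )
end

opts      = normalize_spawn_options('owner' => 'alice', 'strategy' => 'vertical')
forwarded = forwarded_options(opts, 'agent-a:7331')
```

Setting `load_balance: false` on the forwarded call is what keeps a request from bouncing between grid members: the receiving Agent skips the `preferred` lookup and goes straight to the local utilization check.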
#statistics ⇒ Hash
Returns server stats regarding the instances and pool.
# File 'lib/cuboid/rpc/server/agent.rb', line 259
def statistics
    {
        'utilization'        => utilization,
        'running_instances'  => running_instances,
        'finished_instances' => finished_instances,
        'consumed_pids'      => @instances.map { |i| i['pid'] }.compact,
        'snapshots'          => Dir.glob( "#{@options.snapshot.path}*.#{Snapshot::EXTENSION}" ),
        'node'               => @node.info
    }
end
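A monitoring script might condense the `#statistics` hash into a few figures. The sketch below assumes only the keys shown in the method above; `summarize_statistics` is a hypothetical helper and the sample values are made up.

```ruby
# Condenses a #statistics hash into counts and a capacity flag.
def summarize_statistics(stats)
  {
    running:     stats['running_instances'].size,
    finished:    stats['finished_instances'].size,
    spawned:     stats['consumed_pids'].size,
    at_capacity: stats['utilization'] >= 1.0
  }
end

# Made-up sample in the shape returned by #statistics.
sample = {
  'utilization'        => 0.5,
  'running_instances'  => [{ 'pid' => 101, 'alive' => true }],
  'finished_instances' => [{ 'pid' => 100, 'alive' => false }],
  'consumed_pids'      => [100, 101],
  'snapshots'          => [],
  'node'               => {}
}

summary = summarize_statistics(sample)
```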
#utilization ⇒ Float
Returns the workload score for this Agent, calculated using System#utilization.
-
`0.0` => No utilization.
-
`1.0` => Max utilization.
Lower is better.
# File 'lib/cuboid/rpc/server/agent.rb', line 253
def utilization
    System.utilization
end