Class: Cuboid::RPC::Server::Agent
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/cuboid/rpc/server/agent.rb
Overview
Dispatches RPC Instances on demand and allows for extensive process monitoring.
The process goes something like this:
- A client issues a #spawn call.
- The Agent spawns and returns Instance info to the client (url, auth token, etc.).
- The client connects to the Instance using that info.
Once the client has finished using the RPC Instance, it must shut it down; otherwise, zombie processes will accumulate and eat away at system resources.
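The lifecycle above can be sketched in plain Ruby. FakeAgent, FakeInstance, with_instance and the 'url'/'token' hash keys are hypothetical stand-ins invented for this illustration; they are not Cuboid's client API. The sketch only shows that wrapping the work in `ensure` guarantees the shutdown step runs.

```ruby
# Hypothetical in-memory stand-ins for illustration; NOT Cuboid classes.
class FakeInstance
  attr_reader :url, :token

  def initialize(url, token)
    @url   = url
    @token = token
    @alive = true
  end

  def alive?
    @alive
  end

  def shutdown
    @alive = false
  end
end

class FakeAgent
  def initialize
    @instances = {}
  end

  # Returns connection info shaped like the description above (url, auth token).
  # The key names are assumptions.
  def spawn
    n    = @instances.size
    info = { 'url' => "127.0.0.1:#{7000 + n}", 'token' => "token-#{n}" }
    @instances[info['url']] = FakeInstance.new(info['url'], info['token'])
    info
  end

  # Looks up the instance and checks the token before handing it out.
  def connect(info)
    instance = @instances.fetch(info['url'])
    raise ArgumentError, 'bad token' unless instance.token == info['token']
    instance
  end

  def running
    @instances.values.select(&:alive?)
  end
end

# Spawn, connect, yield the instance, and always shut it down,
# even when the block raises.
def with_instance(agent)
  info     = agent.spawn
  instance = agent.connect(info)
  yield instance
ensure
  instance&.shutdown
end
```

A caller would write `with_instance(agent) { |instance| ... }` and never touch the shutdown step directly, which is one way to avoid leaving instances behind.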
Defined Under Namespace
Constant Summary
- SERVICE_NAMESPACE = Service
- PREFERENCE_STRATEGIES = Cuboid::OptionGroups::Agent::STRATEGIES
Instance Method Summary
- #alive? ⇒ TrueClass
  True.
- #finished_instances ⇒ Array<Hash>
  Returns info for all finished (dead) instances.
- #initialize(options = Options.instance) ⇒ Agent
  constructor
  A new instance of Agent.
- #instance(pid) ⇒ Hash
  Returns proc info for a given pid.
- #instances ⇒ Array<Hash>
  Returns info for all instances.
- #log ⇒ String
  Contents of the log file.
- #pid ⇒ Object
- #preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
  URL of the preferred Agent or nil, depending on strategy and availability.
- #running_instances ⇒ Array<Hash>
  Returns info for all running (alive) instances.
- #services ⇒ Object
- #spawn(options = {}, &block) ⇒ Hash?
  Spawns an Instance.
- #statistics ⇒ Hash
  Returns server stats regarding the instances and pool.
- #utilization ⇒ Float
  Workload score for this Agent, calculated using System#utilization.
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize(options = Options.instance) ⇒ Agent
Returns a new instance of Agent.
# File 'lib/cuboid/rpc/server/agent.rb', line 36
def initialize( options = Options.instance )
    @options = options

    @options.snapshot.path ||= @options.paths.snapshots

    @server = Base.new( @options.rpc. )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    Options.agent.url = @url = @server.url

    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'agent', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @instances = []

    Cuboid::Application.application.agent_services.each do |name, service|
        @server.add_handler( name.to_s, service.new( @options, self ) )
    end

    @node = Node.new( @options, @server, @logfile )
    @server.add_handler( 'node', @node )

    run
end
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
# File 'lib/cuboid/rpc/server/agent.rb', line 78
def alive?
    @server.alive?
end
#finished_instances ⇒ Array<Hash>
Returns info for all finished (dead) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 248
def finished_instances
    instances.reject { |i| i['alive'] }
end
#instance(pid) ⇒ Hash
Returns process info for the given pid, or nil if the Agent has no record of that pid.
# File 'lib/cuboid/rpc/server/agent.rb', line 213
def instance( pid )
    @instances.each do |i|
        next if i['pid'] != pid
        i = i.dup

        now = Time.now

        i['now']   = now.to_s
        i['age']   = now - Time.parse( i['birthdate'] )
        i['alive'] = Cuboid::Processes::Manager.alive?( pid )

        return i
    end

    nil
end
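The enrichment step in #instance can be tried in isolation with the sketch below. The `enrich` helper and its `now:` and `alive_check:` parameters are hypothetical names introduced here; `alive_check` stands in for Cuboid::Processes::Manager.alive?, so the example runs without Cuboid loaded.

```ruby
require 'time'

# Hypothetical helper mirroring the enrichment done in #instance.
# `alive_check` is a stand-in for Cuboid::Processes::Manager.alive?.
def enrich(record, now: Time.now, alive_check: ->(_pid) { true })
  info = record.dup # leave the stored record untouched

  info['now']   = now.to_s
  # 'birthdate' is stored as a String, so it is parsed before subtracting.
  info['age']   = now - Time.parse(info['birthdate'])
  info['alive'] = alive_check.call(info['pid'])

  info
end
```

Because the record is duplicated first, 'now', 'age' and 'alive' are recomputed on every call rather than being cached on the stored entry. 'age' is a Float number of seconds.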
#instances ⇒ Array<Hash>
Returns info for all instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 232
def instances
    @instances.map { |i| instance( i['pid'] ) }.compact
end
#log ⇒ String
Returns the contents of the log file.
# File 'lib/cuboid/rpc/server/agent.rb', line 278
def log
    IO.read prep_logging
end
#pid ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 283
def pid
    Process.pid
end
#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?
Returns, depending on strategy and availability:
- The URL of the preferred Agent. If this Agent is not a grid member, it returns its own URL.
- `nil` if all nodes are at max utilization or on error.
- Raises `ArgumentError` on invalid `strategy`.
# File 'lib/cuboid/rpc/server/agent.rb', line 94
def preferred( strategy = Cuboid::Options.agent.strategy, &block )
    strategy = strategy.to_sym
    if !PREFERENCE_STRATEGIES.include? strategy
        block.call :error_unknown_strategy
        raise ArgumentError, "Unknown strategy: #{strategy}"
    end

    if strategy == :direct || !@node.grid_member?
        block.call( self.utilization == 1 ? nil : @url )
        return
    end

    pick_utilization = proc do |url, utilization|
        (utilization == 1 || utilization.rpc_exception?) ? nil : [url, utilization]
    end

    adjust_score_by_strategy = proc do |score|
        case strategy
            when :horizontal
                score

            when :vertical
                -score
        end
    end

    each = proc do |peer, iter|
        connect_to_peer( peer ).utilization do |utilization|
            iter.return pick_utilization.call( peer, utilization )
        end
    end

    after = proc do |nodes|
        nodes << pick_utilization.call( @url, self.utilization )
        nodes.compact!

        # All nodes are at max utilization, pass.
        if nodes.empty?
            block.call
            next
        end

        block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
    end

    Raktr.global.create_iterator( @node.peers ).map( each, after )
end
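The effect of the two grid strategies is easier to see in a synchronous sketch. `pick_preferred` below is a hypothetical helper, not part of Cuboid: it takes the utilization scores as a plain Hash instead of collecting them from peers over RPC, and it accepts only :horizontal and :vertical, the two cases handled in the `case` statement above.

```ruby
# Hypothetical, synchronous sketch of the selection logic in #preferred.
# `utilizations` maps an Agent URL to its utilization score (0.0..1.0).
def pick_preferred(utilizations, strategy)
  strategy = strategy.to_sym
  unless %i[horizontal vertical].include?(strategy)
    raise ArgumentError, "Unknown strategy: #{strategy}"
  end

  # Nodes at max utilization are never candidates.
  candidates = utilizations.reject { |_url, score| score == 1 }
  return nil if candidates.empty?

  # :horizontal prefers the least utilized node (spread the load);
  # :vertical negates the score and so prefers the most utilized node
  # that still has capacity (fill nodes up one at a time).
  url, _score = candidates.min_by do |_url, score|
    strategy == :horizontal ? score : -score
  end

  url
end
```

With scores of 0.2, 0.7 and 1.0, :horizontal selects the node at 0.2 and :vertical selects the node at 0.7; the node at 1.0 is excluded under both.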
#running_instances ⇒ Array<Hash>
Returns info for all running (alive) instances.
# File 'lib/cuboid/rpc/server/agent.rb', line 240
def running_instances
    instances.select { |i| i['alive'] }
end
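#running_instances and #finished_instances split the same list on the 'alive' key. A caller that needs both halves could do the split in one pass over an already fetched list; `split_by_liveness` is a hypothetical helper shown only to illustrate that relationship.

```ruby
# Hypothetical helper: splits instance info hashes on their 'alive' flag.
# Entries with a false or missing 'alive' value count as finished, which
# matches what select/reject on i['alive'] would do.
def split_by_liveness(instances)
  running, finished = instances.partition { |i| i['alive'] }
  { 'running' => running, 'finished' => finished }
end
```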
#services ⇒ Object
# File 'lib/cuboid/rpc/server/agent.rb', line 72
def services
    Cuboid::Application.application.agent_services.keys
end
#spawn(options = {}, &block) ⇒ Hash?
Spawns an Instance.
# File 'lib/cuboid/rpc/server/agent.rb', line 157
def spawn( options = {}, &block )
    if @spawning
        block.call nil
        return
    end

    options  = options.my_symbolize_keys
    strategy = options.delete(:strategy)
    owner    = options[:owner]
    helpers  = options[:helpers] || {}

    if strategy != 'direct' && @node.grid_member?
        preferred *[strategy].compact do |url|
            if !url
                block.call
                next
            end

            if url == :error_unknown_strategy
                block.call :error_unknown_strategy
                next
            end

            connect_to_peer( url ).spawn( options.merge(
                helpers:  helpers.merge( via: @url ),
                strategy: :direct
            ), &block )
        end
        return
    end

    if System.max_utilization?
        block.call
        return
    end

    @spawning = true
    spawn_instance do |info|
        info['owner']   = owner
        info['helpers'] = helpers

        @instances << info

        block.call info

        @spawning = false
    end
end
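The `@spawning` flag in #spawn acts as a re-entrancy guard: while one spawn is in flight, any further call is answered with nil right away. The sketch below isolates that behavior. GuardedSpawner and its `finish` method are hypothetical; `finish` simulates the asynchronous completion that spawn_instance would deliver in the real class.

```ruby
# Hypothetical sketch of the @spawning guard; NOT a Cuboid class.
class GuardedSpawner
  def initialize
    @spawning = false
    @pending  = nil
  end

  # Accepts one spawn at a time; concurrent callers receive nil immediately.
  def spawn(&block)
    if @spawning
      block.call nil
      return
    end

    @spawning = true
    @pending  = proc do |info|
      block.call info
      @spawning = false # accept new spawn requests again
    end
  end

  # Simulates the asynchronous completion of the in-flight spawn.
  def finish(info)
    pending, @pending = @pending, nil
    pending&.call(info)
  end
end
```

A client that receives nil from #spawn cannot tell from the value alone whether the Agent was busy or at max utilization, so retrying after a short delay is a reasonable way to handle it.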
#statistics ⇒ Hash
Returns server stats regarding the instances and pool.
# File 'lib/cuboid/rpc/server/agent.rb', line 265
def statistics
    {
        'utilization'        => utilization,
        'running_instances'  => running_instances,
        'finished_instances' => finished_instances,
        'consumed_pids'      => @instances.map { |i| i['pid'] }.compact,
        'snapshots'          => Dir.glob( "#{@options.snapshot.path}*.#{Snapshot::EXTENSION}" ),
        'node'               => @node.info
    }
end
#utilization ⇒ Float
Returns the workload score for this Agent, calculated using System#utilization.
- `0.0` => No utilization.
- `1.0` => Max utilization.
Lower is better.
# File 'lib/cuboid/rpc/server/agent.rb', line 259
def utilization
    System.utilization
end