Class: Arachni::RPC::Server::Dispatcher
- Includes: Module::Utilities, UI::Output, Sys
- Defined in: lib/arachni/rpc/server/dispatcher.rb, lib/arachni/rpc/server/node.rb
Overview
Dispatches RPC servers on demand, providing a centralized environment for multiple RPC clients and allowing for extensive process monitoring.
The process goes something like this:
* a client issues a 'dispatch' call
* the dispatcher starts a new RPC server on a random port
* the dispatcher returns the port of the RPC server to the client
* the client connects to the RPC server listening on that port and uses it
Once the client finishes using the RPC server, it must shut it down. If it doesn't, idle RPC server instances will accumulate and eat away at system resources.
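The steps above can be sketched from the client's side. This is a minimal illustration only: `FakeDispatcher`, `FakeInstance` and `with_instance` are hypothetical stand-ins invented for this example and are not part of Arachni's RPC client API. The point it shows is the shutdown obligation: wrapping the work in `ensure` guarantees the instance is shut down even when the client code raises.

```ruby
# Hypothetical stand-ins, NOT Arachni's client API: they only mimic the
# dispatch -> connect -> shut down lifecycle described above.
class FakeInstance
    attr_reader :port

    def initialize( port )
        @port  = port
        @alive = true
    end

    def alive?
        @alive
    end

    def shutdown
        @alive = false
    end
end

class FakeDispatcher
    attr_reader :instances

    def initialize
        @instances = {}
        @next_port = 50_000
    end

    # Mimics 'dispatch': starts an instance and returns its connection info.
    def dispatch( owner = 'unknown' )
        port = (@next_port += 1)
        @instances[port] = FakeInstance.new( port )
        { 'port' => port, 'owner' => owner.to_s }
    end

    def connect( port )
        @instances.fetch( port )
    end
end

# Runs the block against a freshly dispatched instance and always shuts the
# instance down afterwards, even if the block raises.
def with_instance( dispatcher, owner )
    info     = dispatcher.dispatch( owner )
    instance = dispatcher.connect( info['port'] )
    yield instance
ensure
    instance.shutdown if instance
end

dispatcher = FakeDispatcher.new
with_instance( dispatcher, 'docs example' ) do |instance|
    puts "Using instance on port #{instance.port}"
end
puts "Idle instances left: #{dispatcher.instances.values.count( &:alive? )}"
```

A real client would replace the fake classes with actual RPC calls; the `ensure` structure is the part worth keeping.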
@author: Tasos “Zapotek” Laskos
@version: 0.2
Defined Under Namespace
Classes: Node
Instance Method Summary
- #alive? ⇒ Boolean
- #dispatch(owner = 'unknown', helpers = {}) ⇒ Hash
  Dispatches an RPC server instance from the pool.
- #initialize(opts) ⇒ Dispatcher (constructor)
  A new instance of Dispatcher.
- #job(pid) ⇒ Hash
  Returns proc info for a given pid.
- #jobs ⇒ Array<Hash>
  Returns proc info for all jobs.
- #log ⇒ Object
- #proc_info ⇒ Object
- #stats ⇒ Hash
  Returns server stats regarding the jobs and pool.
Methods included from UI::Output
#buffer, #debug!, #debug?, #flush_buffer, #mute!, #muted?, #only_positives!, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, #uncap_buffer!, #unmute!, #verbose!, #verbose?
Methods included from Module::Utilities
#exception_jail, #get_path, #hash_keys_to_str, #normalize_url, #read_file, #seed, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize
Constructor Details
#initialize(opts) ⇒ Dispatcher
Returns a new instance of Dispatcher.
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 53

    def initialize( opts )
        @opts = opts

        @opts.rpc_port    ||= 7331
        @opts.rpc_address ||= 'localhost'
        @opts.pool_size   ||= 5

        if @opts.help
            print_help
            exit 0
        end

        @server = Base.new( @opts )

        @server.add_async_check { |method|
            # methods that expect a block are async
            method.parameters.flatten.include?( :block )
        }

        # let the instances in the pool know who to ask for routing instructions
        # when we're in grid mode.
        @opts.datastore[:dispatcher_url] = "#{@opts.rpc_address}:#{@opts.rpc_port.to_s}"

        prep_logging

        print_status( 'Initing RPC Server...' )

        @server.add_handler( "dispatcher", self )

        # trap interrupts and exit cleanly when required
        trap_interrupts { shutdown }

        @jobs = []
        @pool = Queue.new
        @replenisher = Queue.new

        print_status( 'Warming up the pool...' )
        @opts.pool_size.times{ add_instance_to_pool }

        # this thread will wait in the background and replenish the pool
        Thread.new {
            loop {
                add_instance_to_pool
                @replenisher.pop
            }
        }

        @node = nil

        print_status( 'Done.' )

        print_status( 'Initialization complete.' )

        run
    end
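The async check registered in the constructor relies on `Method#parameters`, which returns `[type, name]` pairs. The sketch below isolates that predicate so its behavior can be seen on its own; `expects_block?` and `ExampleHandler` are hypothetical names used only for illustration.

```ruby
# Mirrors the predicate passed to add_async_check in the constructor: a
# method counts as asynchronous when :block appears among its parameters.
def expects_block?( method )
    method.parameters.flatten.include?( :block )
end

# Hypothetical handler used only for this illustration.
class ExampleHandler
    def sync_call( arg )
        arg
    end

    def async_call( arg, &block )
        block.call( arg )
    end

    # A regular parameter that merely happens to be named "block".
    def misleading_call( block )
        block
    end
end

handler = ExampleHandler.new
p handler.method( :async_call ).parameters
p expects_block?( handler.method( :sync_call ) )
p expects_block?( handler.method( :async_call ) )
p expects_block?( handler.method( :misleading_call ) )
```

Because the pairs are flattened before the lookup, a regular parameter that is simply named `block` also matches, so handler methods are best written with an explicit `&block` and no other parameter of that name.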
Instance Method Details
#alive? ⇒ Boolean
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 113

    def alive?
        @server.alive?
    end
#dispatch(owner = 'unknown', helpers = {}) ⇒ Hash
Dispatches an RPC server instance from the pool
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 125

    def dispatch( owner = 'unknown', helpers = {} )

        # just to make sure...
        owner = owner.to_s

        cjob = @pool.shift
        cjob['owner']     = owner
        cjob['starttime'] = Time.now
        cjob['helpers']   = helpers

        print_status( "Instance dispatched -- PID: #{cjob['pid']} - " +
            "Port: #{cjob['port']} - Owner: #{cjob['owner']}" )

        @replenisher << true

        @jobs << cjob

        return cjob
    end
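The interplay between `@pool`, `@replenisher` and the background thread can be sketched in isolation. This is a simplified model under stated assumptions: `ExamplePool` is a hypothetical class, its instances are plain Hashes rather than forked RPC servers, and its replenisher waits for a signal before adding an instance, whereas the constructor listing adds first and then waits.

```ruby
# Simplified, hypothetical pool: nothing here is part of Arachni's API.
class ExamplePool
    def initialize( size )
        @pool        = Queue.new
        @replenisher = Queue.new
        @next_pid    = 0
        @lock        = Mutex.new

        # warm up the pool
        size.times { add_instance }

        # background thread: blocks until a dispatch signals it, then refills
        @thread = Thread.new do
            loop do
                @replenisher.pop
                add_instance
            end
        end
    end

    # Hands out a ready instance and asks for a replacement.
    def dispatch( owner = 'unknown' )
        job = @pool.shift            # blocks while the pool is empty
        job['owner']     = owner.to_s
        job['starttime'] = Time.now
        @replenisher << true
        job
    end

    def size
        @pool.size
    end

    private

    def add_instance
        pid = @lock.synchronize { @next_pid += 1 }
        @pool << { 'pid' => pid, 'birthdate' => Time.now }
    end
end

pool = ExamplePool.new( 3 )
job  = pool.dispatch( 'docs example' )
puts "Dispatched PID #{job['pid']} to #{job['owner']}"
```

Using a second `Queue` as the signal keeps `dispatch` fast: the caller never waits for a new instance to start unless the pool has been drained.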
#job(pid) ⇒ Hash
Returns proc info for a given pid
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 151

    def job( pid )
        @jobs.each { |i|
            cjob = i.dup
            if cjob['pid'] == pid
                cjob['currtime'] = Time.now
                cjob['age']      = cjob['currtime'] - cjob['birthdate']
                cjob['runtime']  = cjob['currtime'] - cjob['starttime']
                cjob['proc']     = proc( cjob['pid'] )

                return cjob
            end
        }
    end
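The timing fields that #job adds are plain differences between `Time` objects, in seconds. The sketch below reproduces that arithmetic with a hypothetical `job_info` helper (not part of Arachni) that uses `find` and returns `nil` for an unknown PID. Note that `Array#each` returns its receiver, so the listing as written appears to return the whole `@jobs` array when no PID matches.

```ruby
# Hypothetical helper, not part of Arachni: looks up a job by PID and adds
# the derived timing fields, returning nil when the PID is unknown.
def job_info( jobs, pid, now = Time.now )
    found = jobs.find { |j| j['pid'] == pid }
    return nil unless found

    found.merge(
        'currtime' => now,
        # assumed: 'birthdate' is set when the instance joins the pool
        'age'      => now - found['birthdate'],
        # 'starttime' is set by #dispatch
        'runtime'  => now - found['starttime']
    )
end

t0   = Time.at( 1_000 )
jobs = [ { 'pid' => 42, 'birthdate' => t0, 'starttime' => t0 + 30 } ]

p job_info( jobs, 42, t0 + 90 )
p job_info( jobs, 7,  t0 + 90 )
```

`Hash#merge` returns a new Hash, so the stored job record is left untouched, matching the `dup` in the original method.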
#jobs ⇒ Array<Hash>
Returns proc info for all jobs
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 171

    def jobs
        jobs = []
        @jobs.each { |cjob|
            proc_info = job( cjob['pid'] )
            jobs << proc_info if proc_info
        }

        return jobs
    end
#log ⇒ Object
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 205

    def log
        IO.read( prep_logging )
    end
#proc_info ⇒ Object
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 209

    def proc_info
        p = proc( Process.pid )

        if @node
            p.merge!( 'node' => @node.info )
        end

        return p
    end
#stats ⇒ Hash
Returns server stats regarding the jobs and pool
    # File 'lib/arachni/rpc/server/dispatcher.rb', line 186

    def stats

        cjobs    = jobs( )
        running  = cjobs.reject{ |job| job['proc'].empty? }
        finished = cjobs - running

        stats = {
            'running_jobs'   => running,
            'finished_jobs'  => finished,
            'init_pool_size' => @opts.pool_size,
            'curr_pool_size' => @pool.size
        }

        if @node
            stats.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
        end

        return stats
    end
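#stats classifies a job as finished when its 'proc' entry is empty, that is, when no process information could be found for its PID. A minimal sketch of that split follows; `split_jobs` is a hypothetical helper, and it swaps the listing's `reject` plus array subtraction for a single `partition` call.

```ruby
# Hypothetical helper: splits job records into running and finished,
# treating an empty 'proc' Hash as "the process no longer exists".
def split_jobs( jobs )
    running, finished = jobs.partition { |job| !job['proc'].empty? }
    { 'running_jobs' => running, 'finished_jobs' => finished }
end

sample = [
    { 'pid' => 101, 'proc' => { 'state' => 'run' } },
    { 'pid' => 102, 'proc' => {} }
]

result = split_jobs( sample )
puts "running:  #{result['running_jobs'].map { |j| j['pid'] }.inspect}"
puts "finished: #{result['finished_jobs'].map { |j| j['pid'] }.inspect}"
```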