Class: Arachni::RPC::Server::Dispatcher
- Includes:
- UI::Output, Utilities, Sys
- Defined in:
- lib/arachni/rpc/server/dispatcher.rb
Overview
Dispatches RPC Instances on demand providing a centralized environment for multiple clients and allows for extensive process monitoring.
The process goes something like this:
-
on init the Dispatcher populates an Instance pool
-
a client issues a ‘dispatch’ call
-
the Dispatcher pops an Instance from the pool
-
asynchronously replenishes the pool
-
gives the Instance credentials to the client (url, auth token, etc.)
-
-
the client connects to the Instance using these credentials
Once the client finishes using the RPC Instance he must shut it down otherwise the system will be eaten away by zombie RPC Instance processes.
Defined Under Namespace
Constant Summary collapse
Instance Method Summary collapse
-
#alive? ⇒ TrueClass
True.
-
#dispatch(owner = 'unknown', helpers = {}, &block) ⇒ Hash
Dispatches an RPC server instance from the pool.
- #handlers ⇒ Object
-
#initialize(opts) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#job(pid) ⇒ Hash
Returns proc info for a given pid.
-
#jobs ⇒ Array<Hash>
Returns proc info for all jobs.
-
#log ⇒ String
Contents of the log file.
-
#proc_info ⇒ Hash
The server’s proc info.
-
#stats ⇒ Hash
Returns server stats regarding the jobs and pool.
Methods included from UI::Output
#debug?, #debug_off, #debug_on, #disable_only_positives, #flush_buffer, #mute, #muted?, old_reset_output_options, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_buffer_cap, #uncap_buffer, #unmute, #verbose, #verbose?
Methods included from Utilities
#cookie_encode, #cookies_from_document, #cookies_from_file, #cookies_from_response, #exception_jail, #exclude_path?, #extract_domain, #form_decode, #form_encode, #form_parse_request_body, #forms_from_document, #forms_from_response, #get_path, #hash_keys_to_str, #html_decode, #html_encode, #include_path?, #links_from_document, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_query, #parse_set_cookie, #parse_url_vars, #path_in_domain?, #path_too_deep?, #remove_constants, #seed, #skip_path?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize
Constructor Details
#initialize(opts) ⇒ Dispatcher
Returns a new instance of Dispatcher.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 59 def initialize( opts ) @opts = opts @opts.rpc_port ||= 7331 @opts.rpc_address ||= 'localhost' @opts.pool_size ||= 5 if @opts.help print_help exit 0 end @server = Base.new( @opts ) @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level] @server.add_async_check do |method| # methods that expect a block are async method.parameters.flatten.include? :block end # let the instances in the pool know who to ask for routing instructions # when we're in grid mode. @opts.datastore[:dispatcher_url] = "#{@opts.rpc_address}:#{@opts.rpc_port.to_s}" prep_logging print_status 'Initing RPC Server...' @server.add_handler( 'dispatcher', self ) # trap interrupts and exit cleanly when required trap_interrupts { shutdown } @jobs = [] @consumed_pids = [] @pool = ::EM::Queue.new if @opts.pool_size > 0 print_status 'Warming up the pool...' @opts.pool_size.times{ add_instance_to_pool } end print_status 'Initialization complete.' @node = Node.new( @opts, @logfile ) @server.add_handler( 'node', @node ) _handlers.each do |name, handler| @server.add_handler( name, handler.new( @opts, self ) ) end run end |
Instance Method Details
#alive? ⇒ TrueClass
Returns true.
120 121 122 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 120 def alive? @server.alive? end |
#dispatch(owner = 'unknown', helpers = {}, &block) ⇒ Hash
Dispatches an RPC server instance from the pool
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 132 def dispatch( owner = 'unknown', helpers = {}, &block ) if @opts.pool_size <= 0 block.call false return end # just to make sure... owner = owner.to_s ::EM.next_tick { add_instance_to_pool } @pool.pop do |cjob| cjob['owner'] = owner cjob['starttime'] = Time.now cjob['helpers'] = helpers print_status "Instance dispatched -- PID: #{cjob['pid']} - " + "Port: #{cjob['port']} - Owner: #{cjob['owner']}" @jobs << cjob block.call cjob end end |
#handlers ⇒ Object
115 116 117 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 115 def handlers _handlers.keys end |
#job(pid) ⇒ Hash
Returns proc info for a given pid
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 162 def job( pid ) @jobs.each do |j| next if j['pid'] != pid cjob = j.dup cjob['currtime'] = Time.now cjob['age'] = cjob['currtime'] - cjob['birthdate'] cjob['runtime'] = cjob['currtime'] - cjob['starttime'] cjob['proc'] = proc( cjob['pid'] ) return cjob end end |
#jobs ⇒ Array<Hash>
Returns proc info for all jobs
181 182 183 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 181 def jobs @jobs.map { |cjob| job( cjob['pid'] ) }.compact end |
#log ⇒ String
Returns contents of the log file.
213 214 215 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 213 def log IO.read prep_logging end |
#proc_info ⇒ Hash
Returns the server’s proc info.
218 219 220 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 218 def proc_info proc( Process.pid ).merge( 'node' => @node.info ) end |
#stats ⇒ Hash
Returns server stats regarding the jobs and pool
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/arachni/rpc/server/dispatcher.rb', line 190 def stats cjobs = jobs( ) running = cjobs.reject { |job| job['proc'].empty? } finished = cjobs - running stats_h = { 'running_jobs' => running, 'finished_jobs' => finished, 'init_pool_size' => @opts.pool_size, 'curr_pool_size' => @pool.size, 'consumed_pids' => @consumed_pids } stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours ) stats_h['node']['score'] = (rs_score = resource_consumption_score) > 0 ? rs_score : 1 stats_h['node']['score'] *= stats_h['node']['weight'] if stats_h['node']['weight'] stats_h['node']['score'] = Float( stats_h['node']['score'] ) stats_h end |