Class: Arachni::RPC::Server::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities, Sys
Defined in:
lib/arachni/rpc/server/dispatcher.rb

Overview

Dispatches RPC Instances on demand providing a centralized environment for multiple clients and allows for extensive process monitoring.

The process goes something like this:

  • on init the Dispatcher populates an Instance pool

  • a client issues a ‘dispatch’ call

  • the Dispatcher pops an Instance from the pool

    • asynchronously replenishes the pool

    • gives the Instance credentials to the client (url, auth token, etc.)

  • the client connects to the Instance using these credentials

Once the client finishes using the RPC Instance he must shut it down otherwise the system will be eaten away by zombie RPC Instance processes.

Author:

Defined Under Namespace

Classes: Handler, Node

Constant Summary collapse

HANDLER_LIB =
Options.dir['rpcd_handlers']
HANDLER_NAMESPACE =
Handler

Instance Method Summary collapse

Methods included from UI::Output

#debug?, #debug_off, #debug_on, #disable_only_positives, #flush_buffer, #mute, #muted?, old_reset_output_options, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #set_buffer_cap, #uncap_buffer, #unmute, #verbose, #verbose?

Methods included from Utilities

#cookie_encode, #cookies_from_document, #cookies_from_file, #cookies_from_response, #exception_jail, #exclude_path?, #extract_domain, #form_decode, #form_encode, #form_parse_request_body, #forms_from_document, #forms_from_response, #get_path, #hash_keys_to_str, #html_decode, #html_encode, #include_path?, #links_from_document, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_query, #parse_set_cookie, #parse_url_vars, #path_in_domain?, #path_too_deep?, #remove_constants, #seed, #skip_path?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize

Constructor Details

#initialize(opts) ⇒ Dispatcher

Returns a new instance of Dispatcher.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/arachni/rpc/server/dispatcher.rb', line 59

def initialize( opts )
    banner

    @opts = opts

    @opts.rpc_port    ||= 7331
    @opts.rpc_address ||= 'localhost'
    @opts.pool_size   ||= 5

    if @opts.help
        print_help
        exit 0
    end

    @server = Base.new( @opts )
    @server.logger.level = @opts.datastore[:log_level] if @opts.datastore[:log_level]

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @opts.datastore[:dispatcher_url] = "#{@opts.rpc_address}:#{@opts.rpc_port.to_s}"

    prep_logging

    print_status 'Initing RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs = []
    @consumed_pids = []
    @pool = ::EM::Queue.new

    if @opts.pool_size > 0
        print_status 'Warming up the pool...'
        @opts.pool_size.times{ add_instance_to_pool }
    end

    print_status 'Initialization complete.'

    @node = Node.new( @opts, @logfile )
    @server.add_handler( 'node', @node )

    _handlers.each do |name, handler|
        @server.add_handler( name, handler.new( @opts, self ) )
    end

    run
end

Instance Method Details

#alive?TrueClass

Returns true.

Returns:

  • (TrueClass)

    true



120
121
122
# File 'lib/arachni/rpc/server/dispatcher.rb', line 120

def alive?
    @server.alive?
end

#dispatch(owner = 'unknown', helpers = {}, &block) ⇒ Hash

Dispatches an RPC server instance from the pool

Parameters:

  • owner (String) (defaults to: 'unknown')

    an owner assign to the dispatched RPC server

  • helpers (Hash) (defaults to: {})

    hash of helper data to be added to the job

Returns:

  • (Hash)

    includes port number, owner, clock info and proc info



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/arachni/rpc/server/dispatcher.rb', line 132

def dispatch( owner = 'unknown', helpers = {}, &block )
    if @opts.pool_size <= 0
        block.call false
        return
    end

    # just to make sure...
    owner = owner.to_s
    ::EM.next_tick { add_instance_to_pool }
    @pool.pop do |cjob|
        cjob['owner']     = owner
        cjob['starttime'] = Time.now
        cjob['helpers']   = helpers

        print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
            "Port: #{cjob['port']} - Owner: #{cjob['owner']}"

        @jobs << cjob

        block.call cjob
    end
end

#handlersObject



115
116
117
# File 'lib/arachni/rpc/server/dispatcher.rb', line 115

def handlers
    _handlers.keys
end

#job(pid) ⇒ Hash

Returns proc info for a given pid

Parameters:

  • pid (Fixnum)

Returns:

  • (Hash)


162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/arachni/rpc/server/dispatcher.rb', line 162

def job( pid )
    @jobs.each do |j|
        next if j['pid'] != pid
        cjob = j.dup

        cjob['currtime'] = Time.now
        cjob['age']      = cjob['currtime'] - cjob['birthdate']
        cjob['runtime']  = cjob['currtime'] - cjob['starttime']
        cjob['proc']     = proc( cjob['pid'] )

        return cjob
    end
end

#jobsArray<Hash>

Returns proc info for all jobs

Returns:



181
182
183
# File 'lib/arachni/rpc/server/dispatcher.rb', line 181

def jobs
    @jobs.map { |cjob| job( cjob['pid'] ) }.compact
end

#logString

Returns contents of the log file.

Returns:

  • (String)

    contents of the log file



213
214
215
# File 'lib/arachni/rpc/server/dispatcher.rb', line 213

def log
    IO.read prep_logging
end

#proc_infoHash

Returns the server’s proc info.

Returns:

  • (Hash)

    the server’s proc info



218
219
220
# File 'lib/arachni/rpc/server/dispatcher.rb', line 218

def proc_info
    proc( Process.pid ).merge( 'node' => @node.info )
end

#statsHash

Returns server stats regarding the jobs and pool

Returns:

  • (Hash)


190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/arachni/rpc/server/dispatcher.rb', line 190

def stats
    cjobs    = jobs( )
    running  = cjobs.reject { |job| job['proc'].empty? }
    finished = cjobs - running

    stats_h = {
        'running_jobs'   => running,
        'finished_jobs'  => finished,
        'init_pool_size' => @opts.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids
    }

    stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )

    stats_h['node']['score']  = (rs_score = resource_consumption_score) > 0 ? rs_score : 1
    stats_h['node']['score'] *= stats_h['node']['weight'] if stats_h['node']['weight']
    stats_h['node']['score'] = Float( stats_h['node']['score'] )

    stats_h
end