Class: Arachni::RPC::Server::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/arachni/rpc/server/dispatcher.rb

Overview

Dispatches RPC Instances on demand providing a centralized environment for multiple clients and allows for extensive process monitoring.

The process goes something like this:

  • On initialization the Dispatcher populates the Instance pool.

  • A client issues a #dispatch call.

  • The Dispatcher pops an Instance from the pool

    • Asynchronously replenishes the pool

    • Gives the Instance credentials to the client (url, auth token, etc.)

  • The client connects to the Instance using these credentials.

Once the client finishes using the RPC Instance he must shut it down otherwise the system will be eaten away by zombie RPC Instance processes.

Author:

Defined Under Namespace

Classes: Node, Service

Constant Summary collapse

SERVICE_NAMESPACE =
Service

Instance Method Summary collapse

Methods included from UI::Output

#debug?, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, #disable_only_positives, #included, #mute, #muted?, #only_positives, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error, #print_error_backtrace, #print_exception, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, reset_output_options, #unmute, #verbose?, #verbose_on

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #cookie_decode, #cookie_encode, #cookies_from_file, #cookies_from_parser, #cookies_from_response, #exception_jail, #exclude_path?, #follow_protocol?, #form_decode, #form_encode, #forms_from_parser, #forms_from_response, #full_and_absolute_url?, #generate_token, #get_path, #hms_to_seconds, #html_decode, #html_encode, #include_path?, #links_from_parser, #links_from_response, #normalize_url, #page_from_response, #page_from_url, #parse_set_cookie, #path_in_domain?, #path_too_deep?, #port_available?, #rand_port, #random_seed, #redundant_path?, #regexp_array_match, #remove_constants, #request_parse_body, #seconds_to_hms, #skip_page?, #skip_path?, #skip_resource?, #skip_response?, #to_absolute, #uri_decode, #uri_encode, #uri_parse, #uri_parse_query, #uri_parser, #uri_rewrite

Constructor Details

#initialize(options = Options.instance) ⇒ Dispatcher



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/arachni/rpc/server/dispatcher.rb', line 46

def initialize( options = Options.instance )
    @options = options

    @options.dispatcher.external_address ||= @options.rpc.server_address
    @options.snapshot.save_path          ||= @options.paths.snapshots

    @server = Base.new( @options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    @url = "#{@options.dispatcher.external_address}:#{@options.rpc.server_port}"

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @options.datastore.dispatcher_url = @url

    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'dispatcher', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs          = []
    @consumed_pids = []
    @pool          = Reactor.global.create_queue

    print_status "Populating the pool with #{@options.dispatcher.pool_size}  Instances."
    if @options.dispatcher.pool_size > 0
        @options.dispatcher.pool_size.times { add_instance_to_pool( false ) }
    end

    print_status 'Waiting for Instances to come on-line.'

    # Check up on the pool and start the server once it has been filled.
    Reactor.global.at_interval( 0.1 ) do |task|
        print_debug "Instances: #{@pool.size}/#{@options.dispatcher.pool_size}"
        next if @options.dispatcher.pool_size != @pool.size
        task.done

        print_status 'Instances are on-line.'

        _services.each do |name, service|
            @server.add_handler( name, service.new( @options, self ) )
        end

        @node = Node.new( @options, @logfile )
        @server.add_handler( 'node', @node )

        run
    end
end

Instance Method Details

#alive?TrueClass



111
112
113
# File 'lib/arachni/rpc/server/dispatcher.rb', line 111

def alive?
    @server.alive?
end

#dispatch(owner = 'unknown', helpers = {}, load_balance = true, &block) ⇒ Hash, ...

Dispatches an Instance from the pool.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/arachni/rpc/server/dispatcher.rb', line 155

def dispatch( owner = 'unknown', helpers = {}, load_balance = true, &block )
    if load_balance && @node.grid_member?
        preferred do |url|
            connect_to_peer( url ).dispatch( owner, helpers, false, &block )
        end
        return
    end

    if @options.dispatcher.pool_size <= 0
        block.call nil
        return
    end

    if @pool.empty?
        block.call false
    else
        @pool.pop do |cjob|
            cjob['owner']     = owner.to_s
            cjob['starttime'] = Time.now.to_s
            cjob['helpers']   = helpers

            print_status "Instance dispatched -- PID: #{cjob['pid']} - " +
                "Port: #{cjob['port']} - Owner: #{cjob['owner']}"

            @jobs << cjob
            block.call cjob
        end
    end

    Reactor.global.schedule { add_instance_to_pool }
end

#finished_jobsArray<Hash>

Returns info for all finished jobs.

See Also:



226
227
228
# File 'lib/arachni/rpc/server/dispatcher.rb', line 226

def finished_jobs
    jobs.reject { |job| job['alive'] }
end

#job(pid) ⇒ Hash

Returns proc info for a given pid



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/arachni/rpc/server/dispatcher.rb', line 192

def job( pid )
    @jobs.each do |j|
        next if j['pid'] != pid
        cjob = j.dup

        currtime = Time.now

        cjob['currtime'] = currtime.to_s
        cjob['age']      = currtime - Time.parse( cjob['birthdate'] )
        cjob['runtime']  = currtime - Time.parse( cjob['starttime'] )
        cjob['alive']    = Arachni::Processes::Manager.alive?( pid )

        return cjob
    end
end

#jobsArray<Hash>



210
211
212
# File 'lib/arachni/rpc/server/dispatcher.rb', line 210

def jobs
    @jobs.map { |cjob| job( cjob['pid'] ) }.compact
end

#logString



261
262
263
# File 'lib/arachni/rpc/server/dispatcher.rb', line 261

def log
    IO.read prep_logging
end

#pidObject



266
267
268
# File 'lib/arachni/rpc/server/dispatcher.rb', line 266

def pid
    Process.pid
end

#preferred(&block) ⇒ String



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/arachni/rpc/server/dispatcher.rb', line 118

def preferred( &block )
    if !@node.grid_member?
        block.call @url
        return
    end

    each = proc do |neighbour, iter|
        connect_to_peer( neighbour ).workload_score do |score|
            iter.return (!score || score.rpc_exception?) ? nil : [neighbour, score]
        end
    end

    after = proc do |nodes|
        nodes.compact!
        nodes << [@url, workload_score]
        block.call nodes.sort_by { |_, score| score }[0][0]
    end

    Reactor.global.create_iterator( @node.neighbours ).map( each, after )
end

#running_jobsArray<Hash>

Returns info for all running jobs.

See Also:



218
219
220
# File 'lib/arachni/rpc/server/dispatcher.rb', line 218

def running_jobs
    jobs.select { |job| job['alive'] }
end

#servicesObject



105
106
107
# File 'lib/arachni/rpc/server/dispatcher.rb', line 105

def services
    _services.keys
end

#statisticsHash



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/arachni/rpc/server/dispatcher.rb', line 243

def statistics
    stats_h = {
        'running_jobs'   => running_jobs,
        'finished_jobs'  => finished_jobs,
        'init_pool_size' => @options.dispatcher.pool_size,
        'curr_pool_size' => @pool.size,
        'consumed_pids'  => @consumed_pids,
        'snapshots'      => Dir.glob( "#{@options.snapshot.save_path}*.afs" )
    }

    stats_h.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
    stats_h['node']['score']  = workload_score

    stats_h
end

#workload_scoreFloat



235
236
237
238
239
# File 'lib/arachni/rpc/server/dispatcher.rb', line 235

def workload_score
    score = (running_jobs.size + 1).to_f
    score *= @node.info['weight'].to_f if @node.info['weight']
    score
end