Class: Arachni::RPC::Server::Dispatcher

Inherits:
Object
Includes:
Module::Utilities, UI::Output, Sys
Defined in:
lib/arachni/rpc/server/dispatcher.rb,
lib/arachni/rpc/server/node.rb

Overview

Dispatcher class

Dispatches RPC servers on demand, providing a centralized environment for multiple RPC clients and allowing for extensive process monitoring.

The process goes something like this:

* a client issues a 'dispatch' call
* the dispatcher starts a new RPC server on a random port
* the dispatcher returns the port of the RPC server to the client
* the client connects to the RPC server listening on that port and does its work

Once the client finishes using the RPC server, it must shut it down. If it doesn’t, idle RPC server instances will accumulate and eat away at system resources.
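
The lifecycle above can be sketched with in-process stand-ins. `ToyDispatcher`, `ToyInstance` and `with_instance` below are hypothetical and are not part of Arachni; no real RPC or networking takes place. The point of the sketch is the `ensure` clause, which shuts the instance down even when the client's work raises.

```ruby
# Hypothetical stand-ins for illustration only; not Arachni's classes.
class ToyInstance
    attr_reader :port

    def initialize( port )
        @port  = port
        @alive = true
    end

    def alive?
        @alive
    end

    def shutdown
        @alive = false
    end
end

class ToyDispatcher
    def initialize
        @instances = {}
    end

    # Starts an "instance" on a pseudo-random free port and returns the port.
    def dispatch
        port = rand( 1025..65_535 )
        port = rand( 1025..65_535 ) while @instances.key?( port )
        @instances[port] = ToyInstance.new( port )
        port
    end

    # Stands in for connecting to the server listening on the given port.
    def connect( port )
        @instances.fetch( port )
    end

    # Instances that were dispatched but never shut down.
    def live_instances
        @instances.values.select( &:alive? )
    end
end

# Client side: dispatch, connect, work, and always shut down.
def with_instance( dispatcher )
    instance = dispatcher.connect( dispatcher.dispatch )
    yield instance
ensure
    instance.shutdown if instance
end
```

A client would then write `with_instance( ToyDispatcher.new ) { |instance| puts instance.port }` and leave no live instance behind.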

@author: Tasos “Zapotek” Laskos


@version: 0.2

Defined Under Namespace

Classes: Node

Instance Method Summary

Methods included from UI::Output

#buffer, #debug!, #debug?, #flush_buffer, #mute!, #muted?, #only_positives!, #only_positives?, #print_bad, #print_debug, #print_debug_backtrace, #print_debug_pp, #print_error, #print_error_backtrace, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?, #uncap_buffer!, #unmute!, #verbose!, #verbose?

Methods included from Module::Utilities

#exception_jail, #get_path, #hash_keys_to_str, #normalize_url, #read_file, #seed, #uri_decode, #uri_encode, #uri_parse, #uri_parser, #url_sanitize

Constructor Details

#initialize(opts) ⇒ Dispatcher

Returns a new instance of Dispatcher.



# File 'lib/arachni/rpc/server/dispatcher.rb', line 53

def initialize( opts )

    banner

    @opts = opts

    @opts.rpc_port     ||= 7331
    @opts.rpc_address  ||= 'localhost'
    @opts.pool_size    ||= 5

    if @opts.help
        print_help
        exit 0
    end

    @server = Base.new( @opts )

    @server.add_async_check {
        |method|
        # methods that expect a block are async
        method.parameters.flatten.include?( :block )
    }

    # let the instances in the pool know who to ask for routing instructions
    # when we're in grid mode.
    @opts.datastore[:dispatcher_url] = "#{@opts.rpc_address}:#{@opts.rpc_port.to_s}"

    prep_logging

    print_status( 'Initing RPC Server...' )

    @server.add_handler( "dispatcher", self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @jobs = []
    @pool = Queue.new
    @replenisher = Queue.new

    print_status( 'Warming up the pool...' )
    @opts.pool_size.times{ add_instance_to_pool }

    # this thread will wait in the background and replenish the pool
    Thread.new {
        loop {
            add_instance_to_pool
            @replenisher.pop
        }
    }

    @node = nil

    print_status( 'Done.' )

    print_status( 'Initialization complete.' )

    run
end
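
The async check registered in the constructor relies on `Method#parameters`, which reports a `&block` argument as a `[:block, name]` pair. A small sketch of that predicate follows; the `Example` class and the `async?` helper are made up for illustration. Because the check flattens the parameter list, an ordinary argument that happens to be named `block` also matches.

```ruby
# Hypothetical handler used only to demonstrate the predicate.
class Example
    def sync_call( url ); end
    def async_call( url, &block ); end
    def misleading( block ); end
end

# Same test as the block passed to add_async_check in the constructor.
def async?( method )
    method.parameters.flatten.include?( :block )
end

handler = Example.new
p handler.method( :async_call ).parameters   # [[:req, :url], [:block, :block]]
p async?( handler.method( :sync_call ) )     # false
p async?( handler.method( :async_call ) )    # true
p async?( handler.method( :misleading ) )    # true, matched by argument name
```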

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/arachni/rpc/server/dispatcher.rb', line 113

def alive?
    @server.alive?
end

#dispatch(owner = 'unknown', helpers = {}) ⇒ Hash

Dispatches an RPC server instance from the pool

Parameters:

  • owner (String) (defaults to: 'unknown')

    an owner to assign to the dispatched RPC server

  • helpers (Hash) (defaults to: {})

    hash of helper data to be added to the job

Returns:

  • (Hash)

    includes port number, owner, clock info and proc info



# File 'lib/arachni/rpc/server/dispatcher.rb', line 125

def dispatch( owner = 'unknown', helpers = {} )

    # just to make sure...
    owner = owner.to_s
    cjob  = @pool.shift
    cjob['owner']     = owner
    cjob['starttime'] = Time.now
    cjob['helpers']   = helpers

    print_status( "Instance dispatched -- PID: #{cjob['pid']} - " +
        "Port: #{cjob['port']} - Owner: #{cjob['owner']}" )

    @replenisher << true

    @jobs << cjob

    return cjob
end
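
`#dispatch` takes a ready instance off `@pool` and signals `@replenisher` so that the background thread started in the constructor can top the pool up. The pattern can be sketched with plain `Queue` objects. `ToyPool` and its hash "instances" are hypothetical, and as a simplification this sketch waits for a signal before each addition, so the pool returns to its initial size.

```ruby
require 'thread'   # Queue; already loaded on modern Rubies

# Hypothetical pool: "instances" are plain hashes rather than processes.
class ToyPool
    def initialize( size )
        @pool        = Queue.new
        @replenisher = Queue.new
        @next_id     = 0
        @lock        = Mutex.new

        # warm up the pool
        size.times { add_instance }

        # block until a dispatch signals, then add one replacement
        @worker = Thread.new {
            loop {
                @replenisher.pop
                add_instance
            }
        }
    end

    # Hands out a ready instance and asks for a replacement.
    def dispatch( owner = 'unknown' )
        job = @pool.pop
        job['owner'] = owner.to_s
        @replenisher << true
        job
    end

    def size
        @pool.size
    end

    private

    def add_instance
        id = @lock.synchronize { @next_id += 1 }
        @pool << { 'id' => id }
    end
end
```

Both queues are thread-safe, so the caller of `dispatch` never waits for a new instance to be created unless the pool is already empty, in which case `@pool.pop` blocks until the replenisher catches up.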

#job(pid) ⇒ Hash

Returns proc info for a given pid

Parameters:

  • pid (Fixnum)

Returns:

  • (Hash)


# File 'lib/arachni/rpc/server/dispatcher.rb', line 151

def job( pid )
    @jobs.each {
        |i|
        cjob = i.dup
        if cjob['pid'] == pid
            cjob['currtime'] = Time.now
            cjob['age'] = cjob['currtime'] - cjob['birthdate']
            cjob['runtime']  = cjob['currtime'] - cjob['starttime']
            cjob['proc'] =  proc( cjob['pid'] )

            return cjob
        end
    }
end
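
`#job` derives two durations from timestamps stored on the job: `age` is measured from `birthdate` and `runtime` from `starttime`, the latter being set in `#dispatch`. Where `birthdate` is set is not shown in this listing; the sketch below assumes it is recorded when the instance joins the pool. `job_timings` is a hypothetical helper, not part of the class.

```ruby
# Hypothetical helper mirroring the arithmetic in #job.
# Subtracting two Time objects yields a Float number of seconds.
def job_timings( job, now = Time.now )
    info = job.dup
    info['currtime'] = now
    info['age']      = now - info['birthdate']
    info['runtime']  = now - info['starttime']
    info
end

born = Time.at( 1_000 )
job  = { 'pid' => 4242, 'birthdate' => born, 'starttime' => born + 30 }

timings = job_timings( job, born + 90 )
p timings['age']      # 90.0
p timings['runtime']  # 60.0
```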

#jobs ⇒ Array<Hash>

Returns proc info for all jobs

Returns:

  • (Array<Hash>)

# File 'lib/arachni/rpc/server/dispatcher.rb', line 171

def jobs
    jobs = []
    @jobs.each {
        |cjob|
        proc_info = job( cjob['pid'] )
        jobs << proc_info if proc_info
    }
    return jobs
end

#log ⇒ Object



# File 'lib/arachni/rpc/server/dispatcher.rb', line 205

def log
    IO.read( prep_logging )
end

#proc_info ⇒ Object



# File 'lib/arachni/rpc/server/dispatcher.rb', line 209

def proc_info
    p = proc( Process.pid )

    if @node
        p.merge!( 'node' => @node.info )
    end

    return p
end

#stats ⇒ Hash

Returns server stats regarding the jobs and pool

Returns:

  • (Hash)


# File 'lib/arachni/rpc/server/dispatcher.rb', line 186

def stats
    cjobs    = jobs( )
    running  = cjobs.reject{ |job| job['proc'].empty? }
    finished = cjobs - running

    stats = {
        'running_jobs'    => running,
        'finished_jobs'   => finished,
        'init_pool_size'  => @opts.pool_size,
        'curr_pool_size'  => @pool.size
    }

    if @node
        stats.merge!( 'node' => @node.info, 'neighbours' => @node.neighbours )
    end

    return stats
end
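
`#stats` counts a job as running when its `proc` hash is non-empty, which suggests that the process lookup yields an empty hash once the instance's process is gone. A sketch of the split on sample data follows; `split_jobs` and the sample hashes are hypothetical. It uses `Enumerable#partition` to do the split in one pass, where the listing above uses `reject` followed by array difference.

```ruby
# Hypothetical helper: splits jobs into [running, finished].
def split_jobs( jobs )
    jobs.partition { |job| !job['proc'].empty? }
end

sample = [
    { 'pid' => 101, 'proc' => { 'state' => 'S' } },
    { 'pid' => 102, 'proc' => {} },
    { 'pid' => 103, 'proc' => { 'state' => 'R' } }
]

running, finished = split_jobs( sample )
p running.map { |j| j['pid'] }    # [101, 103]
p finished.map { |j| j['pid'] }   # [102]
```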