Class: Cuboid::RPC::Server::Agent

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/agent.rb

Overview

Dispatches RPC Instances on demand and allows for extensive process monitoring.

The process goes something like this:

  • A client issues a #spawn call.

  • The Agent spawns and returns Instance info to the client (url, auth token, etc.).

  • The client connects to the Instance using that info.

Once the client finishes using the RPC Instance it must shut it down otherwise the system will be eaten away by zombie processes.

Defined Under Namespace

Classes: Node, Service

Constant Summary collapse

SERVICE_NAMESPACE =
Service
PREFERENCE_STRATEGIES =
Cuboid::OptionGroups::Agent::STRATEGIES

Instance Method Summary collapse

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Constructor Details

#initialize(options = Options.instance) ⇒ Agent

Returns a new instance of Agent.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/cuboid/rpc/server/agent.rb', line 36

def initialize( options = Options.instance )
    @options = options

    @options.snapshot.path ||= @options.paths.snapshots

    @server = Base.new( @options.rpc.to_server_options )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    @server.add_async_check do |method|
        # methods that expect a block are async
        method.parameters.flatten.include? :block
    end

    Options.agent.url = @url = @server.url

    prep_logging

    print_status 'Starting the RPC Server...'

    @server.add_handler( 'agent', self )

    # trap interrupts and exit cleanly when required
    trap_interrupts { shutdown }

    @instances = []

    Cuboid::Application.application.agent_services.each do |name, service|
        @server.add_handler( name.to_s, service.new( @options, self ) )
    end

    @node = Node.new( @options, @server, @logfile )
    @server.add_handler( 'node', @node )

    run
end

Instance Method Details

#alive?TrueClass

Returns true.

Returns:

  • (TrueClass)

    true



78
79
80
# File 'lib/cuboid/rpc/server/agent.rb', line 78

def alive?
    @server.alive?
end

#finished_instancesArray<Hash>

Returns info for all finished (dead) instances.

Returns:

  • (Array<Hash>)

    Returns info for all finished (dead) instances.

See Also:



248
249
250
# File 'lib/cuboid/rpc/server/agent.rb', line 248

def finished_instances
    instances.reject { |i| i['alive'] }
end

#instance(pid) ⇒ Hash

Returns proc info for a given pid

Parameters:

  • pid (Fixnum)

Returns:



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/cuboid/rpc/server/agent.rb', line 213

def instance( pid )
    @instances.each do |i|
        next if i['pid'] != pid
        i = i.dup

        now = Time.now

        i['now']   = now.to_s
        i['age']   = now - Time.parse( i['birthdate'] )
        i['alive'] = Cuboid::Processes::Manager.alive?( pid )

        return i
    end

    nil
end

#instancesArray<Hash>

Returns info for all instances.

Returns:

  • (Array<Hash>)

    Returns info for all instances.



232
233
234
# File 'lib/cuboid/rpc/server/agent.rb', line 232

def instances
    @instances.map { |i| instance( i['pid'] ) }.compact
end

#logString

Returns Contents of the log file.

Returns:

  • (String)

    Contents of the log file



278
279
280
# File 'lib/cuboid/rpc/server/agent.rb', line 278

def log
    IO.read prep_logging
end

#pidObject



283
284
285
# File 'lib/cuboid/rpc/server/agent.rb', line 283

def pid
    Process.pid
end

#preferred(strategy = Cuboid::Options.agent.strategy, &block) ⇒ String?

Returns Depending on strategy and availability:

  • URL of the preferred Agent. If not a grid member it will return

    this Agent's URL.
    
  • ‘nil` if all nodes are at max utilization or on error.

  • ‘ArgumentError` – On invalid `strategy`.

Parameters:

  • strategy (Symbol) (defaults to: Cuboid::Options.agent.strategy)

    ‘:horizontal` – Pick the Agent with the least amount of workload. `:vertical` – Pick the Agent with the most amount of workload. `:direct` – Bypass the grid and get an Instance directly from this agent.

Returns:

  • (String, nil)

    Depending on strategy and availability:

    • URL of the preferred Agent. If not a grid member it will return

      this Agent's URL.
      
    • ‘nil` if all nodes are at max utilization or on error.

    • ‘ArgumentError` – On invalid `strategy`.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/cuboid/rpc/server/agent.rb', line 94

def preferred( strategy = Cuboid::Options.agent.strategy, &block )
    strategy = strategy.to_sym
    if !PREFERENCE_STRATEGIES.include? strategy
        block.call :error_unknown_strategy
        raise ArgumentError, "Unknown strategy: #{strategy}"
    end

    if strategy == :direct || !@node.grid_member?
        block.call( self.utilization == 1 ? nil : @url )
        return
    end

    pick_utilization = proc do |url, utilization|
        (utilization == 1 || utilization.rpc_exception?) ?
            nil : [url, utilization]
    end

    adjust_score_by_strategy = proc do |score|
        case strategy
            when :horizontal
                score

            when :vertical
                -score
        end
    end

    each = proc do |peer, iter|
        connect_to_peer( peer ).utilization do |utilization|
            iter.return pick_utilization.call( peer, utilization )
        end
    end

    after = proc do |nodes|
        nodes << pick_utilization.call( @url, self.utilization )
        nodes.compact!

        # All nodes are at max utilization, pass.
        if nodes.empty?
            block.call
            next
        end

        block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
    end

    Raktr.global.create_iterator( @node.peers ).map( each, after )
end

#running_instancesArray<Hash>

Returns info for all running (alive) instances.

Returns:

  • (Array<Hash>)

    Returns info for all running (alive) instances.

See Also:



240
241
242
# File 'lib/cuboid/rpc/server/agent.rb', line 240

def running_instances
    instances.select { |i| i['alive'] }
end

#servicesObject



72
73
74
# File 'lib/cuboid/rpc/server/agent.rb', line 72

def services
    Cuboid::Application.application.agent_services.keys
end

#spawn(options = {}, &block) ⇒ Hash?

Spawns an Instance.

Parameters:

  • options (String) (defaults to: {})
  • [String] (Hash)

    a customizable set of options

  • [Hash] (Hash)

    a customizable set of options

Returns:

  • (Hash, nil)

    Depending on availability:

    • ‘Hash`: Connection and proc info.

    • ‘nil`: Max utilization or currently spawning, wait and retry.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/cuboid/rpc/server/agent.rb', line 157

def spawn( options = {}, &block )
    if @spawning
        block.call nil
        return
    end

    options      = options.my_symbolize_keys
    strategy     = options.delete(:strategy)
    owner        = options[:owner]
    helpers      = options[:helpers] || {}

    if strategy != 'direct' && @node.grid_member?
        preferred *[strategy].compact do |url|
            if !url
                block.call
                next
            end

            if url == :error_unknown_strategy
                block.call :error_unknown_strategy
                next
            end

            connect_to_peer( url ).spawn( options.merge(
                  helpers:  helpers.merge( via: @url ),
                  strategy: :direct
                ),
                &block
            )
        end
        return
    end

    if System.max_utilization?
        block.call
        return
    end

    @spawning = true
    spawn_instance do |info|
        info['owner']   = owner
        info['helpers'] = helpers

        @instances << info

        block.call info

        @spawning = false
    end
end

#statisticsHash

Returns server stats regarding the instances and pool.

Returns:

  • (Hash)

    Returns server stats regarding the instances and pool.



265
266
267
268
269
270
271
272
273
274
# File 'lib/cuboid/rpc/server/agent.rb', line 265

def statistics
    {
        'utilization'         => utilization,
        'running_instances'   => running_instances,
        'finished_instances'  => finished_instances,
        'consumed_pids'       => @instances.map { |i| i['pid'] }.compact,
        'snapshots'           => Dir.glob( "#{@options.snapshot.path}*.#{Snapshot::EXTENSION}" ),
        'node'                => @node.info
    }
end

#utilizationFloat

Returns Workload score for this Agent, calculated using System#utilization.

  • ‘0.0` => No utilization.

  • ‘1.0` => Max utilization.

Lower is better.

Returns:

  • (Float)

    Workload score for this Agent, calculated using System#utilization.

    • ‘0.0` => No utilization.

    • ‘1.0` => Max utilization.

    Lower is better.



259
260
261
# File 'lib/cuboid/rpc/server/agent.rb', line 259

def utilization
    System.utilization
end