Class: Cuboid::RPC::Server::Scheduler

Inherits:
Object
  • Object
show all
Includes:
UI::Output, Utilities
Defined in:
lib/cuboid/rpc/server/scheduler.rb

Overview

RPC scheduler service which:

In addition to pure queue functionality, it also allows running Instances to be:

  • Detached from the queue monitor, transferring management responsibility to the client.

  • Attached to the queue monitor, transferring management responsibility to the queue.

If an Agent has been provided, instances will be provided by it. If no Agent has been given, instances will be spawned on the Scheduler machine.

Author:

Constant Summary collapse

TICK_CONSUME =
0.1

Instance Method Summary collapse

Methods included from Utilities

#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms

Methods included from UI::Output

#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?

Methods included from UI::OutputInterface

initialize

Methods included from UI::OutputInterface::Personalization

#included

Methods included from UI::OutputInterface::Controls

#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on

Methods included from UI::OutputInterface::ErrorLogging

#error_logfile, #has_error_log?, initialize, #set_error_logfile

Methods included from UI::OutputInterface::Implemented

#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception

Methods included from UI::OutputInterface::Abstract

#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose

Constructor Details

#initialize ⇒ Scheduler



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/cuboid/rpc/server/scheduler.rb', line 48

# Builds the scheduler: loads the options, creates the RPC server, resets the
# queue and bookkeeping structures and finally calls #run.
#
# NOTE(review): #prep_logging, #set_handlers, #trap_interrupts,
# #monitor_instances, #consume_queue and #run are defined outside this
# listing; any remark about them below is inferred from their names and
# should be confirmed against their definitions.
def initialize
    @options = Options.instance

    # Fall back to the configured snapshot/report directories when no explicit
    # path has been set.
    @options.snapshot.path ||= @options.paths.snapshots
    @options.report.path   ||= @options.paths.reports

    @server = Base.new( @options.rpc.to_server_options )
    # Only override the server's log level when one has been configured.
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    # Keep the server URL locally and publish it through the scheduler options.
    Options.scheduler.url = @url = @server.url

    prep_logging

    # Queued Instances: id => options, id => priority and priority => [ids]
    # (see #push).
    @queue          = {}
    @id_to_priority = {}
    @by_priority    = {}

    # Bookkeeping exposed through #running, #completed and #failed.
    @running   = {}
    @completed = {}
    @failed    = {}

    set_handlers( @server )
    # #shutdown is run from a new thread rather than directly inside the
    # interrupt callback.
    trap_interrupts { Thread.new { shutdown } }

    # presumably these start the background monitoring/consuming of the queue
    # -- TODO confirm.
    monitor_instances
    consume_queue

    run
end

Instance Method Details

#alive? ⇒ TrueClass



264
265
266
# File 'lib/cuboid/rpc/server/scheduler.rb', line 264

# Liveness check, delegated to the underlying RPC server.
#
# @return [Boolean]
#   Whatever the server's own `#alive?` reports.
def alive?
    @server.alive?
end

#any? ⇒ Bool



84
85
86
# File 'lib/cuboid/rpc/server/scheduler.rb', line 84

# @return [Boolean]
#   `true` if the queue holds at least one Instance, i.e. the negation of
#   #empty?.
def any?
    !empty?
end

#attach(url, token, &block) ⇒ String, ...

Attaches a running Instance to the queue monitor.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/cuboid/rpc/server/scheduler.rb', line 205

# Places an already running Instance under the management of this scheduler.
#
# @param [String] url
#   Address handed to `connect_to_instance`.
# @param [String] token
#   Token handed to `connect_to_instance`; doubles as the key under which the
#   Instance is tracked in the running set.
# @param [Block] block
#   Called with:
#
#   * no argument, if the liveness check came back as an RPC exception;
#   * `false`, if the Instance already reports a scheduler URL;
#   * `token`, once the Instance has been pointed at this scheduler.
def attach( url, token, &block )
    instance = connect_to_instance( url, token )

    instance.alive? do |response|
        # The Instance could not be reached, there is nothing to attach.
        if response.rpc_exception?
            block.call
            next
        end

        instance.scheduler_url do |current_scheduler|
            # The Instance already has a scheduler URL set, leave it alone.
            if current_scheduler
                block.call( false )
                next
            end

            # Point the Instance at this scheduler and only start tracking it
            # once that option has been set.
            instance.options.set( scheduler: { url: @options.scheduler.url } ) do
                @running[token] = instance
                block.call( token )
            end
        end
    end
end

#clear ⇒ Object

Note:

Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.

Empties the queue.



231
232
233
234
235
236
237
# File 'lib/cuboid/rpc/server/scheduler.rb', line 231

# Empties the queue.
#
# Only the queue structures are touched; the running, completed and failed
# collections are left as they are.
#
# @return [nil]
def clear
    # All three structures describe the same queued Instances and must be
    # emptied together.
    [@queue, @by_priority, @id_to_priority].each( &:clear )

    nil
end

#completed ⇒ Hash



110
111
112
# File 'lib/cuboid/rpc/server/scheduler.rb', line 110

# @return [Hash]
#   The `@completed` collection. It starts out empty in the constructor and
#   is populated outside this listing.
def completed
    @completed
end

#detach(id, &block) ⇒ Hash?



185
186
187
188
189
190
191
192
# File 'lib/cuboid/rpc/server/scheduler.rb', line 185

# Stops tracking a running Instance and hands its connection details to the
# caller.
#
# @param [String] id
#   Key of the Instance in the running set.
# @param [Block] block
#   Called with no argument if no running Instance has that ID; otherwise
#   called with `url:`, `token:` and `pid:` of the Instance once its scheduler
#   URL option has been reset to `nil`.
def detach( id, &block )
    # Removing the entry first means the Instance is no longer tracked, even
    # before the option update below completes.
    instance = @running.delete( id )

    if instance
        instance.options.set( scheduler: { url: nil } ) do
            block.call( url: instance.url, token: instance.token, pid: instance.pid )
        end
    else
        block.call
    end
end

#empty? ⇒ Bool



79
80
81
# File 'lib/cuboid/rpc/server/scheduler.rb', line 79

# @return [Boolean]
#   `true` when #size reports zero, i.e. no Instances are queued.
def empty?
    size.zero?
end

#errors(starting_line = 0) ⇒ Array<String>



251
252
253
254
255
256
257
258
259
260
261
# File 'lib/cuboid/rpc/server/scheduler.rb', line 251

# Returns the buffered error messages, optionally skipping the first ones.
#
# @param [Integer] starting_line
#   Index of the first buffered line to return; `0` returns the whole buffer.
#
# @return [Array<String>]
#   Error lines from `starting_line` onwards. Always an Array: an offset past
#   the end of the buffer yields `[]` rather than `nil`.
def errors( starting_line = 0 )
    buffer = error_buffer

    return [] if buffer.empty?
    return buffer if starting_line == 0

    # Array#[] with a range returns nil when the range starts beyond the last
    # element, so fall back to an empty Array to honour the return type.
    buffer[starting_line..-1] || []
end

#failed ⇒ Hash



116
117
118
# File 'lib/cuboid/rpc/server/scheduler.rb', line 116

# @return [Hash]
#   The `@failed` collection. It starts out empty in the constructor and is
#   populated outside this listing.
def failed
    @failed
end

#get(id) ⇒ Hash?

Note:

Only returns info for queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.

Returns:

  • Instance options and priority.

  • `nil` if an Instance with the given ID could not be found.



129
130
131
132
133
134
135
136
# File 'lib/cuboid/rpc/server/scheduler.rb', line 129

# Looks up a queued Instance.
#
# @param [String] id
#   ID returned by #push.
#
# @return [Hash, nil]
#   `{ options:, priority: }` for the queued Instance, or `nil` when the ID is
#   not in the queue.
def get( id )
    return unless @queue.key?( id )

    { options: @queue[id], priority: @id_to_priority[id] }
end

#list ⇒ Hash<Integer,Array>



95
96
97
# File 'lib/cuboid/rpc/server/scheduler.rb', line 95

# @return [Hash<Integer,Array>]
#   IDs of queued Instances grouped by priority. #push re-sorts the keys so
#   that higher priorities come first. The internal Hash itself is returned,
#   not a copy.
def list
    @by_priority
end

#push(options, queue_options = {}) ⇒ String



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/cuboid/rpc/server/scheduler.rb', line 144

# Adds an Instance to the queue.
#
# @param [Hash] options
#   Instance options; must be accepted by the application's `valid_options?`.
# @param [Hash] queue_options
#   Queue-level settings. Only the `'priority'` (String key) entry is read;
#   higher values are listed first and it defaults to `0`. The Hash is not
#   modified.
#
# @return [String]
#   ID of the queued Instance, usable with #get and #remove.
#
# @raise [ArgumentError]
#   If `options` are rejected by the application.
def push( options, queue_options = {} )
    # Read rather than delete, so the caller's Hash is left untouched.
    priority = queue_options['priority'] || 0

    unless Cuboid::Application.application.valid_options?( options )
        fail ArgumentError, 'Invalid options!'
    end

    id = Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority

    (@by_priority[priority] ||= []) << id
    # Rebuild the Hash so that its keys iterate from highest to lowest priority.
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
end

#remove(id) ⇒ Object

Note:

Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.



167
168
169
170
171
172
173
174
# File 'lib/cuboid/rpc/server/scheduler.rb', line 167

# Removes a queued Instance.
#
# @param [String] id
#   ID returned by #push.
#
# @return [Boolean]
#   `true` if the Instance was queued and has been removed, `false` if the ID
#   is not in the queue.
def remove( id )
    return false unless @queue.key?( id )

    @queue.delete( id )

    # Drop the ID from the list of its priority; the list itself stays in
    # place even when it ends up empty.
    priority = @id_to_priority.delete( id )
    @by_priority[priority].delete( id )

    true
end

#running ⇒ Hash



101
102
103
104
105
106
# File 'lib/cuboid/rpc/server/scheduler.rb', line 101

# @return [Hash]
#   Connection details of every running Instance, keyed by ID:
#   `{ id => { url:, token:, pid: } }`. A new Hash is built on each call.
def running
    @running.each_with_object( {} ) do |(id, client), info|
        info[id] = { url: client.url, token: client.token, pid: client.pid }
    end
end

#shutdown ⇒ Object

Shuts down the service.



240
241
242
243
244
245
# File 'lib/cuboid/rpc/server/scheduler.rb', line 240

# Shuts down the service.
#
# Prints a status message and schedules the reactor to be stopped after a
# delay of `2` (presumably seconds -- confirm against the reactor's API).
def shutdown
    print_status( 'Shutting down...' )

    reactor.delay( 2 ) { reactor.stop }
end

#size ⇒ Integer



89
90
91
# File 'lib/cuboid/rpc/server/scheduler.rb', line 89

# @return [Integer]
#   Number of Instances currently in the queue.
def size
    @queue.size
end