Class: Cuboid::RPC::Server::Scheduler
- Includes:
- UI::Output, Utilities
- Defined in:
- lib/cuboid/rpc/server/scheduler.rb
Overview
RPC scheduler service which:
- Maintains a priority queue of Instance jobs.
- Runs them once a slot is available, as determined by system utilization.
- Monitors #running Instances, retrieves and stores their reports, and shuts them down to free their slots.
- Makes available information on #completed and #failed Instances.
In addition to pure queue functionality, it also allows running Instances to be:
- Detached from the queue monitor, transferring management responsibility to the client.
- Attached to the queue monitor, transferring management responsibility to the queue.
If an Agent has been provided, Instances will be provided by it. If no Agent has been given, Instances will be spawned on the Scheduler machine.
Constant Summary
- TICK_CONSUME = 0.1
Instance Method Summary
- #alive? ⇒ TrueClass
- #any? ⇒ Bool
- #attach(url, token, &block) ⇒ String, ...
  Attaches a running Instance to the queue monitor.
- #clear ⇒ Object
  Empties the queue.
- #completed ⇒ Hash
  Completed Instances and their report location.
- #detach(id, &block) ⇒ Hash?
  RPC connection information for the Instance.
- #empty? ⇒ Bool
- #errors(starting_line = 0) ⇒ Array<String>
- #failed ⇒ Hash
  Failed Instances and the associated error.
- #get(id) ⇒ Hash?
  Instance options and priority.
- #initialize ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #list ⇒ Hash<Integer,Array>
  Queued Instances grouped and sorted by priority.
- #push(options, queue_options = {}) ⇒ String
  Instance ID used to reference the Instance from then on.
- #remove(id) ⇒ Object
- #running ⇒ Hash
  RPC connection information on running Instances.
- #shutdown ⇒ Object
  Shuts down the service.
- #size ⇒ Integer
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Methods included from UI::Output
#error_buffer, initialize, #log_error, #output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose, #reroute_to_file, #reroute_to_file?
Methods included from UI::OutputInterface
Methods included from UI::OutputInterface::Personalization
Methods included from UI::OutputInterface::Controls
#debug?, #debug_level, #debug_level_1?, #debug_level_2?, #debug_level_3?, #debug_level_4?, #debug_off, #debug_on, initialize, #verbose?, #verbose_off, #verbose_on
Methods included from UI::OutputInterface::ErrorLogging
#error_logfile, #has_error_log?, initialize, #set_error_logfile
Methods included from UI::OutputInterface::Implemented
#print_debug_backtrace, #print_debug_exception, #print_debug_level_1, #print_debug_level_2, #print_debug_level_3, #print_debug_level_4, #print_error_backtrace, #print_exception
Methods included from UI::OutputInterface::Abstract
#output_provider_file, #print_bad, #print_debug, #print_error, #print_info, #print_line, #print_ok, #print_status, #print_verbose
Constructor Details
#initialize ⇒ Scheduler
# File 'lib/cuboid/rpc/server/scheduler.rb', line 48
def initialize
    @options = Options.instance

    @options.snapshot.path ||= @options.paths.snapshots
    @options.report.path   ||= @options.paths.reports

    @server = Base.new( @options.rpc. )
    @server.logger.level = @options.datastore.log_level if @options.datastore.log_level

    Options.scheduler.url = @url = @server.url

    prep_logging

    @queue          = {}
    @id_to_priority = {}
    @by_priority    = {}

    @running   = {}
    @completed = {}
    @failed    = {}

    set_handlers( @server )
    trap_interrupts { Thread.new { shutdown } }

    monitor_instances
    consume_queue

    run
end
Instance Method Details
#alive? ⇒ TrueClass
# File 'lib/cuboid/rpc/server/scheduler.rb', line 264
def alive?
    @server.alive?
end
#any? ⇒ Bool
# File 'lib/cuboid/rpc/server/scheduler.rb', line 84
def any?
    !empty?
end
#attach(url, token, &block) ⇒ String, ...
Attaches a running Instance to the queue monitor.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 205
def attach( url, token, &block )
    client = connect_to_instance( url, token )
    client.alive? do |bool|
        if bool.rpc_exception?
            block.call
            next
        end

        client.scheduler_url do |scheduler_url|
            if scheduler_url
                block.call false
                next
            end

            client..set( scheduler: { url: @options.scheduler.url } ) do
                @running[token] = client
                block.call token
            end
        end
    end
end
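Judging from the source above, the block passed to #attach can receive three different values: nothing (nil) when the Instance could not be reached, false when the Instance already reports a Scheduler URL, and the token when the Instance was attached. A minimal caller-side sketch of telling these apart follows; `describe_attach_result` is a hypothetical helper and not part of Cuboid.

```ruby
# Hypothetical helper (not part of Cuboid): maps the value yielded by
# #attach to a human-readable outcome.
def describe_attach_result( result )
    case result
    when nil
        # The block was called without arguments: the Instance did not respond.
        'Instance unreachable'
    when false
        # The Instance already has a Scheduler URL set.
        'Instance already attached to a Scheduler'
    else
        # On success the Instance token is yielded and used as the @running key.
        "Attached as #{result}"
    end
end

puts describe_attach_result( nil )
puts describe_attach_result( false )
puts describe_attach_result( 'secret-token' )
```

Checking for nil and false separately matters here, since a plain truthiness test would merge the "unreachable" and "already attached" cases.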
#clear ⇒ Object
Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
Empties the queue.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 231
def clear
    @queue.clear
    @by_priority.clear
    @id_to_priority.clear

    nil
end
#completed ⇒ Hash
# File 'lib/cuboid/rpc/server/scheduler.rb', line 110
def completed
    @completed
end
#detach(id, &block) ⇒ Hash?
# File 'lib/cuboid/rpc/server/scheduler.rb', line 185
def detach( id, &block )
    client = @running.delete( id )
    return block.call if !client

    client..set( scheduler: { url: nil } ) do
        block.call( url: client.url, token: client.token, pid: client.pid )
    end
end
#empty? ⇒ Bool
# File 'lib/cuboid/rpc/server/scheduler.rb', line 79
def empty?
    self.size == 0
end
#errors(starting_line = 0) ⇒ Array<String>
# File 'lib/cuboid/rpc/server/scheduler.rb', line 251
def errors( starting_line = 0 )
    return [] if self.error_buffer.empty?

    error_strings = self.error_buffer

    if starting_line != 0
        error_strings = error_strings[starting_line..-1]
    end

    error_strings
end
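The `starting_line` argument is applied as a plain Array range slice, so its edge cases follow Ruby's slicing rules. The sketch below reproduces the same logic on an ordinary Array; `errors_from` and the sample buffer are hypothetical stand-ins, as the real buffer comes from UI::Output#error_buffer.

```ruby
# Hypothetical stand-in for #errors that takes the buffer as an argument.
def errors_from( buffer, starting_line = 0 )
    return [] if buffer.empty?
    return buffer if starting_line == 0

    # Same slice as in #errors: everything from starting_line to the end.
    buffer[starting_line..-1]
end

buffer = ['first error', 'second error', 'third error']

p errors_from( buffer )     # all three entries
p errors_from( buffer, 1 )  # ["second error", "third error"]
p errors_from( buffer, 3 )  # [] (start equals the buffer size)
p errors_from( buffer, 4 )  # nil (start is past the end of the buffer)
```

A client that polls with the number of lines it has already seen gets only new entries. Because a start index past the end yields nil rather than an empty Array, a caller may want to guard with something like `errors_from( buffer, n ) || []`.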
#failed ⇒ Hash
# File 'lib/cuboid/rpc/server/scheduler.rb', line 116
def failed
    @failed
end
#get(id) ⇒ Hash?
Only returns info for queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
Returns:
- Instance options and priority.
- `nil` if an Instance with the given ID could not be found.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 129
def get( id )
    return if !@queue.include? id

    {
        options:  @queue[id],
        priority: @id_to_priority[id]
    }
end
#list ⇒ Hash<Integer,Array>
# File 'lib/cuboid/rpc/server/scheduler.rb', line 95
def list
    @by_priority
end
#push(options, queue_options = {}) ⇒ String
# File 'lib/cuboid/rpc/server/scheduler.rb', line 144
def push( options, queue_options = {} )
    priority = queue_options.delete('priority') || 0

    if !Cuboid::Application.application.( )
        fail ArgumentError, 'Invalid options!'
    end

    id = Utilities.generate_token

    @queue[id]          = options
    @id_to_priority[id] = priority
    (@by_priority[priority] ||= []) << id
    @by_priority = Hash[@by_priority.sort_by { |k, _| -k }]

    id
end
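The last two bookkeeping lines of #push keep the priority index ordered from highest to lowest priority, with IDs of equal priority kept in insertion order. A minimal sketch of that behaviour on a plain Hash follows; `enqueue` and the sample IDs are hypothetical and only illustrate the ordering.

```ruby
# Hypothetical helper mirroring how #push updates the priority index.
# Returns a new Hash whose keys are sorted by descending priority.
def enqueue( by_priority, id, priority )
    (by_priority[priority] ||= []) << id

    # Negating the key sorts numerically in descending order, so
    # priorities are assumed to be numeric.
    Hash[by_priority.sort_by { |k, _| -k }]
end

index = {}
index = enqueue( index, 'a', 0 )
index = enqueue( index, 'b', 5 )
index = enqueue( index, 'c', 0 )
index = enqueue( index, 'd', -1 )

p index.keys  # [5, 0, -1]
p index[0]    # ["a", "c"]
```

Since Ruby Hashes preserve insertion order, rebuilding the Hash from the sorted pairs is enough to make iteration over #list go from the highest priority to the lowest.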
#remove(id) ⇒ Object
Only affects queued Instances; once an Instance has passed through the #running stage it is no longer part of the queue.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 167
def remove( id )
    return false if !@queue.include? id

    @queue.delete( id )
    @by_priority[@id_to_priority.delete( id )].delete( id )
    true
end
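One consequence of the source above is that #remove deletes the ID from its priority bucket but does not delete the bucket itself, so an emptied priority can still show up as a key with an empty Array. The sketch below replays the same steps on plain Hashes; `remove_from` and the sample data are hypothetical.

```ruby
# Hypothetical stand-in mirroring the bookkeeping of #remove.
def remove_from( queue, id_to_priority, by_priority, id )
    return false if !queue.include?( id )

    queue.delete( id )
    # Look up the priority bucket via the ID and drop the ID from it.
    by_priority[id_to_priority.delete( id )].delete( id )
    true
end

queue          = { 'a' => { 'placeholder' => true } }
id_to_priority = { 'a' => 3 }
by_priority    = { 3 => ['a'] }

p remove_from( queue, id_to_priority, by_priority, 'a' )  # true
p remove_from( queue, id_to_priority, by_priority, 'a' )  # false
p by_priority.key?( 3 )                                   # true
p by_priority[3]                                          # []
```

Code that iterates over #list may therefore want to skip empty buckets.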
#running ⇒ Hash
# File 'lib/cuboid/rpc/server/scheduler.rb', line 101
def running
    @running.inject( {} ) do |h, (id, client)|
        h.merge! id => { url: client.url, token: client.token, pid: client.pid }
        h
    end
end
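To make the shape of the returned Hash concrete, here is a small sketch that runs the same reduction over a stand-in client object. `FakeClient`, `connection_info`, `connection_info_compact` and the sample values are all hypothetical; the real values come from the RPC clients held in @running.

```ruby
# Hypothetical stand-in for the RPC client objects held in @running.
FakeClient = Struct.new( :url, :token, :pid )

# Same reduction as #running, taking the Hash as an argument.
def connection_info( running )
    running.inject( {} ) do |h, (id, client)|
        h.merge! id => { url: client.url, token: client.token, pid: client.pid }
        h
    end
end

# A more compact equivalent using Hash#transform_values (Ruby 2.4+).
def connection_info_compact( running )
    running.transform_values do |client|
        { url: client.url, token: client.token, pid: client.pid }
    end
end

running = { 'token-1' => FakeClient.new( 'localhost:7331', 'token-1', 1234 ) }
info    = connection_info( running )

puts info['token-1'][:url]
puts info == connection_info_compact( running )
```

Each Instance ID maps to a Hash with `:url`, `:token` and `:pid` keys, which is the same information #detach yields for a single Instance.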
#shutdown ⇒ Object
Shuts down the service.
# File 'lib/cuboid/rpc/server/scheduler.rb', line 240
def shutdown
    print_status 'Shutting down...'
    reactor.delay 2 do
        reactor.stop
    end
end
#size ⇒ Integer
# File 'lib/cuboid/rpc/server/scheduler.rb', line 89
def size
    @queue.size
end