Class: Cuboid::Processes::Schedulers
- Includes:
- Utilities, Singleton
- Defined in:
- lib/cuboid/processes/schedulers.rb
Overview
Helper for managing RPC::Server::Scheduler processes.
Instance Attribute Summary collapse
-
#list ⇒ Array<String>
readonly
URLs of all running Schedulers.
Class Method Summary collapse
Instance Method Summary collapse
-
#connect(url, options = nil) ⇒ RPC::Client::Scheduler
Connects to a Scheduler by URL.
- #each(&block) ⇒ Object
-
#initialize ⇒ Schedulers
constructor
A new instance of Schedulers.
- #kill(url) ⇒ Object
-
#killall ⇒ Object
Kills every Scheduler whose URL is in #list.
-
#spawn(options = {}) ⇒ RPC::Client::Scheduler
Spawns a RPC::Server::Scheduler process.
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize ⇒ Schedulers
Returns a new instance of Schedulers.
14 15 16 17 |
# File 'lib/cuboid/processes/schedulers.rb', line 14
#
# Starts with no known Scheduler URLs and an empty client cache.
def initialize
    # URLs of the Schedulers tracked by this helper.
    @list    = Array.new
    # Client objects keyed by Scheduler URL.
    @clients = Hash.new
end
Instance Attribute Details
#list ⇒ Array<String> (readonly)
Returns URLs of all running Schedulers.
12 13 14 |
# File 'lib/cuboid/processes/schedulers.rb', line 12
#
# @return [Array<String>]
#   The URL list held in `@list` (the same Array object, not a copy).
def list
    @list
end
Class Method Details
.method_missing(sym, *args, &block) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/cuboid/processes/schedulers.rb', line 126
#
# Forwards unknown class-level calls to `instance` when it publicly responds
# to `sym`; everything else goes to the default `method_missing`.
def self.method_missing( sym, *args, &block )
    return super( sym, *args, &block ) unless instance.respond_to?( sym )

    instance.send( sym, *args, &block )
end
.respond_to?(m) ⇒ Boolean
134 135 136 |
# File 'lib/cuboid/processes/schedulers.rb', line 134
#
# Reports whether the class itself, or `instance`, responds to `m`.
#
# @param m [Symbol, String]
#   Method name to look up.
# @param include_all [Boolean]
#   Forwarded to the default `respond_to?` so that two-argument calls
#   (`respond_to?( :foo, true )`) work instead of raising ArgumentError.
#   The lookup on `instance` stays public-only, matching the check performed
#   by `method_missing`.
#
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    super( m, include_all ) || instance.respond_to?( m )
end
Instance Method Details
#connect(url, options = nil) ⇒ RPC::Client::Scheduler
Connects to a Scheduler by URL.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/cuboid/processes/schedulers.rb', line 25
#
# Connects to a Scheduler by URL, caching one client per URL in `@clients`.
#
# @param url [String]
#   Scheduler URL, also used as the cache key.
# @param options [Hash, nil]
#   Passed to `RPC::Client::Scheduler.new` after `:fresh` has been removed.
#   A truthy `:fresh` replaces any cached client for `url` with a new one.
#   The caller's hash is left untouched.
#
# @return [RPC::Client::Scheduler]
def connect( url, options = nil )
    Raktr.global.run_in_thread if !Raktr.global.running?

    fresh = false
    if options
        # Work on a copy so that stripping :fresh does not alter the hash
        # owned by the caller.
        options = options.dup
        fresh   = options.delete( :fresh )
    end

    if fresh
        @clients[url] = RPC::Client::Scheduler.new( url, options )
    else
        @clients[url] ||= RPC::Client::Scheduler.new( url, options )
    end
end
#each(&block) ⇒ Object
41 42 43 44 45 |
# File 'lib/cuboid/processes/schedulers.rb', line 41
#
# Calls `block` once per URL in `@list`, passing the client returned by
# `connect` for that URL.
def each( &block )
    @list.each { |url| block.call( connect( url ) ) }
end
#kill(url) ⇒ Object
Note:
Will also kill all Instances started by the Scheduler.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/cuboid/processes/schedulers.rb', line 103
#
# Kills the Scheduler at `url` together with the processes it reports as
# running, then forgets the URL and its cached client.
#
# Any StandardError raised while talking to the Scheduler is swallowed
# (best-effort), in which case `nil` is returned. The cleanup below runs
# either way.
#
# @param url [String]
#   Scheduler URL.
def kill( url )
    scheduler = connect( url )
    scheduler.clear

    scheduler.running.each do |_id, instance|
        Manager.kill instance['pid']
    end
    Manager.kill scheduler.pid
rescue
    nil
ensure
    @list.delete( url )

    # `connect` may have failed before caching a client, so there may be
    # nothing to close.
    client = @clients.delete( url )
    client.close if client
end
#killall ⇒ Object
Kills all #list.
120 121 122 123 124 |
# File 'lib/cuboid/processes/schedulers.rb', line 120
#
# Runs `kill` for every URL in `@list`. Iterates over a copy because `kill`
# removes the URL from `@list` as it goes.
def killall
    @list.dup.each { |url| kill( url ) }
end
#spawn(options = {}) ⇒ RPC::Client::Scheduler
Spawns a RPC::Server::Scheduler process.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/cuboid/processes/schedulers.rb', line 54
#
# Spawns a Scheduler process through `Manager.spawn`, waits until it answers
# `alive?`, records its URL in `@list` and returns a fresh client for it.
#
# @param options [Hash]
#   The caller's hash is not modified.
# @option options [Object] :fork
#   Forwarded as-is to `Manager.spawn`.
# @option options [Object] :agent
#   Stored as `agent.url`.
# @option options [Object] :strategy
#   Stored as `agent.strategy`.
# @option options [Integer] :port
#   RPC server port; defaults to `Utilities.available_port`.
# @option options [String] :address ('127.0.0.1')
#   RPC server address.
# @option options [String] :external_address
# @option options [Hash] :ssl
#   May contain `:ca`, `:server` and `:client`; the latter two may each hold
#   `:private_key` and `:certificate`. Missing parts are treated as empty.
# @option options [Object] :application
#   Defaults to `Options.paths.application`.
#
# @return [RPC::Client::Scheduler]
#   Client returned by `connect`, with `pid` set to the spawned process' PID.
def spawn( options = {} )
    options = options.dup
    fork    = options.delete( :fork )

    # Tolerate a missing or partial :ssl hash (e.g. only :ca given).
    ssl        = options[:ssl] || {}
    ssl_server = ssl[:server]  || {}
    ssl_client = ssl[:client]  || {}

    spawn_options = {
        agent: {
            url:      options[:agent],
            strategy: options[:strategy]
        },
        rpc:   {
            server_port:             options[:port] || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                  ssl[:ca],
            server_ssl_private_key:  ssl_server[:private_key],
            server_ssl_certificate:  ssl_server[:certificate],
            client_ssl_private_key:  ssl_client[:private_key],
            client_ssl_certificate:  ssl_client[:certificate],
        },
        paths: {
            application: options[:application] || Options.paths.application
        }
    }

    pid = Manager.spawn( :scheduler, options: spawn_options, fork: fork )

    url = "#{spawn_options[:rpc][:server_address]}:#{spawn_options[:rpc][:server_port]}"

    # Poll every 0.1s until the server answers. There is no upper bound, so
    # this blocks for as long as the process stays unreachable.
    while sleep( 0.1 )
        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue
            # Not up yet, keep polling.
        end
    end

    @list << url
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end