Class: Cuboid::Processes::Agents
- Includes:
- Utilities, Singleton
- Defined in:
- lib/cuboid/processes/agents.rb
Overview
Helper for managing RPC::Server::Agent processes.
Instance Attribute Summary collapse
-
#list ⇒ Array<String>
readonly
URLs of all running Agents.
Class Method Summary collapse
Instance Method Summary collapse
-
#connect(url, options = nil) ⇒ RPC::Client::Agent
Connects to an Agent by URL.
- #each(&block) ⇒ Object
- #grid_spawn(options = {}) ⇒ Object
-
#initialize ⇒ Agents
constructor
A new instance of Agents.
- #kill(url) ⇒ Object
-
#killall ⇒ Object
Kills all Agents in #list.
-
#spawn(options = {}) ⇒ RPC::Client::Agent
Spawns an RPC::Server::Agent process.
Methods included from Utilities
#available_port, available_port_mutex, #bytes_to_kilobytes, #bytes_to_megabytes, #caller_name, #caller_path, #exception_jail, #generate_token, #hms_to_seconds, #port_available?, #rand_port, #random_seed, #regexp_array_match, #remove_constants, #seconds_to_hms
Constructor Details
#initialize ⇒ Agents
16 17 18 19 |
# File 'lib/cuboid/processes/agents.rb', line 16
#
# Starts with empty bookkeeping: no known Agent URLs and no cached client
# connections.
def initialize
    @agent_connections = {}
    @list              = []
end
Instance Attribute Details
#list ⇒ Array<String> (readonly)
14 15 16 |
# File 'lib/cuboid/processes/agents.rb', line 14
#
# @return [Array<String>]
#   URLs of all running Agents.
def list
    @list
end
Class Method Details
.method_missing(sym, *args, &block) ⇒ Object
139 140 141 142 143 144 145 |
# File 'lib/cuboid/processes/agents.rb', line 139
#
# Forwards class-level calls the class itself does not define to the
# singleton `instance`, provided the instance publicly responds to them.
#
# @param sym [Symbol] name of the missing method
# @param args [Array] arguments to forward
# @param block [Proc] block to forward
# @return [Object] whatever the instance method returns
# @raise [NoMethodError] when the instance does not respond to `sym` either
def self.method_missing( sym, *args, &block )
    return super unless instance.respond_to?( sym )

    instance.send( sym, *args, &block )
end

# Companion to `method_missing`, so `respond_to?` and `method` report the
# same set of names that `method_missing` is willing to forward.
#
# @param sym [Symbol] method name being queried
# @param include_private [Boolean] passed through to the default lookup
# @return [Boolean]
def self.respond_to_missing?( sym, include_private = false )
    instance.respond_to?( sym ) || super
end
.respond_to?(m) ⇒ Boolean
147 148 149 |
# File 'lib/cuboid/processes/agents.rb', line 147
#
# Reports whether the class itself or the singleton `instance` responds to
# the given method name.
#
# Accepts the standard optional second argument of `Object#respond_to?`, so
# two-argument calls no longer raise `ArgumentError`. The flag only widens
# the lookup on the class itself; the instance is always asked about its
# public methods.
#
# @param m [Symbol, String] method name
# @param include_all [Boolean] also consider private/protected methods of
#   the class itself
# @return [Boolean]
def self.respond_to?( m, include_all = false )
    super( m, include_all ) || instance.respond_to?( m )
end
Instance Method Details
#connect(url, options = nil) ⇒ RPC::Client::Agent
Connects to an Agent by URL.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/cuboid/processes/agents.rb', line 27
#
# Connects to an Agent by URL, reusing a cached client when one exists.
#
# Starts the global Raktr reactor in a thread first if it is not running.
#
# @param url [String] URL of the Agent
# @param options [Hash, nil]
#   Options handed to `RPC::Client::Agent.new`. A truthy `:fresh` entry
#   forces a new client that replaces any cached one for `url`; `:fresh`
#   itself is not passed on to the client.
# @return [RPC::Client::Agent]
def connect( url, options = nil )
    Raktr.global.run_in_thread unless Raktr.global.running?

    # Work on a copy so stripping :fresh does not alter the caller's Hash.
    options = options.dup if options
    fresh   = options ? options.delete( :fresh ) : false

    if fresh
        @agent_connections[url] = RPC::Client::Agent.new( url, options )
    else
        @agent_connections[url] ||= RPC::Client::Agent.new( url, options )
    end
end
#each(&block) ⇒ Object
43 44 45 46 47 |
# File 'lib/cuboid/processes/agents.rb', line 43
#
# Iterates over the running Agents, yielding a client for each URL in the
# list (obtained via `connect`).
#
# @yieldparam agent [RPC::Client::Agent] client connected to the Agent
# @return [Array<String>, Enumerator]
#   The URL list when a block is given; an Enumerator otherwise (calling
#   without a block previously failed with NoMethodError on nil).
def each( &block )
    return enum_for( :each ) unless block

    @list.each { |url| block.call( connect( url ) ) }
end
#grid_spawn(options = {}) ⇒ Object
111 112 113 114 |
# File 'lib/cuboid/processes/agents.rb', line 111
#
# Spawns two Agents with the same options, passing the first Agent's URL
# to the second as its `:peer`.
#
# @param options [Hash] same options as `spawn`; not modified
# @return [Object] the second `spawn` result
def grid_spawn( options = {} )
    first_agent = spawn( options )
    spawn( options.merge( peer: first_agent.url ) )
end
#kill(url) ⇒ Object
Note:
Will also kill all Instances started by the Agent.
119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/cuboid/processes/agents.rb', line 119
#
# Kills the Agent at `url` together with the PIDs it reports under
# `statistics['consumed_pids']`, then forgets the Agent.
#
# @note Will also kill all Instances started by the Agent.
#
# @param url [String] URL of the Agent
# @return [Object, nil]
#   Result of `Manager.kill` on success, `nil` if anything raised.
def kill( url )
    agent = connect( url )
    Manager.kill_many agent.statistics['consumed_pids']
    Manager.kill agent.pid
rescue StandardError
    # Best effort: failures while contacting or killing the Agent are
    # swallowed; the bookkeeping below is cleaned up either way.
    nil
ensure
    @list.delete( url )
    @agent_connections.delete( url )
end
#killall ⇒ Object
Kills all #list.
133 134 135 136 137 |
# File 'lib/cuboid/processes/agents.rb', line 133
#
# Kills every Agent currently in the list.
#
# @return [Array<String>] snapshot of the URLs that were passed to `kill`
def killall
    # Iterate over a snapshot: `kill` removes entries from @list as it goes.
    urls = @list.dup
    urls.each { |url| kill( url ) }
end
#spawn(options = {}) ⇒ RPC::Client::Agent
Spawns an RPC::Server::Agent process.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/cuboid/processes/agents.rb', line 56
#
# Spawns an RPC::Server::Agent process, waits until it answers `alive?`,
# records its URL in the list and returns a fresh client for it.
#
# @param options [Hash] not modified
# @option options [Object] :fork forwarded to `Manager.spawn` as `fork:`
# @option options [String] :name Agent name
# @option options [String] :peer peer URL; omitted when nil
# @option options [Object] :strategy Agent strategy
# @option options [Integer] :port (Utilities.available_port) RPC port
# @option options [String] :address ('127.0.0.1') RPC bind address
# @option options [String] :external_address omitted when nil
# @option options [Hash] :ssl
#   May contain `:ca`, `:server` and `:client`; the latter two may contain
#   `:private_key` and `:certificate`. Any part may be left out.
# @option options [Object] :application (Options.paths.application)
#
# @return [RPC::Client::Agent] client with `pid` set to the spawned PID
def spawn( options = {} )
    options = options.dup
    fork    = options.delete( :fork )

    # Tolerate a partially specified :ssl hash (e.g. only :ca).
    ssl        = options[:ssl] || {}
    server_ssl = ssl[:server] || {}
    client_ssl = ssl[:client] || {}

    agent_options = {
        agent: {
            name:     options[:name],
            peer:     options[:peer],
            strategy: options[:strategy]
        },
        rpc:   {
            server_port:             options[:port]    || Utilities.available_port,
            server_address:          options[:address] || '127.0.0.1',
            server_external_address: options[:external_address],

            ssl_ca:                 ssl[:ca],
            server_ssl_private_key: server_ssl[:private_key],
            server_ssl_certificate: server_ssl[:certificate],
            client_ssl_private_key: client_ssl[:private_key],
            client_ssl_certificate: client_ssl[:certificate]
        },
        paths: {
            application: options[:application] || Options.paths.application
        }
    }

    # These two keys are dropped entirely rather than passed along as nil.
    if agent_options[:rpc][:server_external_address].nil?
        agent_options[:rpc].delete :server_external_address
    end
    agent_options[:agent].delete :peer if agent_options[:agent][:peer].nil?

    pid = Manager.spawn( :agent, options: agent_options, fork: fork )
    url = "#{agent_options[:rpc][:server_address]}:#{agent_options[:rpc][:server_port]}"

    # Poll every 0.1s until the Agent answers; there is no upper bound on
    # the wait, same as before.
    loop do
        sleep 0.1

        begin
            connect( url, connection_pool_size: 1, max_retries: 1 ).alive?
            break
        rescue StandardError
            # Not reachable yet, try again.
        end
    end

    @list << url
    connect( url, fresh: true ).tap { |c| c.pid = pid }
end