Class: Bullring::ServerRegistry
- Inherits:
-
Object
- Object
- Bullring::ServerRegistry
- Defined in:
- lib/bullring/util/server_registry.rb
Constant Summary collapse
- MAX_SERVERS_PER_GENERATION =
1
Instance Attribute Summary collapse
-
#tuplespace ⇒ Object
readonly
Returns the value of attribute tuplespace.
Instance Method Summary collapse
- #[](dictionary, key) ⇒ Object
- #[]=(dictionary, key, value) ⇒ Object
- #close! ⇒ Object
- #current_server_generation ⇒ Object
- #dump_tuplespace ⇒ Object
- #expire_servers ⇒ Object
-
#initialize(host, port, first_server_port, &start_server_block) ⇒ ServerRegistry
constructor
A new instance of ServerRegistry.
-
#lease_server ⇒ Object
Blocks until a server is available, then returns it.
-
#lease_server! ⇒ Object
Starts a server first if one is needed, then blocks until a server is available and returns it.
- #next_server_port ⇒ Object
- #num_current_generation_servers ⇒ Object
- #num_servers(generation = nil) ⇒ Object
- #register_server(uri) ⇒ Object
- #registry_open? ⇒ Boolean
- #registry_unavailable? ⇒ Boolean
- #release_server ⇒ Object
- #test! ⇒ Object
Constructor Details
#initialize(host, port, first_server_port, &start_server_block) ⇒ ServerRegistry
Returns a new instance of ServerRegistry.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/bullring/util/server_registry.rb', line 56

# Connects this client to the registry at host:port. When the port check
# reports that no registry is serving yet, a child process is forked to host
# a Rinda tuplespace over DRb before connecting.
#
# host               - host used for both the registry and the servers.
# port               - port of the registry's DRb service.
# first_server_port  - seed for the :next_server_port tuple (skipped when nil).
# start_server_block - block kept in @start_server_block.
#
# Raises StandardError if the registry is still unavailable after about
# 20 seconds of polling.
def initialize(host, port, first_server_port, &start_server_block)
  @registry_host = host
  @server_host = host
  @registry_port = port
  @start_server_block = start_server_block
  @servers = {}

  uri = "druby://#{host}:#{port}"

  if registry_unavailable?
    registry_pid = Kernel.fork do
      @tuplespace = Rinda::TupleSpaceProxy.new(Rinda::TupleSpace.new)

      # Seed the shared bookkeeping tuples.
      @tuplespace.write([:global_lock])
      @tuplespace.write([:next_client_id, 0])
      @tuplespace.write([:server_generation, 0])
      unless first_server_port.nil?
        @tuplespace.write([:next_server_port, "#{first_server_port}"])
      end

      DRb.start_service(uri, @tuplespace)

      # Watcher: once a [:registry_closed] tuple is written, kill the
      # available servers and exit the main thread.
      Thread.new do
        @client_id = 'registry'
        @tuplespace.notify("write", [:registry_closed]).pop
        kill_available_servers
        Thread.main.exit
      end

      DRb.thread.join
    end

    Process.detach(registry_pid)
  end

  # Poll in 0.2 second steps until the registry answers, giving up after 20.
  waited = 0
  while registry_unavailable?
    sleep(0.2)
    waited += 0.2
    next unless waited > 20 # @options[:process][:max_bringup_time]

    Bullring.logger.error {"#{caller_name}: Timed out waiting to bring up the registry server"}
    raise StandardError, "#{caller_name}: Timed out waiting to bring up the registry server", caller
  end

  # The registry should be reachable now; connect unless this instance
  # already holds a tuplespace.
  @tuplespace ||= TuplespaceWrapper.new(uri)

  # Each client owns its own registry instance, so the instance can keep the
  # client id: take the counter, remember it, and put back the next value.
  _, @client_id = @tuplespace.take([:next_client_id, nil])
  @tuplespace.write([:next_client_id, @client_id + 1])
end
Instance Attribute Details
#tuplespace ⇒ Object (readonly)
Returns the value of attribute tuplespace.
52 53 54 |
# File 'lib/bullring/util/server_registry.rb', line 52

# Read-only accessor.
#
# Returns the value held in @tuplespace.
def tuplespace
  @tuplespace
end
Instance Method Details
#[](dictionary, key) ⇒ Object
172 173 174 175 176 177 |
# File 'lib/bullring/util/server_registry.rb', line 172

# Looks up the value stored under key in the given dictionary.
#
# The read uses a zero timeout; any StandardError it raises is treated as
# "no value".
#
# Returns the stored value, or nil when the read fails.
def [](dictionary, key)
  with_lock do
    tuple = begin
      @tuplespace.read(['data', dictionary, key, nil], 0)
    rescue StandardError
      nil
    end
    _, _, _, value = tuple
    return value
  end
end
#[]=(dictionary, key, value) ⇒ Object
165 166 167 168 169 170 |
# File 'lib/bullring/util/server_registry.rb', line 165

# Stores value under key in the given dictionary, replacing any prior entry.
#
# The existing tuple (if any) is taken with a zero timeout; a failure of
# that take is ignored so the write always follows.
def []=(dictionary, key, value)
  with_lock do
    begin
      @tuplespace.take(['data', dictionary, key, nil], 0)
    rescue StandardError
      nil
    end
    @tuplespace.write(['data', dictionary, key, value])
  end
end
#close! ⇒ Object
156 157 158 |
# File 'lib/bullring/util/server_registry.rb', line 156

# Marks the registry as closed by writing a [:registry_closed] tuple.
def close!
  closed_marker = [:registry_closed]
  @tuplespace.write(closed_marker)
end
#current_server_generation ⇒ Object
179 180 181 |
# File 'lib/bullring/util/server_registry.rb', line 179

# Reads the [:server_generation, n] tuple without removing it.
#
# Returns the generation stored in the tuple's second slot.
def current_server_generation
  _, generation = @tuplespace.read([:server_generation, nil])
  generation
end
#dump_tuplespace ⇒ Object
204 205 206 207 208 |
# File 'lib/bullring/util/server_registry.rb', line 204

# Builds a one-line summary of the 'available', 'leased' and 'data' tuples.
#
# Returns a String containing the inspected result of each read_all.
def dump_tuplespace
  available = @tuplespace.read_all(['available', nil, nil]).inspect
  leased = @tuplespace.read_all(['leased', nil, nil, nil]).inspect
  data = @tuplespace.read_all(['data', nil, nil, nil]).inspect
  "Available: #{available}, Leased: #{leased}, Data: #{data}"
end
#expire_servers ⇒ Object
148 149 150 151 152 153 154 |
# File 'lib/bullring/util/server_registry.rb', line 148

# Advances the server generation by one and, still under the lock, asks
# kill_available_servers to handle the generation that was just retired.
def expire_servers
  with_lock do
    _, retired_generation = @tuplespace.take([:server_generation, nil])
    next_generation = retired_generation + 1
    @tuplespace.write([:server_generation, next_generation])
    kill_available_servers(retired_generation)
  end
end
#lease_server ⇒ Object
Blocks until a server is available, then returns it.
127 128 129 130 |
# File 'lib/bullring/util/server_registry.rb', line 127

# Repeatedly attempts a zero-timeout lease until one yields a non-nil
# result.
#
# Returns the first non-nil value produced by _lease_server.
def lease_server
  server = nil
  while server.nil?
    server = _lease_server(timeout: 0)
  end
  server
end
#lease_server! ⇒ Object
Starts a server first if one is needed, then blocks until a server is available and returns it.
113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/bullring/util/server_registry.rb', line 113

# Starts a server when the current generation has fewer than
# MAX_SERVERS_PER_GENERATION servers and the registry is open, then leases
# one via lease_server.
#
# A ServerOffline error restarts the whole sequence; there is no retry limit.
#
# Returns whatever lease_server returns.
def lease_server!
  below_limit = num_current_generation_servers < MAX_SERVERS_PER_GENERATION
  start_a_server if below_limit && registry_open?

  lease_server
rescue ServerOffline
  Bullring.logger.debug {"Lost connection with a server, retrying..."}
  retry
end
#next_server_port ⇒ Object
105 106 107 108 109 110 |
# File 'lib/bullring/util/server_registry.rb', line 105

# Claims the next server port from the :next_server_port tuple.
#
# The tuple stores the port as a String, so it is converted to an Integer
# before probing. This makes the return type the same whether or not the
# first candidate had to be skipped.
#
# Candidates are incremented until Network.is_port_open? returns true for
# @server_host (presumably meaning the port is free to bind -- confirm
# against Network). The tuple is then rewritten with the port after the one
# chosen, again as a String.
#
# Returns the chosen port as an Integer.
def next_server_port
  _, stored_port = @tuplespace.take([:next_server_port, nil])

  port = stored_port.to_i
  port += 1 until Network.is_port_open?(@server_host, port)

  @tuplespace.write([:next_server_port, "#{port + 1}"])
  port
end
#num_current_generation_servers ⇒ Object
188 189 190 |
# File 'lib/bullring/util/server_registry.rb', line 188

# Counts the servers that belong to the current server generation.
#
# Returns the result of num_servers for current_server_generation.
def num_current_generation_servers
  generation = current_server_generation
  num_servers(generation)
end
#num_servers(generation = nil) ⇒ Object
183 184 185 186 |
# File 'lib/bullring/util/server_registry.rb', line 183

# Counts 'available' plus 'leased' tuples.
#
# generation - value placed in the generation slot of both patterns; the
#              default nil is passed through unchanged.
#
# Returns the combined count.
def num_servers(generation = nil)
  available = @tuplespace.read_all(['available', generation, nil])
  leased = @tuplespace.read_all(['leased', nil, generation, nil])
  available.count + leased.count
end
#register_server(uri) ⇒ Object
160 161 162 163 |
# File 'lib/bullring/util/server_registry.rb', line 160

# Publishes uri as an 'available' server of the current generation.
#
# fail_unless_registry_open! runs first, before anything is written.
def register_server(uri)
  fail_unless_registry_open!
  generation = current_server_generation
  @tuplespace.write(['available', generation, uri])
end
#registry_open? ⇒ Boolean
192 193 194 |
# File 'lib/bullring/util/server_registry.rb', line 192

# The registry counts as open while no [:registry_closed] tuple is present.
#
# Returns true or false.
def registry_open?
  closed = tuple_present?([:registry_closed])
  !closed
end
#registry_unavailable? ⇒ Boolean
200 201 202 |
# File 'lib/bullring/util/server_registry.rb', line 200

# Delegates to Network.is_port_open? for the registry host and port.
#
# Returns that call's result unchanged.
def registry_unavailable?
  Network.is_port_open?(@registry_host, @registry_port)
end
#release_server ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/bullring/util/server_registry.rb', line 132

# Gives back the server this client has leased.
#
# The lease tuple is taken with a zero timeout so a release after a failed
# lease does not hang; in that case the method does nothing.
#
# A server whose generation is older than the current one, or any server
# once the registry is closed, is killed and forgotten. Otherwise it is
# registered as available again.
def release_server
  _, _, generation, uri = @tuplespace.take(['leased', @client_id, nil, nil], 0)

  expired = generation < current_server_generation
  if expired || !registry_open?
    begin
      @servers[uri].kill
    rescue StandardError
      # Best effort: any StandardError raised while killing is ignored.
      nil
    end
    @servers[uri] = nil
  else
    register_server(uri)
  end
rescue Rinda::RequestExpiredError
  # Nothing was leased by this client, so there is nothing to release.
end
#test! ⇒ Object
196 197 198 |
# File 'lib/bullring/util/server_registry.rb', line 196

# Writes a [:temp] tuple into the tuplespace.
def test!
  probe = [:temp]
  @tuplespace.write(probe)
end