Class: RubySkynet::Zookeeper::ServiceRegistry
- Inherits:
- Object
- Object
- RubySkynet::Zookeeper::ServiceRegistry
- Includes:
- SemanticLogger::Loggable
- Defined in:
- lib/ruby_skynet/zookeeper/service_registry.rb
Defined Under Namespace
Classes: ServerInfo
Instance Method Summary
- #deregister_service(name, version, region, hostname, port) ⇒ Object
  Deregister the supplied service from the Registry.
- #initialize(params = {}) ⇒ ServiceRegistry (constructor)
  Create a service registry. See RubyDoozer::Registry for the parameters.
- #on_server_removed(server, &block) ⇒ Object
  Invokes registered callbacks when a specific server is shut down or terminates, but not when a server de-registers itself. Each callback is called only once and must be re-registered after it fires if future callbacks are required for that server.
- #register_service(name, version, region, hostname, port) ⇒ Object
  Register the supplied service at this Skynet Server host and port. Returns the UUID for the service that was created.
- #server_for(name, version = '*', region = RubySkynet.region) ⇒ Object
  Return a server that implements the specified service.
- #servers_for(name, version = '*', region = RubySkynet.region) ⇒ Object
  Returns [Array<String>] a list of servers implementing the requested service.
- #to_h ⇒ Object
  Returns the Service Registry as a Hash.
Constructor Details
#initialize(params = {}) ⇒ ServiceRegistry
Create a service registry. See RubyDoozer::Registry for the parameters.
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 19
    def initialize(params = {})
      # Registry has the following format
      #  Key: [String] 'name/version/region'
      #  Value: [Array<String>] 'host:port', 'host:port'
      @cache = ThreadSafe::Hash.new
      @notifications_cache = ThreadSafe::Hash.new

      # Keep a list of registered services so that they can be re-registered
      # if the connection is lost
      @services = ThreadSafe::Hash.new

      # Supply block to load the current keys from the Registry
      params[:root] = '/instances'
      params[:ephemeral] = true
      params[:on_connect] = Proc.new do |registry|
        # Re-Register services every time the connection to ZooKeeper is lost
        @services.values.each {|v| register_service(*v)}
      end
      @registry = Zookeeper::Registry.new(params) do |key, value|
        service_info_created(key, value)
      end

      # Register Callbacks
      @registry.on_create {|path, value| service_info_created(path, value) }
      @registry.on_update {|path, value| service_info_updated(path, value) }
      @registry.on_delete {|path| service_info_deleted(path) }
    end
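The constructor keeps a local list of registered services and replays it from the `on_connect` hook. The following is a minimal, self-contained sketch of that replay pattern only. `ReconnectingRegistrar` and its `published` list are hypothetical stand-ins for illustration; they are not part of RubySkynet, and no ZooKeeper connection is involved.

```ruby
# Hypothetical stand-in, not the RubySkynet class: shows how remembering
# registration arguments lets them be replayed after a reconnect.
class ReconnectingRegistrar
  attr_reader :published

  def initialize
    @services  = {}  # uuid => [name, version, region, hostname, port]
    @published = []  # stand-in for writes to the remote registry
  end

  def register_service(name, version, region, hostname, port)
    uuid = "#{hostname}:#{port}-#{Process.pid}-#{name}-#{version}"
    @published << uuid
    # Remember the arguments so the registration can be replayed later
    @services[uuid] = [name, version, region, hostname, port]
    uuid
  end

  # Assumed to be invoked each time the connection is (re)established
  def on_connect
    @services.values.each { |args| register_service(*args) }
  end
end

registrar = ReconnectingRegistrar.new
uuid = registrar.register_service('EchoService', 1, 'Development', 'localhost', 2000)
registrar.on_connect # simulate a reconnect: the service is published again
```

Because the replay goes through `register_service` itself, the local list and the published entries cannot drift apart in this sketch.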
Instance Method Details
#deregister_service(name, version, region, hostname, port) ⇒ Object
Deregister the supplied service from the Registry
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 66
    def deregister_service(name, version, region, hostname, port)
      uuid = "#{hostname}:#{port}-#{$$}-#{name}-#{version}"
      @registry.delete(File.join(uuid,'addr'), false)
      @registry.delete(File.join(uuid,'name'), false)
      @registry.delete(File.join(uuid,'version'), false)
      @registry.delete(File.join(uuid,'region'), false)
      @registry.delete(File.join(uuid,'registered'), false)
      @registry.delete(uuid, false)

      # Remove from local services list
      @services.delete(uuid)
      uuid
    end
#on_server_removed(server, &block) ⇒ Object
Invokes registered callbacks when a specific server is shut down or terminates, but not when a server de-registers itself. Each callback is called only once and must be re-registered after it fires if future callbacks are required for that server.
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 112
    def on_server_removed(server, &block)
      ((@on_server_removed_callbacks ||= ThreadSafe::Hash.new)[server] ||= ThreadSafe::Array.new) << block
    end
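The listing above only shows how callbacks are stored; the code that fires them is not part of this page. As a rough sketch of the documented one-shot behaviour, assuming callbacks are discarded as soon as they run, something like the following would match the description. `OneShotCallbacks` and `server_removed` are hypothetical names used for illustration.

```ruby
# Hypothetical illustration of one-shot callbacks keyed by server.
class OneShotCallbacks
  def initialize
    @callbacks = {}
    @mutex = Mutex.new
  end

  # Same shape as the documented method: append a block for this server
  def on_server_removed(server, &block)
    @mutex.synchronize { (@callbacks[server] ||= []) << block }
  end

  # Assumed firing step: remove the callbacks first, then call each once.
  # Returns the number of callbacks that were invoked.
  def server_removed(server)
    blocks = @mutex.synchronize { @callbacks.delete(server) } || []
    blocks.each { |block| block.call(server) }
    blocks.size
  end
end

callbacks = OneShotCallbacks.new
removed = []
callbacks.on_server_removed('localhost:2000') { |server| removed << server }

first  = callbacks.server_removed('localhost:2000') # callback runs
second = callbacks.server_removed('localhost:2000') # nothing left to run
```

A caller that wants to keep watching the same server would call `on_server_removed` again from inside its block.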
#register_service(name, version, region, hostname, port) ⇒ Object
Register the supplied service at this Skynet Server host and port. Returns the UUID for the service that was created.
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 53
    def register_service(name, version, region, hostname, port)
      uuid = "#{hostname}:#{port}-#{$$}-#{name}-#{version}"
      @registry[File.join(uuid,'addr')]       = "#{hostname}:#{port}"
      @registry[File.join(uuid,'name')]       = name
      @registry[File.join(uuid,'version')]    = version
      @registry[File.join(uuid,'region')]     = region
      @registry[File.join(uuid,'registered')] = true

      # Add to local services list
      @services[uuid] = [name, version, region, hostname, port]
      uuid
    end
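To make the key layout concrete, the sketch below rebuilds the UUID and the five leaf keys that `register_service` writes (and `deregister_service` deletes). `service_keys` is a hypothetical helper written for this illustration, and the process id is passed in explicitly so the result is predictable; the real methods use `$$`.

```ruby
# Hypothetical helper, not part of RubySkynet: returns the UUID and the
# registry keys used for one service instance.
def service_keys(name, version, hostname, port, pid = Process.pid)
  uuid  = "#{hostname}:#{port}-#{pid}-#{name}-#{version}"
  paths = %w[addr name version region registered].map { |leaf| File.join(uuid, leaf) }
  [uuid, paths]
end

uuid, paths = service_keys('EchoService', 1, 'localhost', 2000, 1234)
puts uuid
paths.each { |path| puts path }
```

Note that the region is stored as a value under the UUID but is not part of the UUID itself, so two registrations from one process that differ only in region share the same UUID.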
#server_for(name, version = '*', region = RubySkynet.region) ⇒ Object
Return a server that implements the specified service
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 80
    def server_for(name, version='*', region=RubySkynet.region)
      if servers = servers_for(name, version, region)
        # Randomly select one of the servers offering the service
        servers[rand(servers.size)]
      else
        msg = "No servers available for service: #{name} with version: #{version} in region: #{region}"
        logger.warn msg
        raise ServiceUnavailable.new(msg)
      end
    end
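The selection step can be tried in isolation with a plain array. This is a sketch under assumptions, not the library's code: `pick_server` is a hypothetical helper, `ServiceUnavailable` is defined locally as a stand-in for the library's exception, `Array#sample` is swapped in for `servers[rand(servers.size)]`, and an empty list is treated as unavailable as well as `nil`.

```ruby
# Local stand-in for the exception raised by server_for
class ServiceUnavailable < StandardError; end

# Hypothetical helper: choose one 'host:port' entry at random,
# or raise when there is nothing to choose from.
def pick_server(servers, name)
  if servers.nil? || servers.empty?
    raise ServiceUnavailable, "No servers available for service: #{name}"
  end
  servers.sample
end

servers = ['localhost:2000', 'localhost:2001']
chosen  = pick_server(servers, 'EchoService')

begin
  pick_server(nil, 'MissingService')
rescue ServiceUnavailable => e
  error_message = e.message
end
```

Random selection spreads calls across servers without any shared state, at the cost of not accounting for load on individual servers.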
#servers_for(name, version = '*', region = RubySkynet.region) ⇒ Object
Returns [Array<String>] a list of servers implementing the requested service
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 92
    def servers_for(name, version='*', region=RubySkynet.region)
      if version == '*'
        # Find the highest version for the named service in this region
        version = -1
        @cache.keys.each do |key|
          if match = key.match(/#{name}\/(\d+)\/#{region}/)
            ver = match[1].to_i
            version = ver if ver > version
          end
        end
      end
      if server_infos = @cache["#{name}/#{version}/#{region}"]
        server_infos.first.servers
      end
    end
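The `'*'` version lookup amounts to scanning the `'name/version/region'` keys and keeping the largest numeric version. A minimal sketch of that step follows. `highest_version` is a hypothetical helper, and unlike the listing above it anchors the pattern and escapes the name and region, which is a deliberate variation for the example rather than the library's behaviour.

```ruby
# Hypothetical helper: highest numeric version registered for a service
# in a region, or nil when no key matches.
def highest_version(keys, name, region)
  pattern = /\A#{Regexp.escape(name)}\/(\d+)\/#{Regexp.escape(region)}\z/
  versions = keys.map { |key| (match = pattern.match(key)) && match[1].to_i }
  versions.compact.max
end

keys = [
  'EchoService/1/Development',
  'EchoService/10/Development',
  'EchoService/2/Production',
  'MyEchoService/99/Development'
]

latest  = highest_version(keys, 'EchoService', 'Development')
missing = highest_version(keys, 'OtherService', 'Development')
```

Versions are compared as integers, so 10 ranks above 2; comparing the raw strings would order them the other way.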
#to_h ⇒ Object
Returns the Service Registry as a Hash
    # File 'lib/ruby_skynet/zookeeper/service_registry.rb', line 47
    def to_h
      @cache.dup
    end
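Since `to_h` returns `@cache.dup`, the result is presumably a shallow copy. The sketch below uses a plain `Hash` to show what that means in practice, on the assumption that `ThreadSafe::Hash#dup` behaves like `Hash#dup`; the keys and values are made up for the example.

```ruby
# Plain Hash standing in for the registry cache
cache = { 'EchoService/1/Development' => ['localhost:2000'] }

snapshot = cache.dup

# Adding a key to the copy leaves the original untouched ...
snapshot['OtherService/1/Development'] = ['localhost:3000']

# ... but the values are shared, so mutating one in place is visible in both
snapshot['EchoService/1/Development'] << 'localhost:2001'

puts cache.key?('OtherService/1/Development')
puts cache['EchoService/1/Development'].inspect
```

Callers that only read the returned Hash are unaffected; callers that intend to modify the nested values may want a deep copy instead.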