Class: VCAP::Services::AsynchronousServiceGateway
- Inherits:
-
BaseAsynchronousServiceGateway
- Object
- Sinatra::Base
- BaseAsynchronousServiceGateway
- VCAP::Services::AsynchronousServiceGateway
- Defined in:
- lib/base/asynchronous_service_gateway.rb
Constant Summary collapse
- REQ_OPTS =
%w(service token provisioner cloud_controller_uri).map { |o| o.to_sym }
Instance Attribute Summary collapse
-
#event_machine ⇒ Object
readonly
Returns the value of attribute event_machine.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#service ⇒ Object
readonly
Returns the value of attribute service.
Instance Method Summary collapse
- #check_orphan(handles, callback, errback) ⇒ Object
- #get_current_catalog ⇒ Object
-
#initialize(opts) ⇒ AsynchronousServiceGateway
constructor
A new instance of AsynchronousServiceGateway.
-
#setup(opts) ⇒ Object
setup the environment.
-
#validate_incoming_request ⇒ Object
Validate the incoming request.
Methods included from Base::Error
#failure, #internal_fail, #parse_msg, #success, #timeout_fail
Constructor Details
#initialize(opts) ⇒ AsynchronousServiceGateway
Returns a new instance of AsynchronousServiceGateway.
17 18 19 |
# File 'lib/base/asynchronous_service_gateway.rb', line 17 def initialize(opts) super(opts) end |
Instance Attribute Details
#event_machine ⇒ Object (readonly)
Returns the value of attribute event_machine.
15 16 17 |
# File 'lib/base/asynchronous_service_gateway.rb', line 15 def event_machine @event_machine end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
15 16 17 |
# File 'lib/base/asynchronous_service_gateway.rb', line 15 def logger @logger end |
#service ⇒ Object (readonly)
Returns the value of attribute service.
15 16 17 |
# File 'lib/base/asynchronous_service_gateway.rb', line 15 def service @service end |
Instance Method Details
#check_orphan(handles, callback, errback) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/base/asynchronous_service_gateway.rb', line 101 def check_orphan(handles, callback, errback) @provisioner.check_orphan(handles) do |msg| if msg['success'] callback.call event_machine.add_timer(@double_check_orphan_interval) { fetch_handles { |rs| @provisioner.double_check_orphan(rs.handles) } } else errback.call(msg['response']) end end end |
#get_current_catalog ⇒ Object
97 98 99 |
# File 'lib/base/asynchronous_service_gateway.rb', line 97 def get_current_catalog GatewayServiceCatalog.new([service]).to_hash end |
#setup(opts) ⇒ Object
setup the environment
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/base/asynchronous_service_gateway.rb', line 22 def setup(opts) missing_opts = REQ_OPTS.select { |o| !opts.has_key? o } raise ArgumentError, "Missing options: #{missing_opts.join(', ')}" unless missing_opts.empty? @service = opts[:service] @token = opts[:token] @logger = opts[:logger] || make_logger() @cld_ctrl_uri = http_uri(opts[:cloud_controller_uri]) @provisioner = opts[:provisioner] @hb_interval = opts[:heartbeat_interval] || 60 @node_timeout = opts[:node_timeout] @handle_fetch_interval = opts[:handle_fetch_interval] || 1 @check_orphan_interval = opts[:check_orphan_interval] || -1 @double_check_orphan_interval = opts[:double_check_orphan_interval] || 300 @handle_fetched = opts[:handle_fetched] || false @fetching_handles = false @version_aliases = (service[:version_aliases] ||= {}) opts[:gateway_name] ||= "Service Gateway" @cc_api_version = opts[:cc_api_version] || "v1" if @cc_api_version == "v1" require 'catalog_manager_v1' @catalog_manager = VCAP::Services::CatalogManagerV1.new(opts) elsif @cc_api_version == "v2" require 'catalog_manager_v2' @catalog_manager = VCAP::Services::CatalogManagerV2.new(opts) else raise "Unknown cc_api_version: #{@cc_api_version}" end @event_machine = opts[:event_machine] || EM # Setup heartbeats and exit handlers event_machine.add_periodic_timer(@hb_interval) { send_heartbeat } event_machine.next_tick { send_heartbeat } Kernel.at_exit do if event_machine.reactor_running? # :/ We can't stop others from killing the event-loop here. Let's hope that they play nice send_deactivation_notice(false) else event_machine.run { send_deactivation_notice } end end # Add any necessary handles we don't know about update_callback = Proc.new do |resp| @provisioner.update_handles(resp.handles) @handle_fetched = true event_machine.cancel_timer(@fetch_handle_timer) # TODO remove it when we finish the migration current_version = @version_aliases && @version_aliases[:current] if current_version @provisioner.update_version_info(current_version) else logger.info("No current version alias is supplied, skip update version in CCDB.") end end @fetch_handle_timer = event_machine.add_periodic_timer(@handle_fetch_interval) { fetch_handles(&update_callback) } event_machine.next_tick { fetch_handles(&update_callback) } if @check_orphan_interval > 0 handler_check_orphan = Proc.new do |resp| check_orphan(resp.handles, lambda { logger.info("Check orphan is requested") }, lambda { |errmsg| logger.error("Error on requesting to check orphan #{errmsg}") }) end event_machine.add_periodic_timer(@check_orphan_interval) { fetch_handles(&handler_check_orphan) } end # Register update handle callback @provisioner.register_update_handle_callback { |handle, &blk| update_service_handle(handle, &blk) } end |
#validate_incoming_request ⇒ Object
Validate the incoming request
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/base/asynchronous_service_gateway.rb', line 113 def validate_incoming_request unless request.media_type == Rack::Mime.mime_type('.json') error_msg = ServiceError.new(ServiceError::INVALID_CONTENT).to_hash logger.error("Validation failure: #{error_msg.inspect}, request media type: #{request.media_type} is not json") abort_request(error_msg) end unless auth_token && (auth_token == @token) error_msg = ServiceError.new(ServiceError::NOT_AUTHORIZED).to_hash logger.error("Validation failure: #{error_msg.inspect}, expected token: #{@token}, specified token: #{auth_token}") abort_request(error_msg) end unless @handle_fetched error_msg = ServiceError.new(ServiceError::SERVICE_UNAVAILABLE).to_hash logger.error("Validation failure: #{error_msg.inspect}, handles not fetched") abort_request(error_msg) end end |