Class: VCAP::Services::AsynchronousServiceGateway

Inherits:
BaseAsynchronousServiceGateway show all
Defined in:
lib/base/asynchronous_service_gateway.rb

Constant Summary collapse

REQ_OPTS =
%w(service token provisioner cloud_controller_uri).map { |o| o.to_sym }

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Base::Error

#failure, #internal_fail, #parse_msg, #success, #timeout_fail

Constructor Details

#initialize(opts) ⇒ AsynchronousServiceGateway

Returns a new instance of AsynchronousServiceGateway.



17
18
19
# File 'lib/base/asynchronous_service_gateway.rb', line 17

def initialize(opts)
  super(opts)
end

Instance Attribute Details

#event_machineObject (readonly)

Returns the value of attribute event_machine.



15
16
17
# File 'lib/base/asynchronous_service_gateway.rb', line 15

def event_machine
  @event_machine
end

#loggerObject (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/base/asynchronous_service_gateway.rb', line 15

def logger
  @logger
end

#serviceObject (readonly)

Returns the value of attribute service.



15
16
17
# File 'lib/base/asynchronous_service_gateway.rb', line 15

def service
  @service
end

Instance Method Details

#check_orphan(handles, callback, errback) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/base/asynchronous_service_gateway.rb', line 101

def check_orphan(handles, callback, errback)
  @provisioner.check_orphan(handles) do |msg|
    if msg['success']
      callback.call
      event_machine.add_timer(@double_check_orphan_interval) { fetch_handles { |rs| @provisioner.double_check_orphan(rs.handles) } }
    else
      errback.call(msg['response'])
    end
  end
end

#get_current_catalogObject



97
98
99
# File 'lib/base/asynchronous_service_gateway.rb', line 97

def get_current_catalog
  GatewayServiceCatalog.new([service]).to_hash
end

#setup(opts) ⇒ Object

setup the environment

Raises:

  • (ArgumentError)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/base/asynchronous_service_gateway.rb', line 22

def setup(opts)
  missing_opts = REQ_OPTS.select { |o| !opts.has_key? o }
  raise ArgumentError, "Missing options: #{missing_opts.join(', ')}" unless missing_opts.empty?
  @service = opts[:service]
  @token = opts[:token]
  @logger = opts[:logger] || make_logger()
  @cld_ctrl_uri = http_uri(opts[:cloud_controller_uri])
  @provisioner = opts[:provisioner]
  @hb_interval = opts[:heartbeat_interval] || 60
  @node_timeout = opts[:node_timeout]
  @handle_fetch_interval = opts[:handle_fetch_interval] || 1
  @check_orphan_interval = opts[:check_orphan_interval] || -1
  @double_check_orphan_interval = opts[:double_check_orphan_interval] || 300
  @handle_fetched = opts[:handle_fetched] || false
  @fetching_handles = false
  @version_aliases = (service[:version_aliases] ||= {})

  opts[:gateway_name] ||= "Service Gateway"

  @cc_api_version = opts[:cc_api_version] || "v1"
  if @cc_api_version == "v1"
    require 'catalog_manager_v1'
    @catalog_manager = VCAP::Services::CatalogManagerV1.new(opts)
  elsif @cc_api_version == "v2"
    require 'catalog_manager_v2'
    @catalog_manager = VCAP::Services::CatalogManagerV2.new(opts)
  else
    raise "Unknown cc_api_version: #{@cc_api_version}"
  end

  @event_machine = opts[:event_machine] || EM

  # Setup heartbeats and exit handlers
  event_machine.add_periodic_timer(@hb_interval) { send_heartbeat }
  event_machine.next_tick { send_heartbeat }
  Kernel.at_exit do
    if event_machine.reactor_running?
      # :/ We can't stop others from killing the event-loop here. Let's hope that they play nice
      send_deactivation_notice(false)
    else
      event_machine.run { send_deactivation_notice }
    end
  end

  # Add any necessary handles we don't know about
  update_callback = Proc.new do |resp|
    @provisioner.update_handles(resp.handles)
    @handle_fetched = true
    event_machine.cancel_timer(@fetch_handle_timer)

    # TODO remove it when we finish the migration
    current_version = @version_aliases && @version_aliases[:current]
    if current_version
      @provisioner.update_version_info(current_version)
    else
      logger.info("No current version alias is supplied, skip update version in CCDB.")
    end
  end

  @fetch_handle_timer = event_machine.add_periodic_timer(@handle_fetch_interval) { fetch_handles(&update_callback) }
  event_machine.next_tick { fetch_handles(&update_callback) }

  if @check_orphan_interval > 0
    handler_check_orphan = Proc.new do |resp|
      check_orphan(resp.handles,
                   lambda { logger.info("Check orphan is requested") },
                   lambda { |errmsg| logger.error("Error on requesting to check orphan #{errmsg}") })
    end
    event_machine.add_periodic_timer(@check_orphan_interval) { fetch_handles(&handler_check_orphan) }
  end

  # Register update handle callback
  @provisioner.register_update_handle_callback { |handle, &blk| update_service_handle(handle, &blk) }
end

#validate_incoming_requestObject

Validate the incoming request



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/base/asynchronous_service_gateway.rb', line 113

def validate_incoming_request
  unless request.media_type == Rack::Mime.mime_type('.json')
    error_msg = ServiceError.new(ServiceError::INVALID_CONTENT).to_hash
    logger.error("Validation failure: #{error_msg.inspect}, request media type: #{request.media_type} is not json")
    abort_request(error_msg)
  end
  unless auth_token && (auth_token == @token)
    error_msg = ServiceError.new(ServiceError::NOT_AUTHORIZED).to_hash
    logger.error("Validation failure: #{error_msg.inspect}, expected token: #{@token}, specified token: #{auth_token}")
    abort_request(error_msg)
  end
  unless @handle_fetched
    error_msg = ServiceError.new(ServiceError::SERVICE_UNAVAILABLE).to_hash
    logger.error("Validation failure: #{error_msg.inspect}, handles not fetched")
    abort_request(error_msg)
  end
end