Class: VCAP::Services::Base::Node

Inherits:
Base show all
Includes:
Internal
Defined in:
lib/base/node.rb

Instance Attribute Summary

Attributes inherited from Base

#closing

Instance Method Summary collapse

Methods inherited from Base

#group_handles_in_json, #publish, #service_description, #shutdown, #update_varz

Methods included from Error

#failure, #internal_fail, #parse_msg, #success, #timeout_fail

Constructor Details

#initialize(options) ⇒ Node

Returns a new instance of Node.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/base/node.rb', line 15

# Builds a service node from the +options+ hash.
#
# Reads :node_id, :plan, :capacity, :migration_nfs, :fqdn_hosts,
# :op_time_limit, :disabled_file, :database_lock_file, :supported_versions
# and :z_interval from +options+. When @node_nats is set, update_varz is
# scheduled through EventMachine: on a periodic timer of :z_interval
# (30 when not given) and on a one-shot timer of 5.
def initialize(options)
  super(options)
  @node_id       = options[:node_id]
  @plan          = options[:plan]
  @capacity      = options[:capacity]
  @max_capacity  = @capacity
  @capacity_lock = Mutex.new
  @migration_nfs = options[:migration_nfs]
  @fqdn_hosts    = options[:fqdn_hosts]
  @op_time_limit = options[:op_time_limit]
  @disabled_file = options[:disabled_file]

  lock_file = options[:database_lock_file]
  DataMapper::initialize_lock_file(lock_file) if lock_file

  # Default list of supported versions.
  # NOTE: every service MUST override this with the versions it really supports.
  @supported_versions = options[:supported_versions] || []

  z_interval = options[:z_interval] || 30
  if @node_nats
    EM.add_periodic_timer(z_interval) do
      EM.defer { update_varz }
    end

    # Wait 5 seconds to give the service a chance to wake up
    EM.add_timer(5) do
      EM.defer { update_varz }
    end
  end
end

Instance Method Details

#all_bindings_listObject

Subclasses must override this method to enable the orphan-binding check; otherwise orphan bindings are not checked. The return value should be a list of binding credentials. Each binding credential is passed as the argument to the unbind method, and it should have at least the username and name properties so that the base code can find the orphans.



345
346
347
# File 'lib/base/node.rb', line 345

# Base implementation: reports no bindings (empty list).
# on_check_orphan sends the returned list to the gateway as the live bindings.
def all_bindings_list
  []
end

#all_instances_listObject

Subclasses must override this method to enable the orphan-instance check; otherwise orphan instances are not checked. The return value should be a list of instance names (handles).



335
336
337
# File 'lib/base/node.rb', line 335

# Base implementation: reports no instances (empty list).
# on_check_orphan sends the returned list to the gateway as the live instances.
def all_instances_list
  []
end

#capacity_unitObject



423
424
425
426
427
# File 'lib/base/node.rb', line 423

# Amount added to / subtracted from @capacity for each instance.
# The base implementation returns 1.
def capacity_unit
  # subclasses could overwrite this method to re-define
  # the capacity unit decreased/increased by provision/unprovision
  1
end

#disabled?Boolean

Returns:

  • (Boolean)


382
383
384
# File 'lib/base/node.rb', line 382

# Tells whether this node has been disabled.
#
# The node counts as disabled when the file named by @disabled_file exists.
# When no disabled file is configured (@disabled_file is nil) the node is
# reported as enabled instead of raising TypeError from File.exist?(nil).
#
# Returns true or false.
def disabled?
  return false if @disabled_file.nil?
  File.exist?(@disabled_file)
end

#encode_failure(response, error = nil) ⇒ Object



435
436
437
438
439
440
441
442
# File 'lib/base/node.rb', line 435

# Marks +response+ as failed, attaches the error details and encodes it.
#
# response - object responding to success=, error= and encode.
# error    - the failure cause; anything that is not a ServiceError (nil
#            included) is replaced by a ServiceError built from
#            ServiceError::INTERNAL_ERROR.
#
# Returns the result of response.encode.
def encode_failure(response, error=nil)
  response.success = false
  reported = error.is_a?(ServiceError) ? error : ServiceError.new(ServiceError::INTERNAL_ERROR)
  response.error = reported.to_hash
  response.encode
end

#encode_success(response) ⇒ Object

Helper



430
431
432
433
# File 'lib/base/node.rb', line 430

# Flags +response+ as successful and returns its encoded form
# (the result of response.encode).
def encode_success(response)
  response.tap { |res| res.success = true }.encode
end

#flavorObject



42
43
44
# File 'lib/base/node.rb', line 42

# Returns the component flavor string, "Node".
def flavor
  "Node"
end

#get_all_bindings(handles) ⇒ Object



354
355
356
357
358
359
360
# File 'lib/base/node.rb', line 354

# Collects the "credentials" entry of every binding handle.
#
# handles - list of handles indexable by the "credentials" key.
#
# Returns an Array with one credentials value per handle, in order.
def get_all_bindings(handles)
  handles.map { |handle| handle["credentials"] }
end

#get_all_bindings_with_option(handles) ⇒ Object



362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/base/node.rb', line 362

# Indexes binding handles by their "service_id".
#
# handles - list of handles; each is read through the "service_id",
#           "credentials" and "configuration" keys.
#
# Returns a Hash mapping each service id to
#   { "credentials" => ..., "binding_options" => ... }
# where "binding_options" comes from configuration["data"]["binding_options"]
# and is nil when the configuration has no "data" key.
def get_all_bindings_with_option(handles)
  handles.each_with_object({}) do |handle, by_service_id|
    config = handle["configuration"]
    binding_options = config.has_key?("data") ? config["data"]["binding_options"] : nil
    by_service_id[handle["service_id"]] = {
      "credentials" => handle["credentials"],
      "binding_options" => binding_options
    }
  end
end

#get_hostObject



444
445
446
# File 'lib/base/node.rb', line 444

# Returns the host to advertise: the machine's hostname when @fqdn_hosts
# is set, otherwise @local_ip.
def get_host
  return @local_ip unless @fqdn_hosts
  Socket.gethostname
end

#get_migration_folder(instance) ⇒ Object

Get the tmp folder for migration



350
351
352
# File 'lib/base/node.rb', line 350

# Builds the migration folder path for +instance+:
#   <@migration_nfs>/migration/<service_name>/<instance>
#
# Returns the joined path String; nothing is created on disk here.
def get_migration_folder(instance)
  path_parts = [@migration_nfs, 'migration', service_name, instance]
  File.join(*path_parts)
end

#node_ready?Boolean

Returns:

  • (Boolean)


409
410
411
412
413
414
# File 'lib/base/node.rb', line 409

# Tells whether the node is ready; the base implementation always answers
# true. send_node_announcement skips the announcement when this is false.
def node_ready?
  # Subclasses that depend on an external service in order to operate
  # (MySQL and Postgresql, for example, need a connection to the
  # underlying server) can override this method.
  true
end

#on_bind(msg, reply) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/base/node.rb', line 125

# Handles a bind request.
#
# msg   - encoded BindRequest.
# reply - subject the encoded BindResponse is published to.
#
# The binding is created through bind(name, bind_opts, credentials) under
# timing_exec with @op_time_limit; the rollback handed to timing_exec
# unbinds the freshly created credentials. Any StandardError is logged and
# answered with an encoded failure.
def on_bind(msg, reply)
  @logger.debug("#{service_description}: Bind request: #{msg} from #{reply}")
  response = BindResponse.new
  undo_binding = lambda do |res|
    @logger.error("#{service_description}: Binding takes too long. Rollback for #{res.inspect}")
    unbind(res.credentials)
  end

  timing_exec(@op_time_limit, undo_binding) do
    request = BindRequest.decode(msg)
    response.credentials = bind(request.name, request.bind_opts, request.credentials)
    response
  end
  publish(reply, encode_success(response))
rescue => e
  @logger.warn("Exception at on_bind #{e}")
  publish(reply, encode_failure(response, e))
end

#on_check_orphan(msg, reply) ⇒ Object

Send all handles to gateway to check orphan



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/base/node.rb', line 287

# Reports every live instance and binding of this node so that orphans
# can be detected.
#
# The lists from all_instances_list and all_bindings_list are split by
# group_handles_in_json, and each group is published as a
# NodeHandlesReport on "<service_name>.node_handles". Neither +msg+ nor
# +reply+ is used. Any StandardError is logged and swallowed.
def on_check_orphan(msg, reply)
  @logger.debug("#{service_description}: Request to check orphan")
  instances = all_instances_list
  bindings = all_bindings_list
  # NOTE(review): the 200 subtracted here is presumably headroom for the
  # rest of the report payload - confirm.
  payload_budget = @max_nats_payload - 200

  group_handles_in_json(instances, bindings, payload_budget) do |ins_list, bind_list|
    report = NodeHandlesReport.new
    report.instances_list = ins_list
    report.bindings_list = bind_list
    report.node_id = @node_id
    publish("#{service_name}.node_handles", report.encode)
  end
rescue => e
  @logger.warn("Exception at on_check_orphan #{e}")
end

#on_cleanupnfs_instance(msg, reply) ⇒ Object

Cleanup nfs folder which contains migration data



274
275
276
277
278
279
280
281
282
283
284
# File 'lib/base/node.rb', line 274

# Removes the migration folder of an instance.
#
# msg   - JSON text whose first element is the provision handle; its
#         "service_id" selects the folder (see get_migration_folder).
# reply - subject the encoded SimpleResponse is published to.
#
# Any StandardError is logged and answered with an encoded failure.
def on_cleanupnfs_instance(msg, reply)
  @logger.debug("#{service_description}: cleanup nfs request #{msg} from #{reply}")
  response = SimpleResponse.new
  prov_handle, _ = Yajl::Parser.parse(msg)
  migration_dir = get_migration_folder(prov_handle["service_id"])
  FileUtils.rm_rf(migration_dir)
  publish(reply, encode_success(response))
rescue => e
  @logger.warn("Exception at on_cleanupnfs_instance #{e}")
  publish(reply, encode_failure(response, e))
end

#on_connect_nodeObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/base/node.rb', line 46

# Wires the node to the message bus once the connection is up.
#
# Subscribes to "<service_name>.<op>.<node_id>" for the per-node operations
# and to "<service_name>.<op>" for discover and check_orphan. Every message
# is handed to the matching on_<op>(msg, reply) handler inside EM.defer.
# Afterwards it calls pre_send_announcement, sends one announcement and
# registers send_node_announcement on a periodic timer of 30.
#
# The handlers are dispatched with __send__ rather than by eval-ing a
# generated source string, so a service name or node id containing quotes
# or "#{...}" can neither break the subscription nor inject code.
def on_connect_node
  @logger.debug("#{service_description}: Connected to node mbus")

  # Operations addressed to this particular node
  %w[provision unprovision bind unbind restore disable_instance
    enable_instance import_instance update_instance cleanupnfs_instance purge_orphan
  ].each do |op|
    @node_nats.subscribe("#{service_name}.#{op}.#{@node_id}") do |msg, reply|
      EM.defer { __send__("on_#{op}", msg, reply) }
    end
  end
  # Operations addressed to every node of the service
  %w[discover check_orphan].each do |op|
    @node_nats.subscribe("#{service_name}.#{op}") do |msg, reply|
      EM.defer { __send__("on_#{op}", msg, reply) }
    end
  end

  pre_send_announcement
  send_node_announcement
  EM.add_periodic_timer(30) { send_node_announcement }
end

#on_disable_instance(msg, reply) ⇒ Object

Disable and dump instance



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/base/node.rb', line 180

# Disables an instance and dumps its data into the migration folder.
#
# msg   - JSON text holding [provision_handle, binding_handles].
# reply - subject the encoded SimpleResponse is published to.
#
# The migration folder is created first; dump_instance only runs when
# disable_instance returned a truthy value. Success is published only when
# both steps succeed. Any StandardError is logged and answered with an
# encoded failure.
def on_disable_instance(msg, reply)
  @logger.debug("#{service_description}: Disable instance #{msg} request from #{reply}")
  response = SimpleResponse.new
  prov_handle, binding_handles = Yajl::Parser.parse(msg)
  prov_cred = prov_handle["credentials"]
  binding_creds = get_all_bindings(binding_handles)
  file_path = get_migration_folder(prov_handle["service_id"])
  FileUtils.mkdir_p(file_path)
  # Dump together with disable to keep the migration logic simple
  done = disable_instance(prov_cred, binding_creds) &&
         dump_instance(prov_cred, binding_creds, file_path)
  publish(reply, done ? encode_success(response) : encode_failure(response))
rescue => e
  @logger.warn("Exception at on_disable_instance #{e}")
  publish(reply, encode_failure(response, e))
end

#on_discover(msg, reply) ⇒ Object



375
376
377
# File 'lib/base/node.rb', line 375

# Handles a discover request by delegating to send_node_announcement
# with the same +msg+ and +reply+.
def on_discover(msg, reply)
  send_node_announcement(msg, reply)
end

#on_enable_instance(msg, reply) ⇒ Object

Enable instance, the opposite operation of disable



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/base/node.rb', line 207

# Enables an instance (the opposite of on_disable_instance).
#
# msg   - JSON text holding [provision_handle, binding_handles].
# reply - subject the encoded SimpleResponse is published to.
#
# Publishes success when enable_instance returns a truthy value, failure
# otherwise. Any StandardError is logged and answered with an encoded
# failure.
def on_enable_instance(msg, reply)
  @logger.debug("#{service_description}: enable instance #{msg} request from #{reply}")
  response = SimpleResponse.new
  prov_handle, binding_handles = Yajl::Parser.parse(msg)
  prov_cred = prov_handle["credentials"]
  binding_creds_hash = get_all_bindings_with_option(binding_handles)
  enabled = enable_instance(prov_cred, binding_creds_hash)
  publish(reply, enabled ? encode_success(response) : encode_failure(response))
rescue => e
  @logger.warn("Exception at on_enable_instance #{e}")
  publish(reply, encode_failure(response, e))
end

#on_import_instance(msg, reply) ⇒ Object

Import the generated data



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/base/node.rb', line 226

# Imports previously dumped data for an instance.
#
# msg   - JSON text holding [provision_handle, binding_handles]; the plan
#         is read from provision_handle["configuration"]["plan"].
# reply - subject the encoded SimpleResponse is published to.
#
# Publishes success when import_instance returns a truthy value, failure
# otherwise. Any StandardError is logged and answered with an encoded
# failure.
def on_import_instance(msg, reply)
  @logger.debug("#{service_description}: import instance #{msg} request from #{reply}")
  response = SimpleResponse.new
  prov_handle, binding_handles = Yajl::Parser.parse(msg)
  prov_cred = prov_handle["credentials"]
  binding_creds_hash = get_all_bindings_with_option(binding_handles)
  plan = prov_handle["configuration"]["plan"]
  file_path = get_migration_folder(prov_handle["service_id"])
  imported = import_instance(prov_cred, binding_creds_hash, file_path, plan)
  publish(reply, imported ? encode_success(response) : encode_failure(response))
rescue => e
  @logger.warn("Exception at on_import_instance #{e}")
  publish(reply, encode_failure(response, e))
end

#on_provision(msg, reply) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/base/node.rb', line 76

# Handles a provision request.
#
# msg   - encoded ProvisionRequest (plan, credentials, version).
# reply - subject the encoded ProvisionResponse is published to.
#
# The instance is created through provision(plan, credentials, version)
# under timing_exec with @op_time_limit. The resulting credentials get
# this node's id, and @capacity is decreased by capacity_unit. The
# rollback handed to timing_exec unprovisions the instance and gives the
# capacity back when that unprovision succeeds. Any StandardError is
# logged and answered with an encoded failure.
def on_provision(msg, reply)
  @logger.debug("#{service_description}: Provision request: #{msg} from #{reply}")
  response = ProvisionResponse.new
  undo_provision = lambda do |res|
    @logger.error("#{service_description}: Provision takes too long. Rollback for #{res.inspect}")
    released = unprovision(res.credentials["name"], [])
    @capacity_lock.synchronize { @capacity += capacity_unit } if released
  end

  timing_exec(@op_time_limit, undo_provision) do
    request = ProvisionRequest.decode(msg)
    plan, credentials, version = request.plan, request.credentials, request.version
    @logger.debug("#{service_description}: Provision Request Details - plan=#{plan}, credentials=#{credentials}, version=#{version}")
    provisioned = provision(plan, credentials, version)
    provisioned['node_id'] = @node_id
    response.credentials = provisioned
    @capacity_lock.synchronize { @capacity -= capacity_unit }
    @logger.debug("#{service_description}: Successfully provisioned service for request #{msg}: #{response.inspect}")
    response
  end
  publish(reply, encode_success(response))
rescue => e
  @logger.warn("Exception at on_provision #{e}")
  publish(reply, encode_failure(response, e))
end

#on_purge_orphan(msg, reply) ⇒ Object



304
305
306
307
308
309
310
# File 'lib/base/node.rb', line 304

# Handles a purge-orphan request.
#
# msg - encoded PurgeOrphanRequest carrying orphan_ins_list and
#       orphan_binding_list, which are handed to purge_orphan.
#
# Nothing is published to +reply+. Any StandardError is logged and
# swallowed.
def on_purge_orphan(msg, reply)
  @logger.debug("#{service_description}: Request to purge orphan" )
  orphans = PurgeOrphanRequest.decode(msg)
  purge_orphan(orphans.orphan_ins_list, orphans.orphan_binding_list)
rescue => e
  @logger.warn("Exception at on_purge_orphan #{e}")
end

#on_restore(msg, reply) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/base/node.rb', line 162

# Handles a restore request.
#
# msg   - encoded RestoreRequest (instance_id, backup_path).
# reply - subject the encoded SimpleResponse is published to.
#
# Publishes success when restore(instance_id, backup_path) returns a
# truthy value, failure otherwise. Any StandardError is logged and
# answered with an encoded failure.
def on_restore(msg, reply)
  @logger.debug("#{service_description}: Restore request: #{msg} from #{reply}")
  response = SimpleResponse.new
  request = RestoreRequest.decode(msg)
  restored = restore(request.instance_id, request.backup_path)
  publish(reply, restored ? encode_success(response) : encode_failure(response))
rescue => e
  @logger.warn("Exception at on_restore #{e}")
  publish(reply, encode_failure(response, e))
end

#on_unbind(msg, reply) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/base/node.rb', line 147

# Handles an unbind request.
#
# msg   - encoded UnbindRequest; its credentials are handed to unbind.
# reply - subject the encoded SimpleResponse is published to.
#
# Publishes success when unbind returns a truthy value, failure otherwise.
# Any StandardError is logged and answered with an encoded failure.
def on_unbind(msg, reply)
  @logger.debug("#{service_description}: Unbind request: #{msg} from #{reply}")
  response = SimpleResponse.new
  credentials = UnbindRequest.decode(msg).credentials
  unbound = unbind(credentials)
  publish(reply, unbound ? encode_success(response) : encode_failure(response))
rescue => e
  @logger.warn("Exception at on_unbind #{e}")
  publish(reply, encode_failure(response, e))
end

#on_unprovision(msg, reply) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/base/node.rb', line 103

# Handles an unprovision request.
#
# msg   - encoded UnprovisionRequest (name, bindings).
# reply - subject the encoded SimpleResponse is published to.
#
# On success the reply is published first and @capacity is then increased
# by capacity_unit. An error whose http_status equals
# ServiceError::HTTP_NOT_FOUND is answered as a success. Every other
# StandardError is answered with an encoded failure.
def on_unprovision(msg, reply)
  @logger.debug("#{service_description}: Unprovision request: #{msg}.")
  response = SimpleResponse.new
  unprovision_req = UnprovisionRequest.decode(msg)
  name     = unprovision_req.name
  bindings = unprovision_req.bindings
  result = unprovision(name, bindings)
  if result
    publish(reply, encode_success(response))
    @capacity_lock.synchronize{ @capacity += capacity_unit }
  else
    publish(reply, encode_failure(response))
  end
rescue => e
  @logger.warn("Exception at on_unprovision #{e}")
  # Plain exceptions have no http_status; asking them for one would raise
  # NoMethodError from inside this rescue and leave the requester without
  # any reply.
  if e.respond_to?(:http_status) && e.http_status == ServiceError::HTTP_NOT_FOUND
    publish(reply, encode_success(response))
  else
    publish(reply, encode_failure(response, e))
  end
end

#on_update_instance(msg, reply) ⇒ Object

Update credentials in destination node of migration



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/base/node.rb', line 247

# Updates the credentials of a migrated instance on the destination node.
#
# msg   - JSON text holding [provision_handle, binding_handles].
# reply - subject the JSON-encoded list of updated handles (provision
#         handle first, then the binding handles) is published to.
#
# update_instance is expected to return [provision_credentials,
# binding_credentials_hash]. @capacity is decreased by capacity_unit right
# after the call. Any StandardError is logged and answered with an encoded
# SimpleResponse failure.
def on_update_instance(msg, reply)
  @logger.debug("#{service_description}: update instance #{msg} request from #{reply}")
  prov_handle, binding_handles = Yajl::Parser.parse(msg)
  updated = update_instance(prov_handle["credentials"],
                            get_all_bindings_with_option(binding_handles))
  # The destination node consumes capacity once the migration finishes
  @capacity_lock.synchronize { @capacity -= capacity_unit }
  prov_cred, binding_creds_hash = updated
  # Stamp this node's id on the provision credentials
  prov_cred["node_id"] = @node_id
  prov_handle["credentials"] = prov_cred
  binding_handles.each do |handle|
    handle["credentials"] = binding_creds_hash[handle["service_id"]]["credentials"]
  end
  publish(reply, Yajl::Encoder.encode([prov_handle, *binding_handles]))
rescue => e
  @logger.warn("Exception at on_update_instance #{e}")
  publish(reply, encode_failure(SimpleResponse.new, e))
end

#pre_send_announcementObject



379
380
# File 'lib/base/node.rb', line 379

# Hook called by on_connect_node right before the first node announcement.
# The base implementation does nothing.
def pre_send_announcement
end

#purge_orphan(oi_list, ob_list) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/base/node.rb', line 312

# Removes orphaned instances and bindings.
#
# oi_list - instance names; each is unprovisioned with an empty binding
#           list, and @capacity grows by capacity_unit whenever that
#           unprovision returns a truthy value.
# ob_list - binding credentials; each is handed to unbind.
#
# A failure on one entry is logged at debug level and does not stop the
# remaining entries from being processed.
def purge_orphan(oi_list,ob_list)
  oi_list.each do |instance_name|
    begin
      @logger.debug("Unprovision orphan instance #{instance_name}")
      removed = unprovision(instance_name, [])
      @capacity_lock.synchronize { @capacity += capacity_unit } if removed
    rescue => err
      @logger.debug("Error on purge orphan instance #{instance_name}: #{err}")
    end
  end

  ob_list.each do |binding_cred|
    begin
      @logger.debug("Unbind orphan binding #{binding_cred}")
      unbind(binding_cred)
    rescue => err
      @logger.debug("Error on purge orphan binding #{binding_cred}: #{err}")
    end
  end
end

#send_node_announcement(msg = nil, reply = nil) ⇒ Object



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/base/node.rb', line 386

# Publishes this node's announcement.
#
# msg   - optional JSON text of a discover request; when it carries a
#         "plan" different from @plan nothing is published.
# reply - optional subject to answer on; defaults to
#         "<service_name>.announce".
#
# Nothing is sent while the node is disabled or not ready. The hash
# returned by announcement is extended with :id, :plan and
# :supported_versions before being JSON-encoded. Any StandardError is
# logged and swallowed.
def send_node_announcement(msg=nil, reply=nil)
  if disabled?
    @logger.info("#{service_description}: Not sending announcement because node is disabled")
    return
  end
  unless node_ready?
    @logger.debug("#{service_description}: Not ready to send announcement")
    return
  end
  @logger.debug("#{service_description}: Sending announcement for #{reply || "everyone"}")
  req = msg ? Yajl::Parser.parse(msg) : nil
  # A discover request aimed at another plan is not for this node
  return if req && req["plan"] != @plan

  details = announcement
  details[:id] = @node_id
  details[:plan] = @plan
  details[:supported_versions] = @supported_versions
  publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(details))
rescue => e
  @logger.warn("Exception at send_node_announcement #{e}")
end

#timing_exec(time_limit, rollback = nil) ⇒ Object

Raises an error if the operation does not finish within the time limit; before raising, the rollback function is invoked when one is provided.



65
66
67
68
69
70
71
72
73
74
# File 'lib/base/node.rb', line 65

# Runs the given block and enforces a time limit on it after the fact.
#
# time_limit - maximum duration in seconds; nil means "no limit", so an
#              unconfigured limit no longer turns a finished operation
#              into an ArgumentError (Float compared with nil).
# rollback   - optional callable invoked with the block's result when the
#              limit was exceeded.
#
# The block is never interrupted: the elapsed time is checked once it has
# returned, and only when it returned a truthy value.
#
# Returns nil. Raises ServiceError (NODE_OPERATION_TIMEOUT) when the block
# took longer than time_limit.
def timing_exec(time_limit, rollback=nil)
  return unless block_given?

  start = Time.now
  response = yield
  # Nothing to enforce when no limit has been configured
  return if time_limit.nil?

  if response && Time.now - start > time_limit
    rollback.call(response) if rollback
    raise ServiceError::new(ServiceError::NODE_OPERATION_TIMEOUT)
  end
end

#varz_detailsObject



416
417
418
419
420
421
# File 'lib/base/node.rb', line 416

# Returns the details exposed through varz; the base implementation
# returns whatever announcement returns.
def varz_details
  # Subclasses may override this to expose service specific data beyond
  # what their "announcement" method returns.
  announcement
end