Class: VCAP::Services::Base::Node
Instance Attribute Summary
Attributes inherited from Base
#closing
Instance Method Summary
collapse
-
#all_bindings_list ⇒ Object
Subclass must overwrite this method to enable check orphan binding feature.
-
#all_instances_list ⇒ Object
Subclass must overwrite this method to enable check orphan instance feature.
-
#capacity_unit ⇒ Object
-
#disabled? ⇒ Boolean
-
#encode_failure(response, error = nil) ⇒ Object
-
#encode_success(response) ⇒ Object
-
#flavor ⇒ Object
-
#get_all_bindings(handles) ⇒ Object
-
#get_all_bindings_with_option(handles) ⇒ Object
-
#get_host ⇒ Object
-
#get_migration_folder(instance) ⇒ Object
Get the tmp folder for migration.
-
#initialize(options) ⇒ Node
constructor
-
#node_ready? ⇒ Boolean
-
#on_bind(msg, reply) ⇒ Object
-
#on_check_orphan(msg, reply) ⇒ Object
Send all handles to gateway to check orphan.
-
#on_cleanupnfs_instance(msg, reply) ⇒ Object
Cleanup nfs folder which contains migration data.
-
#on_connect_node ⇒ Object
-
#on_disable_instance(msg, reply) ⇒ Object
Disable and dump instance.
-
#on_discover(msg, reply) ⇒ Object
-
#on_enable_instance(msg, reply) ⇒ Object
Enable instance, the opposite operation of disable.
-
#on_import_instance(msg, reply) ⇒ Object
Import the generated data.
-
#on_provision(msg, reply) ⇒ Object
-
#on_purge_orphan(msg, reply) ⇒ Object
-
#on_restore(msg, reply) ⇒ Object
-
#on_unbind(msg, reply) ⇒ Object
-
#on_unprovision(msg, reply) ⇒ Object
-
#on_update_instance(msg, reply) ⇒ Object
Update credentials in destination node of migration.
-
#pre_send_announcement ⇒ Object
-
#purge_orphan(oi_list, ob_list) ⇒ Object
-
#send_node_announcement(msg = nil, reply = nil) ⇒ Object
-
#timing_exec(time_limit, rollback = nil) ⇒ Object
raise an error if operation does not finish in time limit and perform a rollback action if rollback function is provided.
-
#varz_details ⇒ Object
Methods inherited from Base
#group_handles_in_json, #publish, #service_description, #shutdown, #update_varz
Methods included from Error
#failure, #internal_fail, #parse_msg, #success, #timeout_fail
Constructor Details
#initialize(options) ⇒ Node
Returns a new instance of Node.
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/base/node.rb', line 15
def initialize(options)
super(options)
@node_id = options[:node_id]
@plan = options[:plan]
@capacity = options[:capacity]
@max_capacity = @capacity
@capacity_lock = Mutex.new
@migration_nfs = options[:migration_nfs]
@fqdn_hosts = options[:fqdn_hosts]
@op_time_limit = options[:op_time_limit]
@disabled_file = options[:disabled_file]
DataMapper::initialize_lock_file(options[:database_lock_file]) if options[:database_lock_file]
@supported_versions = options[:supported_versions] || []
z_interval = options[:z_interval] || 30
EM.add_periodic_timer(z_interval) do
EM.defer { update_varz }
end if @node_nats
EM.add_timer(5) do
EM.defer { update_varz }
end if @node_nats
end
|
Instance Method Details
#all_bindings_list ⇒ Object
Subclass must overwrite this method to enable check orphan binding feature. Otherwise it will not check orphan bindings The return value should be a list of binding credentials Binding credential will be the argument for unbind method And it should have at least username & name property for base code to find the orphans
345
346
347
|
# File 'lib/base/node.rb', line 345
def all_bindings_list
[]
end
|
#all_instances_list ⇒ Object
Subclass must overwrite this method to enable check orphan instance feature. Otherwise it will not check orphan instance The return value should be a list of instance name(handle).
335
336
337
|
# File 'lib/base/node.rb', line 335
def all_instances_list
[]
end
|
#capacity_unit ⇒ Object
423
424
425
426
427
|
# File 'lib/base/node.rb', line 423
def capacity_unit
1
end
|
#disabled? ⇒ Boolean
382
383
384
|
# File 'lib/base/node.rb', line 382
def disabled?
File.exist?(@disabled_file)
end
|
#encode_failure(response, error = nil) ⇒ Object
435
436
437
438
439
440
441
442
|
# File 'lib/base/node.rb', line 435
def encode_failure(response, error=nil)
response.success = false
if error.nil? || !error.is_a?(ServiceError)
error = ServiceError.new(ServiceError::INTERNAL_ERROR)
end
response.error = error.to_hash
response.encode
end
|
#encode_success(response) ⇒ Object
430
431
432
433
|
# File 'lib/base/node.rb', line 430
def encode_success(response)
response.success = true
response.encode
end
|
42
43
44
|
# File 'lib/base/node.rb', line 42
def flavor
return "Node"
end
|
#get_all_bindings(handles) ⇒ Object
354
355
356
357
358
359
360
|
# File 'lib/base/node.rb', line 354
def get_all_bindings(handles)
binding_creds = []
handles.each do |handle|
binding_creds << handle["credentials"]
end
binding_creds
end
|
#get_all_bindings_with_option(handles) ⇒ Object
362
363
364
365
366
367
368
369
370
371
372
373
|
# File 'lib/base/node.rb', line 362
def get_all_bindings_with_option(handles)
binding_creds_hash = {}
handles.each do |handle|
value = {
"credentials" => handle["credentials"],
"binding_options" => nil
}
value["binding_options"] = handle["configuration"]["data"]["binding_options"] if handle["configuration"].has_key?("data")
binding_creds_hash[handle["service_id"]] = value
end
binding_creds_hash
end
|
444
445
446
|
# File 'lib/base/node.rb', line 444
def get_host
@fqdn_hosts ? Socket.gethostname : @local_ip
end
|
#get_migration_folder(instance) ⇒ Object
Get the tmp folder for migration
350
351
352
|
# File 'lib/base/node.rb', line 350
def get_migration_folder(instance)
File.join(@migration_nfs, 'migration', service_name, instance)
end
|
#node_ready? ⇒ Boolean
409
410
411
412
413
414
|
# File 'lib/base/node.rb', line 409
def node_ready?()
true
end
|
#on_bind(msg, reply) ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/base/node.rb', line 125
def on_bind(msg, reply)
@logger.debug("#{service_description}: Bind request: #{msg} from #{reply}")
response = BindResponse.new
rollback = lambda do |res|
@logger.error("#{service_description}: Binding takes too long. Rollback for #{res.inspect}")
unbind(res.credentials)
end
timing_exec(@op_time_limit, rollback) do
bind_message = BindRequest.decode(msg)
name = bind_message.name
bind_opts = bind_message.bind_opts
credentials = bind_message.credentials
response.credentials = bind(name, bind_opts, credentials)
response
end
publish(reply, encode_success(response))
rescue => e
@logger.warn("Exception at on_bind #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_check_orphan(msg, reply) ⇒ Object
Send all handles to gateway to check orphan
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
|
# File 'lib/base/node.rb', line 287
def on_check_orphan(msg, reply)
@logger.debug("#{service_description}: Request to check orphan")
live_ins_list = all_instances_list
live_bind_list = all_bindings_list
handles_size = @max_nats_payload - 200
group_handles_in_json(live_ins_list, live_bind_list, handles_size) do |ins_list, bind_list|
request = NodeHandlesReport.new
request.instances_list = ins_list
request.bindings_list = bind_list
request.node_id = @node_id
publish("#{service_name}.node_handles", request.encode)
end
rescue => e
@logger.warn("Exception at on_check_orphan #{e}")
end
|
#on_cleanupnfs_instance(msg, reply) ⇒ Object
Cleanup nfs folder which contains migration data
274
275
276
277
278
279
280
281
282
283
284
|
# File 'lib/base/node.rb', line 274
def on_cleanupnfs_instance(msg, reply)
@logger.debug("#{service_description}: cleanup nfs request #{msg} from #{reply}")
response = SimpleResponse.new
request = Yajl::Parser.parse(msg)
prov_handle, _ = request
FileUtils.rm_rf(get_migration_folder(prov_handle["service_id"]))
publish(reply, encode_success(response))
rescue => e
@logger.warn("Exception at on_cleanupnfs_instance #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_connect_node ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/base/node.rb', line 46
def on_connect_node
@logger.debug("#{service_description}: Connected to node mbus")
%w[provision unprovision bind unbind restore disable_instance
enable_instance import_instance update_instance cleanupnfs_instance purge_orphan
].each do |op|
eval %[@node_nats.subscribe("#{service_name}.#{op}.#{@node_id}") { |msg, reply| EM.defer{ on_#{op}(msg, reply) } }]
end
%w[discover check_orphan].each do |op|
eval %[@node_nats.subscribe("#{service_name}.#{op}") { |msg, reply| EM.defer{ on_#{op}(msg, reply) } }]
end
pre_send_announcement
send_node_announcement
EM.add_periodic_timer(30) { send_node_announcement }
end
|
#on_disable_instance(msg, reply) ⇒ Object
Disable and dump instance
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/base/node.rb', line 180
def on_disable_instance(msg, reply)
@logger.debug("#{service_description}: Disable instance #{msg} request from #{reply}")
response = SimpleResponse.new
request = Yajl::Parser.parse(msg)
prov_handle, binding_handles = request
prov_cred = prov_handle["credentials"]
binding_creds = get_all_bindings(binding_handles)
file_path = get_migration_folder(prov_handle["service_id"])
FileUtils.mkdir_p(file_path)
result = disable_instance(prov_cred, binding_creds)
if result
result = dump_instance(prov_cred, binding_creds, file_path)
if result
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response))
end
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_disable_instance #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_discover(msg, reply) ⇒ Object
375
376
377
|
# File 'lib/base/node.rb', line 375
def on_discover(msg, reply)
send_node_announcement(msg, reply)
end
|
#on_enable_instance(msg, reply) ⇒ Object
Enable instance, the opposite operation of disable
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
# File 'lib/base/node.rb', line 207
def on_enable_instance(msg, reply)
@logger.debug("#{service_description}: enable instance #{msg} request from #{reply}")
response = SimpleResponse.new
request = Yajl::Parser.parse(msg)
prov_handle, binding_handles = request
prov_cred = prov_handle["credentials"]
binding_creds_hash = get_all_bindings_with_option(binding_handles)
result = enable_instance(prov_cred, binding_creds_hash)
if result
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_enable_instance #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_import_instance(msg, reply) ⇒ Object
Import the generated data
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# File 'lib/base/node.rb', line 226
def on_import_instance(msg, reply)
@logger.debug("#{service_description}: import instance #{msg} request from #{reply}")
response = SimpleResponse.new
request = Yajl::Parser.parse(msg)
prov_handle, binding_handles = request
prov_cred = prov_handle["credentials"]
binding_creds_hash = get_all_bindings_with_option(binding_handles)
plan = prov_handle["configuration"]["plan"]
file_path = get_migration_folder(prov_handle["service_id"])
result = import_instance(prov_cred, binding_creds_hash, file_path, plan)
if result
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_import_instance #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_provision(msg, reply) ⇒ Object
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/base/node.rb', line 76
def on_provision(msg, reply)
@logger.debug("#{service_description}: Provision request: #{msg} from #{reply}")
response = ProvisionResponse.new
rollback = lambda do |res|
@logger.error("#{service_description}: Provision takes too long. Rollback for #{res.inspect}")
@capacity_lock.synchronize{ @capacity += capacity_unit } if unprovision(res.credentials["name"], [])
end
timing_exec(@op_time_limit, rollback) do
provision_req = ProvisionRequest.decode(msg)
plan = provision_req.plan
credentials = provision_req.credentials
version = provision_req.version
@logger.debug("#{service_description}: Provision Request Details - plan=#{plan}, credentials=#{credentials}, version=#{version}")
credential = provision(plan, credentials, version)
credential['node_id'] = @node_id
response.credentials = credential
@capacity_lock.synchronize{ @capacity -= capacity_unit }
@logger.debug("#{service_description}: Successfully provisioned service for request #{msg}: #{response.inspect}")
response
end
publish(reply, encode_success(response))
rescue => e
@logger.warn("Exception at on_provision #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_purge_orphan(msg, reply) ⇒ Object
304
305
306
307
308
309
310
|
# File 'lib/base/node.rb', line 304
def on_purge_orphan(msg, reply)
@logger.debug("#{service_description}: Request to purge orphan" )
request = PurgeOrphanRequest.decode(msg)
purge_orphan(request.orphan_ins_list,request.orphan_binding_list)
rescue => e
@logger.warn("Exception at on_purge_orphan #{e}")
end
|
#on_restore(msg, reply) ⇒ Object
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
# File 'lib/base/node.rb', line 162
def on_restore(msg, reply)
@logger.debug("#{service_description}: Restore request: #{msg} from #{reply}")
response = SimpleResponse.new
restore_message = RestoreRequest.decode(msg)
instance_id = restore_message.instance_id
backup_path = restore_message.backup_path
result = restore(instance_id, backup_path)
if result
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_restore #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_unbind(msg, reply) ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/base/node.rb', line 147
def on_unbind(msg, reply)
@logger.debug("#{service_description}: Unbind request: #{msg} from #{reply}")
response = SimpleResponse.new
unbind_req = UnbindRequest.decode(msg)
result = unbind(unbind_req.credentials)
if result
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_unbind #{e}")
publish(reply, encode_failure(response, e))
end
|
#on_unprovision(msg, reply) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/base/node.rb', line 103
def on_unprovision(msg, reply)
@logger.debug("#{service_description}: Unprovision request: #{msg}.")
response = SimpleResponse.new
unprovision_req = UnprovisionRequest.decode(msg)
name = unprovision_req.name
bindings = unprovision_req.bindings
result = unprovision(name, bindings)
if result
publish(reply, encode_success(response))
@capacity_lock.synchronize{ @capacity += capacity_unit }
else
publish(reply, encode_failure(response))
end
rescue => e
@logger.warn("Exception at on_unprovision #{e}")
if e.http_status == ServiceError::HTTP_NOT_FOUND
publish(reply, encode_success(response))
else
publish(reply, encode_failure(response, e))
end
end
|
#on_update_instance(msg, reply) ⇒ Object
Update credentials in destination node of migration
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
# File 'lib/base/node.rb', line 247
def on_update_instance(msg, reply)
@logger.debug("#{service_description}: update instance #{msg} request from #{reply}")
request = Yajl::Parser.parse(msg)
prov_handle, binding_handles = request
prov_cred = prov_handle["credentials"]
binding_creds_hash = get_all_bindings_with_option(binding_handles)
result = update_instance(prov_cred, binding_creds_hash)
@capacity_lock.synchronize{ @capacity -= capacity_unit }
prov_cred, binding_creds_hash = result
prov_cred["node_id"] = @node_id
handles = []
prov_handle["credentials"] = prov_cred
handles << prov_handle
binding_handles.each do |handle|
handle["credentials"] = binding_creds_hash[handle["service_id"]]["credentials"]
handles << handle
end
publish(reply, Yajl::Encoder.encode(handles))
rescue => e
@logger.warn("Exception at on_update_instance #{e}")
response = SimpleResponse.new
publish(reply, encode_failure(response, e))
end
|
#pre_send_announcement ⇒ Object
379
380
|
# File 'lib/base/node.rb', line 379
def pre_send_announcement
end
|
#purge_orphan(oi_list, ob_list) ⇒ Object
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
|
# File 'lib/base/node.rb', line 312
def purge_orphan(oi_list,ob_list)
oi_list.each do |ins|
begin
@logger.debug("Unprovision orphan instance #{ins}")
@capacity_lock.synchronize{ @capacity += capacity_unit } if unprovision(ins,[])
rescue => e
@logger.debug("Error on purge orphan instance #{ins}: #{e}")
end
end
ob_list.each do |credential|
begin
@logger.debug("Unbind orphan binding #{credential}")
unbind(credential)
rescue => e
@logger.debug("Error on purge orphan binding #{credential}: #{e}")
end
end
end
|
#send_node_announcement(msg = nil, reply = nil) ⇒ Object
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
|
# File 'lib/base/node.rb', line 386
def send_node_announcement(msg=nil, reply=nil)
if disabled?
@logger.info("#{service_description}: Not sending announcement because node is disabled")
return
end
unless node_ready?
@logger.debug("#{service_description}: Not ready to send announcement")
return
end
@logger.debug("#{service_description}: Sending announcement for #{reply || "everyone"}")
req = nil
req = Yajl::Parser.parse(msg) if msg
if !req || req["plan"] == @plan
a = announcement
a[:id] = @node_id
a[:plan] = @plan
a[:supported_versions] = @supported_versions
publish(reply || "#{service_name}.announce", Yajl::Encoder.encode(a))
end
rescue => e
@logger.warn("Exception at send_node_announcement #{e}")
end
|
#timing_exec(time_limit, rollback = nil) ⇒ Object
raise an error if operation does not finish in time limit and perform a rollback action if rollback function is provided
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/base/node.rb', line 65
def timing_exec(time_limit, rollback=nil)
return unless block_given?
start = Time.now
response = yield
if response && Time.now - start > time_limit
rollback.call(response) if rollback
raise ServiceError::new(ServiceError::NODE_OPERATION_TIMEOUT)
end
end
|
#varz_details ⇒ Object
416
417
418
419
420
421
|
# File 'lib/base/node.rb', line 416
def varz_details
return announcement
end
|