Class: OmfRc::ResourceProxy::AbstractResource

Inherits:
Object
Includes:
MonitorMixin, OmfRc::ResourceProxyDSL
Defined in:
lib/omf_rc/resource_proxy/abstract_resource.rb

Overview

Note:

This documentation assumes you have read the DEVELOPERS GUIDE, which explains the basics of the resource controller system.

This is the abstract resource proxy class, which provides the base of all proxy implementations. When creating new resource instances, this abstract class will always be initialised first and then extended by one of the specific resource proxy modules.

Instead of initialising the abstract resource directly, use the Resource Factory's methods.

The proxy documentation groups the FRCP API methods for your convenience.

We follow a simple naming convention for request/configure properties.

request_xxx() indicates that property 'xxx' can be requested using an FRCP REQUEST message.

configure_xxx(value) indicates that property 'xxx' can be configured with 'value' using an FRCP CONFIGURE message.
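As a rough illustration of this convention (a minimal sketch, not the actual OMF RC implementation), the hypothetical `ToyProxy` class below exposes a `frequency` property through `request_frequency` and `configure_frequency`. The `dispatch` helper is also an assumption made for this example; it simply builds the method name from the operation and the property name.

```ruby
# Hypothetical stand-in for a resource proxy; not part of OMF RC.
class ToyProxy
  def initialize
    @frequency = 2412
  end

  # Property 'frequency' can be read via an FRCP REQUEST message.
  def request_frequency
    @frequency
  end

  # Property 'frequency' can be set via an FRCP CONFIGURE message.
  def configure_frequency(value)
    @frequency = value
  end
end

# Hypothetical helper: build "<operation>_<property>" and call it.
def dispatch(resource, operation, property, *args)
  resource.public_send("#{operation}_#{property}", *args)
end

proxy = ToyProxy.new
dispatch(proxy, :configure, :frequency, 5180)
puts dispatch(proxy, :request, :frequency)
```

The point of the naming scheme is that a message handler needs no per-property lookup table: the operation and the property name together identify the method to call.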

Currently, the official OMF RC gem contains resource proxies for the following:

  • Representing a physical/virtual machine

  • Executing OML-enabled applications and monitoring their output

  • Configuring network interfaces

  • Installing packages

  • Creating virtual machines

Examples:

Creating a resource using the factory method

OmfRc::ResourceFactory.create(:node, uid: 'node01')

Constant Summary

RELEASE_WAIT =

Time to wait before releasing a resource, to allow pubsub topics to be deleted

5
DEFAULT_CREATION_OPTS =
{
  suppress_create_message: false,
  create_children_resources: true
}

Constants included from OmfRc::ResourceProxyDSL

OmfRc::ResourceProxyDSL::DEFAULT_PROP_ACCESS, OmfRc::ResourceProxyDSL::PROXY_DIR, OmfRc::ResourceProxyDSL::UTIL_DIR

Instance Attribute Summary

Instance Method Summary

Methods included from OmfRc::ResourceProxyDSL

#call_hook, included

Constructor Details

#initialize(type, opts = {}, creation_opts = {}, &creation_callback) ⇒ AbstractResource

Initialisation

Parameters:

  • type (Symbol)

    resource proxy type

  • opts (Hash) (defaults to: {})

    options to be initialised

  • creation_opts (Hash) (defaults to: {})

    options to control the resource creation process

Options Hash (opts):

  • :uid (String)

    Unique identifier

  • :hrn (String)

    Human readable name

  • :property (Hash)

    A hash for keeping internal state

  • :instrument (Hash)

    A hash for keeping instrumentation-related state

  • :certificate (OmfCommon::Auth::Certificate)

    The certificate for this resource

Options Hash (creation_opts):

  • :suppress_create_message (Boolean)

    Don’t send an initial CREATION.OK Inform message

  • :create_children_resources (Boolean)

    Immediately create ‘known’ children resources, such as interfaces on nodes



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 100

def initialize(type, opts = {}, creation_opts = {}, &creation_callback)
  super()

  @opts = Hashie::Mash.new(opts)
  @creation_opts = Hashie::Mash.new(DEFAULT_CREATION_OPTS.merge(creation_opts))

  @type = type
  @uid = (@opts.uid || SecureRandom.uuid).to_s
  @hrn = @opts.hrn && @opts.hrn.to_s

  @children ||= []
  @membership = []
  @topics = []
  @membership_topics ||= {}

  @property = Hashie::Mash.new(@opts.property)

  OmfCommon.comm.subscribe(@uid) do |t|
    @topics << t

    if t.error?
      warn "Could not create topic '#{uid}', will shutdown, trying to clean up old topics. Please start it again once it has been shutdown."
      OmfCommon.comm.disconnect()
    else
      if (@certificate = @opts.certificate)
        OmfCommon::Auth::CertificateStore.instance.register(@certificate, t.address)
      else
        if (pcert = @opts.parent_certificate)
          @certificate = pcert.create_for(resource_address, @type, t.address)
        end
      end

      creation_callback.call(self) if creation_callback

      copts = { src: self.resource_address }
      copts[:cert] = @certificate.to_pem_compact if @certificate

      cprops = @property.reject { |k| [:parent_certificate, :parent].include?(k.to_sym) }
      cprops[:res_id] = self.resource_address

      t.inform(:creation_ok, cprops, copts)

      t.on_message(@uid) do |imsg|
        process_omf_message(imsg, t)
      end
    end
  end
  configure_membership(@opts.membership) if @opts.membership
end
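The option handling at the top of the constructor can be tried in isolation. The sketch below assumes plain Ruby hashes instead of Hashie::Mash; `effective_creation_opts` and `resolve_uid` are hypothetical helper names introduced only for this example.

```ruby
require 'securerandom'

# Defaults as listed in the Constant Summary.
DEFAULT_CREATION_OPTS = {
  suppress_create_message: false,
  create_children_resources: true
}.freeze

# Hypothetical helper: caller-supplied creation options override the defaults.
def effective_creation_opts(creation_opts = {})
  DEFAULT_CREATION_OPTS.merge(creation_opts)
end

# Hypothetical helper: use the given :uid, or fall back to a random UUID.
def resolve_uid(opts = {})
  (opts[:uid] || SecureRandom.uuid).to_s
end

p effective_creation_opts(create_children_resources: false)
puts resolve_uid(uid: 'node01')
puts resolve_uid
```

Because the defaults are merged first, a caller only needs to pass the creation options that differ from them.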

Instance Attribute Details

#certificate ⇒ Object

Returns the value of attribute certificate.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def certificate
  @certificate
end

#children ⇒ Object (readonly)

Returns the value of attribute children.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def children
  @children
end

#comm ⇒ Object

Returns the value of attribute comm.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def comm
  @comm
end

#creation_opts ⇒ Object (readonly)

Returns the value of attribute creation_opts.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def creation_opts
  @creation_opts
end

#hrn ⇒ Object Also known as: name

Returns the value of attribute hrn.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def hrn
  @hrn
end

#membership ⇒ Object (readonly)

Returns the value of attribute membership.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def membership
  @membership
end

#membership_topics ⇒ Object (readonly)

Returns the value of attribute membership_topics.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def membership_topics
  @membership_topics
end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def opts
  @opts
end

#property ⇒ Object

Returns the value of attribute property.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def property
  @property
end

#type ⇒ Object

Returns the value of attribute type.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def type
  @type
end

#uid ⇒ Object

Returns the value of attribute uid.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def uid
  @uid
end

Instance Method Details

#configure_membership(*args) ⇒ Object

Make the resource a member of one or more group topics. Each group not already in the membership array is subscribed to and added to it.

Parameters:

  • args (String|Array)

    name of group topic/topics



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 321

def configure_membership(*args)
  new_membership = [args[0]].flatten

  new_membership.each do |new_m|
    unless @membership.include?(new_m)
      OmfCommon.comm.subscribe(new_m) do |t|
        if t.error?
          warn "Group #{new_m} disappeared"
          #EM.next_tick do
          #  @membership.delete(m)
          #end
        else
          self.synchronize do
            @membership << new_m
            @membership_topics[new_m] = t
            self.inform(:status, { membership: @membership }, t)
          end

          t.on_message(@uid) do |imsg|
            process_omf_message(imsg, t)
          end
        end
      end
    end
  end
  @membership
end
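The argument handling in this method can be illustrated without a communicator. The following is a minimal sketch; `normalise_membership` is a hypothetical helper that mirrors the `[args[0]].flatten` expression, so only the first argument is considered, whether it is a single name or an array of names.

```ruby
# Hypothetical helper mirroring `[args[0]].flatten` in configure_membership.
def normalise_membership(*args)
  [args[0]].flatten
end

p normalise_membership('group_a')              # a single group name
p normalise_membership(['group_a', 'group_b']) # an array of group names

# Only groups that are not members yet would be subscribed to.
membership = ['group_a']
to_join = normalise_membership(['group_a', 'group_b']).reject { |g| membership.include?(g) }
p to_join
```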

#create(type, opts = {}, creation_opts = {}, &creation_callback) ⇒ AbstractResource

Create a new resource in the context of this resource. This resource becomes the parent, and the newly created resource becomes its child.

Parameters:

  • type (Symbol)

    resource proxy type

  • opts (Hash) (defaults to: {})

    options to be initialised

  • creation_opts (Hash) (defaults to: {})

    options to control the resource creation process

Returns:

  • (AbstractResource)

    the newly created child resource



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 174

def create(type, opts = {}, creation_opts = {}, &creation_callback)
  unless request_supported_children_type.include?(type.to_sym)
    raise StandardError, "Resource #{type} is not designed to be created by #{self.type}"
  end

  opts[:parent_certificate] = @certificate if @certificate
  opts[:parent] = self

  call_hook(:before_create, self, type, opts)

  new_resource = OmfRc::ResourceFactory.create(type.to_sym, opts, creation_opts, &creation_callback)

  new_props = opts.reject { |k| [:type, :name, :uid, :hrn, :property, :instrument].include?(k.to_sym) }
  init_configure(new_resource, new_props)

  call_hook(:after_create, self, new_resource)

  self.synchronize do
    children << new_resource
  end
  new_resource
end

#disconnect ⇒ Object

Disconnect using communicator



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 166

def disconnect
  OmfCommon.comm.disconnect
end

#execute_omf_operation(message, obj, topic) ⇒ Object

Execute operation based on the type of the message

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 379

def execute_omf_operation(message, obj, topic)
  begin
    response_h = handle_message(message, obj)
  rescue  => ex
    err_resp = message.create_inform_reply_message(nil, {}, src: resource_address)
    err_resp[:reason] = ex.to_s
    error "Encountered exception, returning ERROR message"
    debug ex.message
    debug ex.backtrace.join("\n")
    return inform(:error, err_resp, topic)
  end

  case message.operation
  #when :create
  #  inform(:creation_ok, response_h, topic)
  when :request, :configure
    inform(:status, response_h, topic)
  when :release
    OmfCommon.eventloop.after(RELEASE_WAIT) do
      inform(:released, response_h, topic) if response_h[:res_id]
    end
  end
end
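The mapping from operation to inform type in the case statement above can be summarised in a small sketch. `inform_type_for` is a hypothetical helper used only for illustration. It leaves out two details of the real method: the RELEASED inform is delayed by RELEASE_WAIT and sent only when the response carries a res_id, and CREATE replies are sent from the creation callback rather than from this method.

```ruby
# Hypothetical helper: which inform type (if any) follows each operation,
# as read from the case statement in execute_omf_operation.
def inform_type_for(operation)
  case operation
  when :request, :configure then :status
  when :release then :released
  end
end

%i[create request configure release inform].each do |op|
  puts "#{op} -> #{inform_type_for(op).inspect}"
end
```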

#get_binding ⇒ Object

Get binding of current object, used for ERB eval



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 161

def get_binding
  binding
end

#handle_configure_message(message, obj, response) ⇒ Object

FRCP CONFIGURE message handler

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 474

def handle_configure_message(message, obj, response)
  message.each_property do |key, value|
    method_name =  "#{message.operation.to_s}_#{key}"
    p_value = message[key]
    response[key] ||= obj.__send__(method_name, p_value)
  end
end

#handle_create_message(message, obj, response) ⇒ Object

FRCP CREATE message handler

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 436

def handle_create_message(message, obj, response)
  new_name = message[:name] || message[:hrn]
  msg_props = message.properties.merge({ hrn: new_name })

  obj.create(message[:type], msg_props, &lambda do |new_obj|
    begin
      response[:res_id] = new_obj.resource_address
      response[:uid] = new_obj.uid

      msg_props.each do |key, value|
        if new_obj.respond_to? "request_#{key}"
          response[key] = new_obj.__send__("request_#{key}")
        elsif new_obj.respond_to? key
          response[key] = new_obj.__send__(key)
        end
      end

      if (cred = new_obj.certificate)
        response[:cert] = cred.to_pem_compact
      end
      # self here is the parent
      self.inform(:creation_ok, response)
    rescue  => ex
      err_resp = message.create_inform_reply_message(nil, {}, src: resource_address)
      err_resp[:reason] = ex.to_s
      error "Encountered exception, returning ERROR message"
      debug ex.message
      debug ex.backtrace.join("\n")
      return self.inform(:error, err_resp)
    end
  end)
end

#handle_message(message, obj) ⇒ Object

Handle all messages by delegating them to the individual handlers

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 407

def handle_message(message, obj)
  response = message.create_inform_reply_message(nil, {}, src: resource_address)
  response.replyto replyto_address(obj, message.replyto)

  case message.operation
  when :create
    handle_create_message(message, obj, response)
  when :request
    handle_request_message(message, obj, response)
  when :configure
    handle_configure_message(message, obj, response)
  when :release
    handle_release_message(message, obj, response)
  when :inform
    nil # We really don't care about inform messages which created from here
  else
    raise StandardError, <<-ERROR
      Invalid message received (Unknown OMF operation #{message.operation}): #{message}.
      Please check protocol schema of version #{OmfCommon::PROTOCOL_VERSION}.
    ERROR
  end
  response
end

#handle_release_message(message, obj, response) ⇒ Object

FRCP RELEASE message handler

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 507

def handle_release_message(message, obj, response)
  res_id = message.res_id
  released_obj = obj.release(res_id)
  # TODO: Under what circumstances would 'released_obj' be nil?
  #
  # When a release message is sent to a group, for bulk releasing,
  # the proxy might not be aware of the res_id it received
  response[:res_id] = released_obj.resource_address if released_obj
  response
end

#handle_request_message(message, obj, response) ⇒ Object

FRCP REQUEST message handler

Parameters:



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 487

def handle_request_message(message, obj, response)
  request_props = if message.has_properties?
                    message.properties.keys.map(&:to_sym) & obj.request_available_properties.request
                  else
                    # Return ALL props when nothing specified
                    obj.request_available_properties.request
                  end

  request_props.each do |p_name|
    method_name = "request_#{p_name.to_s}"
    value = obj.__send__(method_name)
    response[p_name] = value if value
  end
end
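The property selection at the start of this handler can be sketched with plain arrays. `select_request_props` is a hypothetical helper, and checking for an empty list is an assumption standing in for `message.has_properties?`.

```ruby
# Hypothetical helper mirroring the selection logic in handle_request_message.
def select_request_props(requested, available)
  if requested.empty?
    available # nothing specified: return all requestable properties
  else
    # keep only the requested names that the resource can actually serve
    requested.map(&:to_sym) & available
  end
end

available = [:uid, :hrn, :type]
p select_request_props(['uid', 'bogus'], available)
p select_request_props([], available)
```

Intersecting with the available list means unknown property names in a REQUEST are silently dropped instead of raising NoMethodError.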

#inform(itype, inform_data, topic = nil) ⇒ Object

Publish an inform message

Parameters:

  • itype (Symbol)

    the type of inform message

  • inform_data (Hash | Hashie::Mash | Exception | String)

    the data to be carried by the inform message

  • topic (String) (defaults to: nil)

    Name of the topic to send it to. :ALL means send to the resource's own uid topic as well as all membership topics



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 524

def inform(itype, inform_data, topic = nil)
  inform_data = inform_data.dup # better make a copy
  unless address = resource_address
    OmfCommon.eventloop.after(1) do
      # try again in a bit and see if address has been set by then
      inform(itype, inform_data, topic)
    end
    warn "INFORM message delayed as resource's address is not known yet"
    return
  end
  if topic == :ALL
    inform(itype, inform_data)
    membership_topics.each {|m| inform(itype, inform_data, m[1])}
    return
  end

  topic ||= @topics.first
  if inform_data.is_a? Hash
    inform_data = Hashie::Mash.new(inform_data) if inform_data.class == Hash
    #idata = inform_data.dup
    idata = {
      src: address,
      type: self.type  # NOTE: Should we add the object's type as well???
    }
    message = OmfCommon::Message.create_inform_message(itype.to_s.upcase, inform_data, idata)
  else
    message = inform_data
  end

  message.itype = itype
  unless itype == :released
    #message[:uid] ||= self.uid
    #message[:type] ||= self.type
    message[:hrn] ||= self.hrn if self.hrn
  end

  topic.publish(message)

  OmfRc::ResourceProxy::MPPublished.inject(Time.now.to_f,
    self.uid, topic, message.mid) if OmfCommon::Measure.enabled?
end

#inform_error(reason) ⇒ Object



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 570

def inform_error(reason)
  error reason
  inform :error, {reason: reason}
end

#inform_status(props) ⇒ Object



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 566

def inform_status(props)
  inform :status, props
end

#inform_warn(reason) ⇒ Object



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 575

def inform_warn(reason)
  warn reason
  inform :warn, {reason: reason}
end

#process_omf_message(message, topic) ⇒ Object

Parse an OMF message and execute it as instructed by the message

Parameters:

  • message (OmfCommon::Message)

    FRCP message

  • topic (OmfCommon::Comm::Topic)

    subscribed to



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 355

def process_omf_message(message, topic)
  return unless check_guard(message)

  unless message.is_a? OmfCommon::Message
    raise ArgumentError, "Expected OmfCommon::Message, but got '#{message.class}'"
  end

  unless message.valid?
    raise StandardError, "Invalid message received: #{message}. Please check protocol schema of version #{OmfCommon::PROTOCOL_VERSION}."
  end

  objects_by_topic(topic.id.to_s).each do |obj|
    if OmfCommon::Measure.enabled?
      OmfRc::ResourceProxy::MPReceived.inject(Time.now.to_f, self.uid, topic, message.mid)
    end
    execute_omf_operation(message, obj, topic)
  end
end

#release(res_id) ⇒ AbstractResource

Release a child resource

Returns:

  • (AbstractResource)

    the released child resource, or nil if no child was released



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 200

def release(res_id)
  if (child = children.find { |v| v.uid.to_s == res_id.to_s })
    if child.release_self()
      self.synchronize do
        children.delete(child)
      end
      # Log only on success; child is set to nil when the release fails
      debug "#{child.uid} released"
    else
      child = nil
    end
  else
    debug "#{res_id} does not belong to #{self.uid}(#{self.hrn}) - #{children.map(&:uid).inspect}"
  end
  child
end

#release_self ⇒ Boolean

Release this resource. Should ONLY be called by parent resource.

Returns:

  • (Boolean)

    true if successful



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 220

def release_self
  # Release children resources recursively (iterate over a copy,
  # since released children are deleted from the list)
  children.dup.each do |c|
    if c.release_self
      self.synchronize do
        children.delete(c)
      end
    end
  end

  return false unless children.empty?

  info "Releasing hrn: #{hrn}, uid: #{uid}"

  call_hook(:before_release, self)

  props = {
    res_id: resource_address
  }
  props[:hrn] = hrn if hrn
  inform :released, props

  # clean up topics
  @topics.each do |t|
    t.unsubscribe(@uid)
  end

  @membership_topics.each_value do |t|
    if t.respond_to? :delete_on_message_cbk_by_id
      t.delete_on_message_cbk_by_id(@uid)
    end
    t.unsubscribe(@uid)
  end

  true
end
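The recursive, children-first release order can be seen in a stripped-down sketch. `ToyResource` is a hypothetical class that keeps only the tree structure; it leaves out hooks, inform messages, topic clean-up and locking.

```ruby
# Hypothetical minimal tree node; not the real AbstractResource.
class ToyResource
  attr_reader :uid, :children

  def initialize(uid)
    @uid = uid
    @children = []
  end

  def add_child(child)
    @children << child
    child
  end

  # Release all children first, then this resource.
  # Iterates over a copy so that deleting from @children is safe.
  def release_self(log = [])
    @children.dup.each do |c|
      @children.delete(c) if c.release_self(log)
    end
    return false unless @children.empty?

    log << @uid
    true
  end
end

node = ToyResource.new('node01')
iface = node.add_child(ToyResource.new('eth0'))
iface.add_child(ToyResource.new('vlan10'))

released = []
node.release_self(released)
p released
```

A parent reports success only once every child has been released, so the log fills from the leaves upwards.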

#request_available_properties(*args) ⇒ Hashie::Mash

Return a list of all properties that can be requested and configured

Examples:

{ request: [:ip_addr, :frequency], configure: [:ip_address] }

Returns:

  • (Hashie::Mash)


# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 274

def request_available_properties(*args)
  Hashie::Mash.new(request: [], configure: []).tap do |mash|
    methods.each do |m|
      mash[$1] << $2.to_sym if m =~ /^(request|configure)_(.+)/ && $2 != "available_properties"
    end
  end
end
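The method-name scan can be reproduced on a small class. This is a sketch under stated assumptions: `ToyWifi` and `available_properties` are hypothetical names, and a plain Hash replaces Hashie::Mash so the example has no gem dependency.

```ruby
# Hypothetical class used only to demonstrate the method-name scan.
class ToyWifi
  def request_ip_addr; end
  def request_frequency; end
  def configure_ip_addr(value); end

  # Same idea as request_available_properties: collect property names
  # from methods that follow the request_/configure_ naming convention.
  def available_properties
    result = { request: [], configure: [] }
    methods.each do |m|
      match = /\A(request|configure)_(.+)/.match(m.to_s)
      result[match[1].to_sym] << match[2].to_sym if match
    end
    # Sort for a stable result; the order of `methods` is not guaranteed.
    result.transform_values(&:sort)
  end
end

p ToyWifi.new.available_properties
```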

#request_child_resources(*args) ⇒ Hashie::Mash

Request child resources

Returns:

  • (Hashie::Mash)

    child resource mash with uid and hrn



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 309

def request_child_resources(*args)
  #children.map { |c| Hashie::Mash.new({ uid: c.uid, name: c.hrn }) }
  children.map { |c| c.to_hash }
end

#request_hrn(*args) ⇒ Object Also known as: request_name

Make hrn accessible through pubsub interface



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 293

def request_hrn(*args)
  hrn
end

#request_membership(*args) ⇒ Object

Query resource’s membership



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 302

def request_membership(*args)
  @membership
end

#request_supported_children_type(*args) ⇒ Array<Symbol>

Return a list of child resource types this resource can create

Returns:

  • (Array<Symbol>)


# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 262

def request_supported_children_type(*args)
  OmfRc::ResourceFactory.proxy_list.reject { |v| v == @type.to_s }.find_all do |k, v|
    (v.create_by && v.create_by.include?(@type.to_sym)) || v.create_by.nil?
  end.map(&:first).map(&:to_sym)
end
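The filtering rule can be sketched with a plain hash in place of the factory's proxy list. `PROXY_LIST`, its made-up entries, and `supported_children_type` are all assumptions for illustration: each name maps to the types allowed to create it, and nil means no restriction.

```ruby
# Hypothetical registry: proxy name => types allowed to create it
# (nil means "no restriction"). The entries are made up for illustration.
PROXY_LIST = {
  'node'        => nil,
  'net'         => [:node],
  'application' => [:node],
  'wlan'        => [:node],
  'vm_package'  => [:virtual_machine]
}.freeze

def supported_children_type(parent_type)
  PROXY_LIST
    .reject { |name, _| name == parent_type.to_s } # a type never lists itself
    .select { |_, create_by| create_by.nil? || create_by.include?(parent_type.to_sym) }
    .keys
    .map(&:to_sym)
end

p supported_children_type(:node)
p supported_children_type(:virtual_machine)
```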

#request_type(*args) ⇒ Object

Make type accessible through pubsub interface



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 288

def request_type(*args)
  type
end

#request_uid(*args) ⇒ Object

Make uid accessible through pubsub interface



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 283

def request_uid(*args)
  uid
end

#resource_address ⇒ Object

Return the public ‘routable’ address for this resource or nil if not known yet.



# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 152

def resource_address()
  if t = @topics[0]
    t.address
  else
    nil # TODO: should we raise an Exception?
  end
end

#to_hash ⇒ Hash

Return a hash describing a reference to this object

Returns:

  • (Hash)


# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 583

def to_hash
  { uid: @uid, address: resource_address() }
end