Class: OmfRc::ResourceProxy::AbstractResource

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, OmfRc::ResourceProxyDSL
Defined in:
lib/omf_rc/resource_proxy/abstract_resource.rb

Overview

Note:

Suppose you have read the DEVELOPERS GUIDE which explains the basic the resource controller system.

This is the abstract resource proxy class, which provides the base of all proxy implementations. When creating new resource instances, this abstract class will always be initialised first and then extended by one of the specific resource proxy modules.

Instead of initialise abstract resource directly, use Resource Factory‘s methods.

Proxy documentation has grouped FRCP API methods for your convenience.

We follow a simple naming convention for request/configure properties.

request_xxx() indicates property 'xxx' can be requested using FRCP REQUEST message.

configure_xxx(value) indicates property 'xxx' can be configured with 'value' using FRCP CONFIGURE message.

Currently official OMF RC gem contains following resource proxies:

Representing physical/virtual machine

Executing OML enabled application and monitor output

Configuring network interfaces

Installing packages

Creating virtual machines

Examples:

Creating resource using factory method

OmfRc::ResourceFactory.create(:node, uid: 'node01')

See Also:

Constant Summary collapse

RELEASE_WAIT =

Time to wait before releasing resource, wait for deleting pubsub topics

5
DEFAULT_CREATION_OPTS =
{
  suppress_create_message: false,
  create_children_resources: true
}

Constants included from OmfRc::ResourceProxyDSL

OmfRc::ResourceProxyDSL::DEFAULT_PROP_ACCESS, OmfRc::ResourceProxyDSL::PROXY_DIR, OmfRc::ResourceProxyDSL::UTIL_DIR

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from OmfRc::ResourceProxyDSL

#call_hook, included

Constructor Details

#initialize(type, opts = {}, creation_opts = {}, &creation_callback) ⇒ AbstractResource

Initialisation

Parameters:

  • type (Symbol)

    resource proxy type

  • opts (Hash) (defaults to: {})

    options to be initialised

  • creation_opts (Hash) (defaults to: {})

    options to control the resource creation process

Options Hash (opts):

  • :uid (String)

    Unique identifier

  • :hrn (String)

    Human readable name

  • :instrument (Hash)

    A hash for keeping instrumentation-related state

  • :certificate (OmfCommon::Auth::Certificate)

    The certificate for this resource

Options Hash (creation_opts):

  • :suppress_create_message (Boolean)

    Don’t send an initial CREATION.OK Inform message

  • :create_children_resources (Boolean)

    Immediately create ‘known’ children resources, such as interfaces on nodes



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 99

def initialize(type, opts = {}, creation_opts = {}, &creation_callback)
  super()

  @opts = Hashie::Mash.new(opts)
  @creation_opts = Hashie::Mash.new(DEFAULT_CREATION_OPTS.merge(creation_opts))

  @type = type
  @uid = (@opts.delete(:uid) || SecureRandom.uuid).to_s
  @hrn = @opts.delete(:hrn)
  @hrn = @hrn.to_s if @hrn

  @children = []
  @membership = []
  @topics = []
  @membership_topics = {}
  @property = Hashie::Mash.new

  OmfCommon.comm.subscribe(@uid) do |t|
    @topics << t

    if t.error?
      warn "Could not create topic '#{uid}', will shutdown, trying to clean up old topics. Please start it again once it has been shutdown."
      OmfCommon.comm.disconnect()
    else
      begin
        # Setup authentication related properties
        if (@certificate = @opts.delete(:certificate))
          OmfCommon::Auth::CertificateStore.instance.register(@certificate, t.address)
        else
          if (pcert = @opts.delete(:parent_certificate))
            @certificate = pcert.create_for(resource_address, @type, t.address)
          end
        end

        # Extend resource with Resource Module, can be obtained from Factory
        emodule = OmfRc::ResourceFactory.proxy_list[@type].proxy_module || "OmfRc::ResourceProxy::#{@type.camelize}".constantize
        self.extend(emodule)
        # Initiate property hash with default property values
        self.methods.each do |m|
          self.__send__(m) if m =~ /default_property_(.+)/
        end
        # Bootstrap initial configure, this should handle membership too
        init_configure(self, @opts)
        # Execute resource before_ready hook if any
        call_hook :before_ready, self

        # Prepare init :creation_ok message
        copts = { src: self.resource_address }
        copts[:cert] = @certificate.to_pem_compact if @certificate
        cprops = @property
        cprops[:res_id] = self.resource_address
        add_prop_status_to_response(self, @opts.keys, cprops)

        # Then send inform message to itself, with all resource options' current values.
        t.inform(:creation_ok, cprops, copts) unless creation_opts[:suppress_create_message]

        t.on_message(@uid) do |imsg|
          process_omf_message(imsg, t)
        end

        creation_callback.call(self) if creation_callback
      rescue => e
        error "Encountered exception: #{e.message}, returning ERROR message"
        debug e.backtrace.join("\n")
        t.inform(:creation_failed,
                 { reason: e.message },
                 { src: self.resource_address })
      end
    end
  end
end

Instance Attribute Details

#certificateObject

Returns the value of attribute certificate.



82
83
84
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def certificate
  @certificate
end

#childrenObject (readonly)

Returns the value of attribute children.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def children
  @children
end

#creation_optsObject (readonly)

Returns the value of attribute creation_opts.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def creation_opts
  @creation_opts
end

#hrnObject Also known as: name

Returns the value of attribute hrn.



82
83
84
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def hrn
  @hrn
end

#membershipObject (readonly)

Returns the value of attribute membership.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def membership
  @membership
end

#membership_topicsObject (readonly)

Returns the value of attribute membership_topics.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def membership_topics
  @membership_topics
end

#optsObject (readonly)

Returns the value of attribute opts.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def opts
  @opts
end

#propertyObject

Returns the value of attribute property.



82
83
84
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def property
  @property
end

#topicsObject (readonly)

Returns the value of attribute topics.



83
84
85
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 83

def topics
  @topics
end

#typeObject

Returns the value of attribute type.



82
83
84
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def type
  @type
end

#uidObject

Returns the value of attribute uid.



82
83
84
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 82

def uid
  @uid
end

Instance Method Details

#configure_membership(*args) ⇒ Object

Make resource part of the group topic, it will overwrite existing membership array

Parameters:

  • args (String|Array)

    name of group topic/topics



343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 343

def configure_membership(*args)
  new_membership = [args[0]].flatten

  new_membership.each do |new_m|
    unless @membership.include?(new_m)
      OmfCommon.comm.subscribe(new_m) do |t|
        if t.error?
          warn "Group #{new_m} disappeared"
          #EM.next_tick do
          #  @membership.delete(m)
          #end
        else
          self.synchronize do
            @membership << new_m
            @membership_topics[new_m] = t
            self.inform(:status, { membership: @membership }, t)
          end

          t.on_message(@uid) do |imsg|
            process_omf_message(imsg, t)
          end
        end
      end
    end
  end
  @membership
end

#create(type, opts = {}, creation_opts = {}, &creation_callback) ⇒ AbstractResource

Create a new resource in the context of this resource. This resource becomes parent, and newly created resource becomes child

Parameters:

  • type (Symbol)

    resource proxy type

  • opts (Hash) (defaults to: {})

    options to be initialised

  • creation_opts (Hash) (defaults to: {})

    options to control the resource creation process

Returns:



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 199

def create(type, opts = {}, creation_opts = {}, &creation_callback)
  unless request_supported_children_type.include?(type.to_sym)
    raise StandardError, "Resource #{type} is not designed to be created by #{self.type}"
  end

  opts[:parent_certificate] = @certificate if @certificate
  opts[:parent] = self

  call_hook(:before_create, self, type, opts)

  new_resource = OmfRc::ResourceFactory.create(type.to_sym, opts, creation_opts, &creation_callback)

  call_hook(:after_create, self, new_resource)

  self.synchronize do
    children << new_resource
  end
  new_resource
end

#disconnectObject

Disconnect using communicator



191
192
193
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 191

def disconnect
  OmfCommon.comm.disconnect
end

#execute_omf_operation(message, obj, topic) ⇒ Object

Execute operation based on the type of the message

Parameters:



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 401

def execute_omf_operation(message, obj, topic)
  begin
    response_h = handle_message(message, obj)
  rescue  => ex
    err_resp = message.create_inform_reply_message(nil, {}, src: resource_address)
    err_resp[:reason] = ex.to_s
    error "Encountered exception, returning ERROR message"
    debug ex.message
    debug ex.backtrace.join("\n")
    return inform(:error, err_resp, topic)
  end

  case message.operation
  #when :create
  #  inform(:creation_ok, response_h, topic)
  when :request, :configure
    inform(:status, response_h, topic)
  when :release
    OmfCommon.eventloop.after(RELEASE_WAIT) do
      inform(:released, response_h, topic) if response_h[:res_id]
    end
  end
end

#get_bindingObject

Get binding of current object, used for ERB eval



186
187
188
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 186

def get_binding
  binding
end

#handle_configure_message(message, obj, response) ⇒ Object

FRCP CONFIGURE message handler

Parameters:



491
492
493
494
495
496
497
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 491

def handle_configure_message(message, obj, response)
  message.each_property do |key, value|
    method_name =  "#{message.operation.to_s}_#{key}"
    p_value = message[key]
    response[key] ||= obj.__send__(method_name, p_value)
  end
end

#handle_create_message(message, obj, response) ⇒ Object

FRCP CREATE message handler

Parameters:



458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 458

def handle_create_message(message, obj, response)
  new_name = message[:name] || message[:hrn]
  msg_props = message.properties.merge({ hrn: new_name })

  obj.create(message[:type], msg_props, &lambda do |new_obj|
    begin
      response[:res_id] = new_obj.resource_address
      response[:uid] = new_obj.uid

      # Getting property status, for preparing inform msg
      add_prop_status_to_response(new_obj, msg_props.keys, response)

    if (cred = new_obj.certificate)
        response[:cert] = cred.to_pem_compact
      end
      # self here is the parent
      self.inform(:creation_ok, response)
    rescue  => ex
      err_resp = message.create_inform_reply_message(nil, {}, src: resource_address)
      err_resp[:reason] = ex.to_s
      error "Encountered exception, returning ERROR message"
      debug ex.message
      debug ex.backtrace.join("\n")
      return self.inform(:error, err_resp)
    end
  end)
end

#handle_message(message, obj) ⇒ Object

Handling all messages, then delegate them to individual handler

Parameters:



429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 429

def handle_message(message, obj)
  response = message.create_inform_reply_message(nil, {}, src: resource_address)
  response.replyto replyto_address(obj, message.replyto)

  case message.operation
  when :create
    handle_create_message(message, obj, response)
  when :request
    handle_request_message(message, obj, response)
  when :configure
    handle_configure_message(message, obj, response)
  when :release
    handle_release_message(message, obj, response)
  when :inform
    nil # We really don't care about inform messages which created from here
  else
    raise StandardError, "          Invalid message received (Unknown OMF operation \#{message.operation}): \#{message}.\n          Please check protocol schema of version \#{OmfCommon::PROTOCOL_VERSION}.\n    ERROR\n  end\n  response\nend\n"

#handle_release_message(message, obj, response) ⇒ Object

FRCP RELEASE message handler

Parameters:



524
525
526
527
528
529
530
531
532
533
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 524

def handle_release_message(message, obj, response)
  res_id = message.res_id
  released_obj = obj.release(res_id)
  # TODO: Under what circumstances would 'realease_obj' be NIL
  #
  # When release message send to a group, for bulk releasing,
  # the proxy might not be aware of a res_id it received
  response[:res_id] = released_obj.resource_address if released_obj
  response
end

#handle_request_message(message, obj, response) ⇒ Object

FRCP REQUEST message handler

Parameters:



504
505
506
507
508
509
510
511
512
513
514
515
516
517
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 504

def handle_request_message(message, obj, response)
  request_props = if message.has_properties?
                    message.properties.keys.map(&:to_sym) & obj.request_available_properties.request
                  else
                    # Return ALL props when nothing specified
                    obj.request_available_properties.request
                  end

  request_props.each do |p_name|
    method_name = "request_#{p_name.to_s}"
    value = obj.__send__(method_name)
    response[p_name] = value if value
  end
end

#inform(itype, inform_data, topic = nil) ⇒ Object

Publish an inform message

Parameters:

  • itype (Symbol)

    the type of inform message

  • inform_data (Hash | Hashie::Mash | Exception | String)

    the type of inform message

  • topic (String) (defaults to: nil)

    Name of topic to send it. :ALL means to uid as well s all members



541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 541

def inform(itype, inform_data, topic = nil)
  inform_data = inform_data.dup # better make a copy
  unless address = resource_address
    OmfCommon.eventloop.after(1) do
      # try again in a bit and see if address has been set by then
      inform(itype, inform_data, topic = nil)
    end
    warn "INFORM message delayed as resource's address is not known yet"
    return
  end

  if inform_data.is_a? Hash
    inform_data = Hashie::Mash.new(inform_data) if inform_data.class == Hash
    #idata = inform_data.dup
    idata = {
      src: address,
      type: self.type  # NOTE: Should we add the object's type as well???
    }
    message = OmfCommon::Message.create_inform_message(itype.to_s.upcase, inform_data, idata)
  else
    message = inform_data
  end

  message.itype = itype
  unless itype == :released
    message[:hrn] ||= self.hrn if self.hrn
  end

  # Just send to all topics, including group membership
  (membership_topics.map { |mt| mt[1] } + @topics).each { |t| t.publish(message) }

  OmfRc::ResourceProxy::MPPublished.inject(Time.now.to_f,
    self.uid, replyto, inform_message.mid) if OmfCommon::Measure.enabled?
end

#inform_creation_failed(reason) ⇒ Object



585
586
587
588
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 585

def inform_creation_failed(reason)
  error reason
  inform :creation_failed, { reason: reason }
end

#inform_error(reason) ⇒ Object



580
581
582
583
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 580

def inform_error(reason)
  error reason
  inform :error, { reason: reason }
end

#inform_status(props) ⇒ Object



576
577
578
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 576

def inform_status(props)
  inform :status, props
end

#inform_warn(reason) ⇒ Object



590
591
592
593
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 590

def inform_warn(reason)
  warn reason
  inform :warn, { reason: reason }
end

#process_omf_message(message, topic) ⇒ Object

Parse omf message and execute as instructed by the message

Parameters:

  • message (OmfCommon::Message)

    FRCP message

  • topic (OmfCommon::Comm::Topic)

    subscribed to



377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 377

def process_omf_message(message, topic)
  return unless check_guard(message)

  unless message.is_a? OmfCommon::Message
    raise ArgumentError, "Expected OmfCommon::Message, but got '#{message.class}'"
  end

  unless message.valid?
    raise StandardError, "Invalid message received: #{pubsub_item_payload}. Please check protocol schema of version #{OmfCommon::PROTOCOL_VERSION}."
  end

  objects_by_topic(topic.id.to_s).each do |obj|
    if OmfCommon::Measure.enabled?
      OmfRc::ResourceProxy::MPReceived.inject(Time.now.to_f, self.uid, topic, message.mid)
    end
    execute_omf_operation(message, obj, topic)
  end
end

#release(res_id) ⇒ AbstractResource

Release a child resource

Returns:



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 222

def release(res_id)
  if (child = children.find { |v| v.uid.to_s == res_id.to_s })
    if child.release_self()
      self.synchronize do
        children.delete(child)
      end
      child
    else
      child = nil
    end
    debug "#{child.uid} released"
  else
    debug "#{res_id} does not belong to #{self.uid}(#{self.hrn}) - #{children.map(&:uid).inspect}"
  end
  child
end

#release_selfBoolean

Release this resource. Should ONLY be called by parent resource.

Returns:

  • (Boolean)

    true if successful



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 242

def release_self
  # Release children resource recursively
  children.each do |c|
    if c.release_self
      self.synchronize do
        children.delete(c)
      end
    end
  end

  return false unless children.empty?

  info "Releasing hrn: #{hrn}, uid: #{uid}"

  call_hook(:before_release, self)

  props = {
    res_id: resource_address
  }
  props[:hrn] = hrn if hrn
  inform :released, props

  # clean up topics
  @topics.each do |t|
    t.unsubscribe(@uid)
  end

  @membership_topics.each_value do |t|
    if t.respond_to? :delete_on_message_cbk_by_id
      t.delete_on_message_cbk_by_id(@uid)
    end
    t.unsubscribe(@uid)
  end

  true
end

#request_available_properties(*args) ⇒ Hashie::Mash

Return a list of all properties can be requested and configured

Examples:

{ request: [:ip_addr, :frequency], configure: [:ip_address] }

Returns:

  • (Hashie::Mash)


296
297
298
299
300
301
302
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 296

def request_available_properties(*args)
  Hashie::Mash.new(request: [], configure: []).tap do |mash|
    methods.each do |m|
      mash[$1] << $2.to_sym if m =~ /^(request|configure)_(.+)/ && $2 != "available_properties"
    end
  end
end

#request_child_resources(*args) ⇒ Hashie::Mash

Request child resources

Returns:

  • (Hashie::Mash)

    child resource mash with uid and hrn



331
332
333
334
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 331

def request_child_resources(*args)
  #children.map { |c| Hashie::Mash.new({ uid: c.uid, name: c.hrn }) }
  children.map { |c| c.to_hash }
end

#request_hrn(*args) ⇒ Object Also known as: request_name

Make hrn accessible through pubsub interface



315
316
317
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 315

def request_hrn(*args)
  hrn
end

#request_membership(*args) ⇒ Object

Query resource’s membership



324
325
326
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 324

def request_membership(*args)
  @membership
end

#request_supported_children_type(*args) ⇒ Array<Symbol>

Return a list of child resources this resource can create

Returns:

  • (Array<Symbol>)


284
285
286
287
288
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 284

def request_supported_children_type(*args)
  OmfRc::ResourceFactory.proxy_list.reject { |v| v == @type.to_s }.find_all do |k, v|
    (v.create_by && v.create_by.include?(@type.to_sym)) || v.create_by.nil?
  end.map(&:first).map(&:to_sym)
end

#request_type(*args) ⇒ Object

Make type accessible through pubsub interface



310
311
312
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 310

def request_type(*args)
  type
end

#request_uid(*args) ⇒ Object

Make uid accessible through pubsub interface



305
306
307
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 305

def request_uid(*args)
  uid
end

#resource_addressObject

Return the public ‘routable’ address for this resource or nil if not known yet.



181
182
183
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 181

def resource_address
  resource_topic.address
end

#resource_topicObject

Return resource’ pubsub topic it has subscribed.



172
173
174
175
176
177
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 172

def resource_topic
  if @topics.empty?
    raise TopicNotSubscribedError, "Resource '#{@uid}' has not subscribed to any topics"
  end
  @topics[0]
end

#to_hashHash

Return a hash describing a reference to this object

Returns:

  • (Hash)


598
599
600
# File 'lib/omf_rc/resource_proxy/abstract_resource.rb', line 598

def to_hash
  { uid: @uid, address: resource_address() }
end