Class: OmfEc::Experiment
- Inherits:
-
Object
- Object
- OmfEc::Experiment
- Includes:
- MonitorMixin, Singleton
- Defined in:
- lib/omf_ec/experiment.rb
Overview
Experiment class to hold relevant state information
Defined Under Namespace
Classes: MetaData
Instance Attribute Summary collapse
-
#app_definitions ⇒ Object
Returns the value of attribute app_definitions.
-
#assertion ⇒ Object
Returns the value of attribute assertion.
-
#cmdline_properties ⇒ Object
Returns the value of attribute cmdline_properties.
-
#groups ⇒ Object
readonly
Returns the value of attribute groups.
-
#job_mps ⇒ Object
Returns the value of attribute job_mps.
-
#job_url ⇒ Object
Returns the value of attribute job_url.
-
#js_url ⇒ Object
Returns the value of attribute js_url.
-
#name ⇒ Object
Returns the value of attribute name.
-
#nodes ⇒ Object
Returns the value of attribute nodes.
-
#oml_uri ⇒ Object
Returns the value of attribute oml_uri.
-
#property ⇒ Object
Returns the value of attribute property.
-
#show_graph ⇒ Object
Returns the value of attribute show_graph.
-
#sliceID ⇒ Object
Returns the value of attribute sliceID.
-
#ss_url ⇒ Object
Returns the value of attribute ss_url.
-
#sub_groups ⇒ Object
readonly
Returns the value of attribute sub_groups.
Class Method Summary collapse
- .disconnect ⇒ Object
-
.done ⇒ Object
Disconnect communicator, try to delete any XMPP affiliations.
-
.ID ⇒ Object
Unique experiment id (Class method).
-
.leave_memberships ⇒ Object
Ask the resources which joined the groups I created to leave.
-
.sliceID ⇒ Object
Unique slice id (Class method).
- .start ⇒ Object
Instance Method Summary collapse
- #add_event(name, opts, trigger) ⇒ Object
- #add_group(group) ⇒ Object
- #add_or_update_resource_state(name, opts = {}) ⇒ Object (also: #add_resource)
- #add_periodic_event(event) ⇒ Object
- #add_property(name, value = nil, description = nil) ⇒ Object
- #add_sub_group(name) ⇒ Object
- #all_groups?(&block) ⇒ Boolean
-
#archive_oedl(script_name) ⇒ Object
Archive OEDL content to OML db.
- #clear_events ⇒ Object
-
#create_job ⇒ Object
If EC is launched with –job-service setup, then it needs to create a job entry for this experiment trial Do nothing if: - a JobService URL has not been provided, i.e.
- #each_group(&block) ⇒ Object
- #eval_trigger(event) ⇒ Object
- #event(name) ⇒ Object
- #group(name) ⇒ Object
-
#groups_by_res(res_addr) ⇒ Object
Find all groups a given resource belongs to.
-
#id ⇒ Object
Unique experiment id.
-
#initialize ⇒ Experiment
constructor
A new instance of Experiment.
- #log_metadata(key, value, domain = 'sys') ⇒ Object
- #mp_table_names ⇒ Object
-
#process_events ⇒ Object
Parsing user defined events, checking conditions against internal state, and execute callbacks if triggered.
- #resource_by_hrn(hrn) ⇒ Object
- #resource_state(address) ⇒ Object (also: #resource)
- #state ⇒ Object
- #sub_group(name) ⇒ Object
Constructor Details
#initialize ⇒ Experiment
Returns a new instance of Experiment.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/omf_ec/experiment.rb', line 32 def initialize super @id = Time.now.utc.iso8601(3) @sliceID = nil @state ||= Hashie::Mash.new #TODO: we need to keep history of all the events and not ovewrite them @groups ||= [] @nodes ||= [] @events ||= [] @app_definitions ||= Hash.new @sub_groups ||= [] @cmdline_properties ||= Hash.new @show_graph = false @js_url = nil @job_url = nil @job_mps = {} @ss_url = nil end |
Instance Attribute Details
#app_definitions ⇒ Object
Returns the value of attribute app_definitions.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def app_definitions @app_definitions end |
#assertion ⇒ Object
Returns the value of attribute assertion.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def assertion @assertion end |
#cmdline_properties ⇒ Object
Returns the value of attribute cmdline_properties.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def cmdline_properties @cmdline_properties end |
#groups ⇒ Object (readonly)
Returns the value of attribute groups.
20 21 22 |
# File 'lib/omf_ec/experiment.rb', line 20 def groups @groups end |
#job_mps ⇒ Object
Returns the value of attribute job_mps.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def job_mps @job_mps end |
#job_url ⇒ Object
Returns the value of attribute job_url.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def job_url @job_url end |
#js_url ⇒ Object
Returns the value of attribute js_url.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def js_url @js_url end |
#name ⇒ Object
Returns the value of attribute name.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def name @name end |
#nodes ⇒ Object
Returns the value of attribute nodes.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def nodes @nodes end |
#oml_uri ⇒ Object
Returns the value of attribute oml_uri.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def oml_uri @oml_uri end |
#property ⇒ Object
Returns the value of attribute property.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def property @property end |
#show_graph ⇒ Object
Returns the value of attribute show_graph.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def show_graph @show_graph end |
#sliceID ⇒ Object
Returns the value of attribute sliceID.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def sliceID @sliceID end |
#ss_url ⇒ Object
Returns the value of attribute ss_url.
19 20 21 |
# File 'lib/omf_ec/experiment.rb', line 19 def ss_url @ss_url end |
#sub_groups ⇒ Object (readonly)
Returns the value of attribute sub_groups.
20 21 22 |
# File 'lib/omf_ec/experiment.rb', line 20 def sub_groups @sub_groups end |
Class Method Details
.disconnect ⇒ Object
312 313 314 315 316 317 318 319 |
# File 'lib/omf_ec/experiment.rb', line 312 def disconnect info "Disconnecting in 5 sec from experiment: #{OmfEc.experiment.id}" info "Run the EC again to reattach" OmfCommon.el.after(5) do OmfCommon.comm.disconnect OmfCommon.eventloop.stop end end |
.done ⇒ Object
Disconnect communicator, try to delete any XMPP affiliations
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/omf_ec/experiment.rb', line 272 def done info "Experiment: #{OmfEc.experiment.id} finished" info "Exit in 15 seconds..." # Make sure that all defined events are removed OmfEc.experiment.clear_events OmfCommon.el.after(10) do allGroups do |g| # Clean up unless g.app_contexts.empty? info "Release applications in #{g.name}" g.resources[type: 'application'].release end unless g.net_ifs.find_all { |v| v.conf[:type] == 'net' }.empty? info "Release wired network interfaces in #{g.name}" g.resources[type: 'net'].release end unless g.net_ifs.find_all { |v| v.conf[:type] == 'wlan' }.empty? info "Release wireless network interfaces in #{g.name}" g.resources[type: 'wlan'].release end # Let release messages go through first OmfCommon.el.after(1) do info "Configure resources to leave #{g.name}" g.resources.membership = { leave: g.address } end end OmfCommon.el.after(4) do info "OMF Experiment Controller #{OmfEc::VERSION} - Exit." OmfCommon.el.after(1) do OmfCommon.comm.disconnect OmfCommon.eventloop.stop end end end OmfEc.experiment.("state", "finished") end |
.ID ⇒ Object
Unique experiment id (Class method)
181 182 183 |
# File 'lib/omf_ec/experiment.rb', line 181 def self.ID instance.id end |
.leave_memberships ⇒ Object
Ask the resources which joined the groups I created to leave
355 356 357 358 359 |
# File 'lib/omf_ec/experiment.rb', line 355 def leave_memberships all_groups do |g| g.resources.membership = { leave: g.address } end end |
.sliceID ⇒ Object
Unique slice id (Class method)
186 187 188 |
# File 'lib/omf_ec/experiment.rb', line 186 def self.sliceID instance.sliceID end |
.start ⇒ Object
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/omf_ec/experiment.rb', line 321 def start info "Experiment: #{OmfEc.experiment.id} starts" info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil? OmfEc.experiment.("state", "running") allGroups do |g| info "CONFIGURE #{g.members.size} resources to join group #{g.name}" debug "CONFIGURE #{g.members.keys} to join group #{g.name}" g.members.each do |key, value| OmfEc.subscribe_and_monitor(key) do |res| #info "Configure '#{key}' to join '#{g.name}'" g.synchronize do g.members[key] = res.address end res.configure({ membership: g.address, res_index: OmfEc.experiment.nodes.index(key) }, { assert: OmfEc.experiment.assertion }) end end end # For every 100 nodes, increase check interval by 1 second count = allGroups.inject(0) { |c, g| c += g.members.size } interval = count / 100 interval = 1 if interval < 1 info "TOTAL resources: #{count}. Events check interval: #{interval}." OmfCommon.el.every(interval) do EM.next_tick do OmfEc.experiment.process_events rescue nil end end end |
Instance Method Details
#add_event(name, opts, trigger) ⇒ Object
157 158 159 160 161 162 163 164 |
# File 'lib/omf_ec/experiment.rb', line 157 def add_event(name, opts, trigger) self.synchronize do warn "Event '#{name}' has already been defined. Overwriting it now." if event(name) @events.delete_if { |e| e[:name] == name } @events << { name: name, trigger: trigger, aliases: [] }.merge(opts) add_periodic_event(event(name)) if opts[:every] end end |
#add_group(group) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/omf_ec/experiment.rb', line 134 def add_group(group) self.synchronize do raise ArgumentError, "Expect Group object, got #{group.inspect}" unless group.kind_of? OmfEc::Group @groups << group unless group(group.name) end end |
#add_or_update_resource_state(name, opts = {}) ⇒ Object Also known as: add_resource
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/omf_ec/experiment.rb', line 74 def add_or_update_resource_state(name, opts = {}) self.synchronize do res = resource_state(name) if res opts.each do |key, value| if value.class == Array # Merge array values res[key] ||= [] res[key] += value res[key].uniq! elsif value.kind_of? Hash # Merge hash values res[key] ||= {} res[key].merge!(value) else # Overwrite otherwise res[key] = value end end else debug "Newly discovered resource >> #{name}" #res = Hashie::Mash.new({ address: name }).merge(opts) opts[:address] = name @state[name] = opts # Re send membership configure #planned_groups = groups_by_res(name) #unless planned_groups.empty? # OmfEc.subscribe_and_monitor(name) do |res| # info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}" # res.configure({ membership: planned_groups.map(&:address) }, { assert: OmfEc.experiment.assertion } ) # end #end end end end |
#add_periodic_event(event) ⇒ Object
199 200 201 202 203 204 205 |
# File 'lib/omf_ec/experiment.rb', line 199 def add_periodic_event(event) event[:periodic_timer] = OmfCommon.el.every(event[:every]) do self.synchronize do eval_trigger(event) end end end |
#add_property(name, value = nil, description = nil) ⇒ Object
58 59 60 61 62 |
# File 'lib/omf_ec/experiment.rb', line 58 def add_property(name, value = nil, description = nil) override_value = @cmdline_properties[name.to_s.to_sym] value = override_value unless override_value.nil? ExperimentProperty.create(name, value, description) end |
#add_sub_group(name) ⇒ Object
124 125 126 127 128 |
# File 'lib/omf_ec/experiment.rb', line 124 def add_sub_group(name) self.synchronize do @sub_groups << name unless @sub_groups.include?(name) end end |
#all_groups?(&block) ⇒ Boolean
149 150 151 |
# File 'lib/omf_ec/experiment.rb', line 149 def all_groups?(&block) !groups.empty? && groups.all? { |g| block ? block.call(g) : g } end |
#archive_oedl(script_name) ⇒ Object
Archive OEDL content to OML db
237 238 239 240 241 242 243 |
# File 'lib/omf_ec/experiment.rb', line 237 def archive_oedl(script_name) ( script_name, Base64.encode64(Zlib::Deflate.deflate(File.read(script_name))), "oedl_content" ) end |
#clear_events ⇒ Object
166 167 168 169 170 171 172 173 |
# File 'lib/omf_ec/experiment.rb', line 166 def clear_events self.synchronize do @events.each do |e| e[:periodic_timer].cancel if e[:periodic_timer] end @events = [] end end |
#create_job ⇒ Object
If EC is launched with –job-service setup, then it needs to create a job entry for this experiment trial Do nothing if:
-
a JobService URL has not been provided, i.e. EC runs without needs to contact JS
-
we already have a Job URL, i.e. the job entry has already been created
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/omf_ec/experiment.rb', line 250 def create_job return unless @job_url.nil? return if @js_url.nil? require 'json' require 'net/http' begin job = { name: self.id } u = URI.parse(@js_url+'/jobs') req = Net::HTTP::Post.new(u.path, {'Content-Type' =>'application/json'}) req.body = JSON.pretty_generate(job) res = Net::HTTP.new(u.host, u.port).start {|http| http.request(req) } raise "Could not create a job for this experiment trial\n"+ "Response #{res.code} #{res.}:\n#{res.body}" unless res.kind_of? Net::HTTPSuccess job = JSON.parse(res.body) raise "No valid URL received for the created job for this experiment trial" if job['href'].nil? @job_url = job['href'] end end |
#each_group(&block) ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/omf_ec/experiment.rb', line 141 def each_group(&block) if block groups.each { |g| block.call(g) } else groups end end |
#eval_trigger(event) ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/omf_ec/experiment.rb', line 207 def eval_trigger(event) if event[:callbacks] && !event[:callbacks].empty? && event[:trigger].call(state) # Periodic check event event[:periodic_timer].cancel if event[:periodic_timer] && event[:consume_event] @events.delete(event) if event[:consume_event] event_names = ([event[:name]] + event[:aliases]).join(', ') info "Event triggered: '#{event_names}'" # Last in first serve callbacks event[:callbacks].reverse.each do |callback| callback.call end end end |
#event(name) ⇒ Object
153 154 155 |
# File 'lib/omf_ec/experiment.rb', line 153 def event(name) @events.find { |v| v[:name] == name || v[:aliases].include?(name) } end |
#group(name) ⇒ Object
130 131 132 |
# File 'lib/omf_ec/experiment.rb', line 130 def group(name) groups.find { |v| v.name == name } end |
#groups_by_res(res_addr) ⇒ Object
Find all groups a given resource belongs to
116 117 118 |
# File 'lib/omf_ec/experiment.rb', line 116 def groups_by_res(res_addr) groups.find_all { |g| g.members.values.include?(res_addr) } end |
#id ⇒ Object
Unique experiment id
176 177 178 |
# File 'lib/omf_ec/experiment.rb', line 176 def id @name || @id end |
#log_metadata(key, value, domain = 'sys') ⇒ Object
231 232 233 234 |
# File 'lib/omf_ec/experiment.rb', line 231 def (key, value, domain = 'sys') #MetaData.inject_metadata(key.to_s, value.to_s) MetaData.inject(domain.to_s, key.to_s, value.to_s) end |
#mp_table_names ⇒ Object
223 224 225 226 227 228 229 |
# File 'lib/omf_ec/experiment.rb', line 223 def mp_table_names {}.tap do |m_t_n| groups.map(&:app_contexts).flatten.map(&:mp_table_names).each do |v| m_t_n.merge!(v) end end end |
#process_events ⇒ Object
Parsing user defined events, checking conditions against internal state, and execute callbacks if triggered
191 192 193 194 195 196 197 |
# File 'lib/omf_ec/experiment.rb', line 191 def process_events self.synchronize do @events.find_all { |v| v[:every].nil? }.each do |event| eval_trigger(event) end end end |
#resource_by_hrn(hrn) ⇒ Object
70 71 72 |
# File 'lib/omf_ec/experiment.rb', line 70 def resource_by_hrn(hrn) @state[hrn] end |
#resource_state(address) ⇒ Object Also known as: resource
64 65 66 |
# File 'lib/omf_ec/experiment.rb', line 64 def resource_state(address) @state[address] end |
#state ⇒ Object
50 51 52 |
# File 'lib/omf_ec/experiment.rb', line 50 def state @state.values end |
#sub_group(name) ⇒ Object
120 121 122 |
# File 'lib/omf_ec/experiment.rb', line 120 def sub_group(name) @sub_groups.find { |v| v == name } end |