Class: OmfEc::Experiment

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Singleton
Defined in:
lib/omf_ec/experiment.rb

Overview

Experiment class to hold relevant state information

Defined Under Namespace

Classes: MetaData

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeExperiment

Returns a new instance of Experiment.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/omf_ec/experiment.rb', line 32

def initialize
  super
  @id = Time.now.utc.iso8601(3)
  @sliceID = nil
  @state ||= Hashie::Mash.new #TODO: we need to keep history of all the events and not ovewrite them
  @groups ||= []
  @nodes ||= []
  @events ||= []
  @app_definitions ||= Hash.new
  @sub_groups ||= []
  @cmdline_properties ||= Hash.new
  @show_graph = false
  @js_url = nil
  @job_url = nil
  @job_mps = {}
  @ss_url = nil
end

Instance Attribute Details

#app_definitionsObject

Returns the value of attribute app_definitions.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def app_definitions
  @app_definitions
end

#assertionObject

Returns the value of attribute assertion.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def assertion
  @assertion
end

#cmdline_propertiesObject

Returns the value of attribute cmdline_properties.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def cmdline_properties
  @cmdline_properties
end

#groupsObject (readonly)

Returns the value of attribute groups.



20
21
22
# File 'lib/omf_ec/experiment.rb', line 20

def groups
  @groups
end

#job_mpsObject

Returns the value of attribute job_mps.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def job_mps
  @job_mps
end

#job_urlObject

Returns the value of attribute job_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def job_url
  @job_url
end

#js_urlObject

Returns the value of attribute js_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def js_url
  @js_url
end

#nameObject

Returns the value of attribute name.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def name
  @name
end

#nodesObject

Returns the value of attribute nodes.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def nodes
  @nodes
end

#oml_uriObject

Returns the value of attribute oml_uri.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def oml_uri
  @oml_uri
end

#propertyObject

Returns the value of attribute property.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def property
  @property
end

#show_graphObject

Returns the value of attribute show_graph.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def show_graph
  @show_graph
end

#sliceIDObject

Returns the value of attribute sliceID.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def sliceID
  @sliceID
end

#ss_urlObject

Returns the value of attribute ss_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def ss_url
  @ss_url
end

#sub_groupsObject (readonly)

Returns the value of attribute sub_groups.



20
21
22
# File 'lib/omf_ec/experiment.rb', line 20

def sub_groups
  @sub_groups
end

Class Method Details

.disconnectObject



312
313
314
315
316
317
318
319
# File 'lib/omf_ec/experiment.rb', line 312

def disconnect
  info "Disconnecting in 5 sec from experiment: #{OmfEc.experiment.id}"
  info "Run the EC again to reattach"
  OmfCommon.el.after(5) do
    OmfCommon.comm.disconnect
    OmfCommon.eventloop.stop
  end
end

.doneObject

Disconnect communicator, try to delete any XMPP affiliations



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/omf_ec/experiment.rb', line 272

def done
  info "Experiment: #{OmfEc.experiment.id} finished"
  info "Exit in 15 seconds..."

  # Make sure that all defined events are removed
  OmfEc.experiment.clear_events

  OmfCommon.el.after(10) do
    allGroups do |g|
      # Clean up
      unless g.app_contexts.empty?
        info "Release applications in #{g.name}"
        g.resources[type: 'application'].release
      end
      unless g.net_ifs.find_all { |v| v.conf[:type] == 'net' }.empty?
        info "Release wired network interfaces in #{g.name}"
        g.resources[type: 'net'].release
      end
      unless g.net_ifs.find_all { |v| v.conf[:type] == 'wlan' }.empty?
        info "Release wireless network interfaces in #{g.name}"
        g.resources[type: 'wlan'].release
      end
      # Let release messages go through first
      OmfCommon.el.after(1) do
        info "Configure resources to leave #{g.name}"
        g.resources.membership = { leave: g.address }
      end
    end

    OmfCommon.el.after(4) do
      info "OMF Experiment Controller #{OmfEc::VERSION} - Exit."
      OmfCommon.el.after(1) do
        OmfCommon.comm.disconnect
        OmfCommon.eventloop.stop
      end
    end
  end
  OmfEc.experiment.("state", "finished")
end

.IDObject

Unique experiment id (Class method)



181
182
183
# File 'lib/omf_ec/experiment.rb', line 181

def self.ID
  instance.id
end

.leave_membershipsObject

Ask the resources which joined the groups I created to leave



355
356
357
358
359
# File 'lib/omf_ec/experiment.rb', line 355

def leave_memberships
  all_groups do |g|
    g.resources.membership = { leave: g.address }
  end
end

.sliceIDObject

Unique slice id (Class method)



186
187
188
# File 'lib/omf_ec/experiment.rb', line 186

def self.sliceID
  instance.sliceID
end

.startObject



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/omf_ec/experiment.rb', line 321

def start
  info "Experiment: #{OmfEc.experiment.id} starts"
  info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil?
  OmfEc.experiment.("state", "running")

  allGroups do |g|
    info "CONFIGURE #{g.members.size} resources to join group #{g.name}"
    debug "CONFIGURE #{g.members.keys} to join group #{g.name}"
    g.members.each do |key, value|
      OmfEc.subscribe_and_monitor(key) do |res|
        #info "Configure '#{key}' to join '#{g.name}'"
        g.synchronize do
          g.members[key] = res.address
        end
        res.configure({ membership: g.address, res_index: OmfEc.experiment.nodes.index(key) }, { assert: OmfEc.experiment.assertion })
      end
    end
  end

  # For every 100 nodes, increase check interval by 1 second
  count = allGroups.inject(0) { |c, g| c += g.members.size }
  interval = count / 100
  interval = 1 if interval < 1
  info "TOTAL resources: #{count}. Events check interval: #{interval}."

  OmfCommon.el.every(interval) do
    EM.next_tick do
      OmfEc.experiment.process_events rescue nil
    end
  end
end

Instance Method Details

#add_event(name, opts, trigger) ⇒ Object



157
158
159
160
161
162
163
164
# File 'lib/omf_ec/experiment.rb', line 157

def add_event(name, opts, trigger)
  self.synchronize do
    warn "Event '#{name}' has already been defined. Overwriting it now." if event(name)
    @events.delete_if { |e| e[:name] == name }
    @events << { name: name, trigger: trigger, aliases: [] }.merge(opts)
    add_periodic_event(event(name)) if opts[:every]
  end
end

#add_group(group) ⇒ Object



134
135
136
137
138
139
# File 'lib/omf_ec/experiment.rb', line 134

def add_group(group)
  self.synchronize do
    raise ArgumentError, "Expect Group object, got #{group.inspect}" unless group.kind_of? OmfEc::Group
    @groups << group unless group(group.name)
  end
end

#add_or_update_resource_state(name, opts = {}) ⇒ Object Also known as: add_resource



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/omf_ec/experiment.rb', line 74

def add_or_update_resource_state(name, opts = {})
  self.synchronize do
    res = resource_state(name)
    if res
      opts.each do |key, value|
        if value.class == Array
          # Merge array values
          res[key] ||= []
          res[key] += value
          res[key].uniq!
        elsif value.kind_of? Hash
          # Merge hash values
          res[key] ||= {}
          res[key].merge!(value)
        else
          # Overwrite otherwise
          res[key] = value
        end
      end
    else
      debug "Newly discovered resource >> #{name}"
      #res = Hashie::Mash.new({ address: name }).merge(opts)
      opts[:address] = name
      @state[name] = opts

      # Re send membership configure
      #planned_groups = groups_by_res(name)

      #unless planned_groups.empty?
      #  OmfEc.subscribe_and_monitor(name) do |res|
      #    info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}"
      #    res.configure({ membership: planned_groups.map(&:address) }, { assert: OmfEc.experiment.assertion } )
      #  end
      #end
    end
  end
end

#add_periodic_event(event) ⇒ Object



199
200
201
202
203
204
205
# File 'lib/omf_ec/experiment.rb', line 199

def add_periodic_event(event)
  event[:periodic_timer] = OmfCommon.el.every(event[:every]) do
    self.synchronize do
      eval_trigger(event)
    end
  end
end

#add_property(name, value = nil, description = nil) ⇒ Object



58
59
60
61
62
# File 'lib/omf_ec/experiment.rb', line 58

def add_property(name, value = nil, description = nil)
  override_value = @cmdline_properties[name.to_s.to_sym]
  value = override_value unless override_value.nil?
  ExperimentProperty.create(name, value, description)
end

#add_sub_group(name) ⇒ Object



124
125
126
127
128
# File 'lib/omf_ec/experiment.rb', line 124

def add_sub_group(name)
  self.synchronize do
    @sub_groups << name unless @sub_groups.include?(name)
  end
end

#all_groups?(&block) ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/omf_ec/experiment.rb', line 149

def all_groups?(&block)
  !groups.empty? && groups.all? { |g| block ? block.call(g) : g }
end

#archive_oedl(script_name) ⇒ Object

Archive OEDL content to OML db



237
238
239
240
241
242
243
# File 'lib/omf_ec/experiment.rb', line 237

def archive_oedl(script_name)
  (
    script_name,
    Base64.encode64(Zlib::Deflate.deflate(File.read(script_name))),
    "oedl_content"
  )
end

#clear_eventsObject



166
167
168
169
170
171
172
173
# File 'lib/omf_ec/experiment.rb', line 166

def clear_events
  self.synchronize do
    @events.each do |e|
      e[:periodic_timer].cancel if e[:periodic_timer]
    end
    @events = []
  end
end

#create_jobObject

If EC is launched with –job-service setup, then it needs to create a job entry for this experiment trial Do nothing if:

  • a JobService URL has not been provided, i.e. EC runs without needs to contact JS

  • we already have a Job URL, i.e. the job entry has already been created



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/omf_ec/experiment.rb', line 250

def create_job
  return unless @job_url.nil?
  return if @js_url.nil?
  require 'json'
  require 'net/http'
  begin
    job = { name: self.id }
    u = URI.parse(@js_url+'/jobs')
    req = Net::HTTP::Post.new(u.path, {'Content-Type' =>'application/json'})
    req.body = JSON.pretty_generate(job)
    res = Net::HTTP.new(u.host, u.port).start {|http| http.request(req) }
    raise "Could not create a job for this experiment trial\n"+
          "Response #{res.code} #{res.message}:\n#{res.body}" unless res.kind_of? Net::HTTPSuccess
    job = JSON.parse(res.body)
    raise "No valid URL received for the created job for this experiment trial" if job['href'].nil?
    @job_url = job['href']
  end
end

#each_group(&block) ⇒ Object



141
142
143
144
145
146
147
# File 'lib/omf_ec/experiment.rb', line 141

def each_group(&block)
  if block
    groups.each { |g| block.call(g) }
  else
    groups
  end
end

#eval_trigger(event) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/omf_ec/experiment.rb', line 207

def eval_trigger(event)
  if event[:callbacks] && !event[:callbacks].empty? && event[:trigger].call(state)
    # Periodic check event
    event[:periodic_timer].cancel if event[:periodic_timer] && event[:consume_event]

    @events.delete(event) if event[:consume_event]
    event_names = ([event[:name]] + event[:aliases]).join(', ')
    info "Event triggered: '#{event_names}'"

    # Last in first serve callbacks
    event[:callbacks].reverse.each do |callback|
      callback.call
    end
  end
end

#event(name) ⇒ Object



153
154
155
# File 'lib/omf_ec/experiment.rb', line 153

def event(name)
  @events.find { |v| v[:name] == name || v[:aliases].include?(name) }
end

#group(name) ⇒ Object



130
131
132
# File 'lib/omf_ec/experiment.rb', line 130

def group(name)
  groups.find { |v| v.name == name }
end

#groups_by_res(res_addr) ⇒ Object

Find all groups a given resource belongs to



116
117
118
# File 'lib/omf_ec/experiment.rb', line 116

def groups_by_res(res_addr)
  groups.find_all { |g| g.members.values.include?(res_addr) }
end

#idObject

Unique experiment id



176
177
178
# File 'lib/omf_ec/experiment.rb', line 176

def id
  @name || @id
end

#log_metadata(key, value, domain = 'sys') ⇒ Object



231
232
233
234
# File 'lib/omf_ec/experiment.rb', line 231

def (key, value, domain = 'sys')
  #MetaData.inject_metadata(key.to_s, value.to_s)
  MetaData.inject(domain.to_s, key.to_s, value.to_s)
end

#mp_table_namesObject



223
224
225
226
227
228
229
# File 'lib/omf_ec/experiment.rb', line 223

def mp_table_names
  {}.tap do |m_t_n|
    groups.map(&:app_contexts).flatten.map(&:mp_table_names).each do |v|
      m_t_n.merge!(v)
    end
  end
end

#process_eventsObject

Parsing user defined events, checking conditions against internal state, and execute callbacks if triggered



191
192
193
194
195
196
197
# File 'lib/omf_ec/experiment.rb', line 191

def process_events
  self.synchronize do
    @events.find_all { |v| v[:every].nil? }.each do |event|
      eval_trigger(event)
    end
  end
end

#resource_by_hrn(hrn) ⇒ Object



70
71
72
# File 'lib/omf_ec/experiment.rb', line 70

def resource_by_hrn(hrn)
  @state[hrn]
end

#resource_state(address) ⇒ Object Also known as: resource



64
65
66
# File 'lib/omf_ec/experiment.rb', line 64

def resource_state(address)
  @state[address]
end

#stateObject



50
51
52
# File 'lib/omf_ec/experiment.rb', line 50

def state
  @state.values
end

#sub_group(name) ⇒ Object



120
121
122
# File 'lib/omf_ec/experiment.rb', line 120

def sub_group(name)
  @sub_groups.find { |v| v == name }
end