Class: OneWorker
Overview
Sidekiq worker class
Constant Summary collapse
- B_IN_GB =
1_073_741_824- STRING =
/[[:print:]]+/- NUMBER =
/[[:digit:]]+/- NON_ZERO =
/[1-9][[:digit:]]*/- STATES =
%w(started started suspended started suspended suspended completed completed suspended)
Instance Method Summary collapse
-
#common_data ⇒ Object
Prepare data that are common for every virtual machine.
-
#create_image_map(oda) ⇒ Hash
Create mapping of image ID and specified element.
-
#create_map(pool_type, mapping, oda) ⇒ Object
Generic method for mapping creation.
-
#create_user_map(oda) ⇒ Hash
Create mapping of user ID and specified element.
-
#load_vm(vm_id, oda) ⇒ OpenNebula::VirtualMachine
Load virtual machine with specified ID.
-
#mixin(vm) ⇒ NilClass, String
Look for 'os_tpl' OCCI mixin to better identifie virtual machine's image.
- #parse(value, regex, substitute = 'NULL') ⇒ Object
-
#perform(vms, output) ⇒ Object
Sidekiq specific method, specifies the purpose of the worker.
-
#process_vm(vm, user_map, image_map) ⇒ Hash
Obtain and parse required data from vm.
-
#sum_rstime(vm) ⇒ Integer
Sums RSTIME (time when virtual machine was actually running).
-
#write_data(data, output) ⇒ Object
Write processed data into output directory.
Instance Method Details
#common_data ⇒ Object
Prepare data that are common for every virtual machine
26 27 28 29 30 31 32 33 |
# File 'lib/one_worker.rb', line 26 def common_data common_data = {} common_data['endpoint'] = Settings['endpoint'] common_data['site_name'] = Settings['site_name'] common_data['cloud_type'] = Settings['cloud_type'] common_data end |
#create_image_map(oda) ⇒ Hash
Create mapping of image ID and specified element
46 47 48 49 |
# File 'lib/one_worker.rb', line 46 def create_image_map(oda) logger.debug('Creating image map.') create_map(OpenNebula::ImagePool, 'TEMPLATE/VMCATCHER_EVENT_AD_MPURI', oda) end |
#create_map(pool_type, mapping, oda) ⇒ Object
Generic method for mapping creation
52 53 54 55 56 57 58 59 |
# File 'lib/one_worker.rb', line 52 def create_map(pool_type, mapping, oda) oda.mapping(pool_type, mapping) rescue => e msg = "Couldn't create map: #{e.message}. "\ 'Stopping to avoid malformed records.' logger.error(msg) raise msg end |
#create_user_map(oda) ⇒ Hash
Create mapping of user ID and specified element
38 39 40 41 |
# File 'lib/one_worker.rb', line 38 def create_user_map(oda) logger.debug('Creating user map.') create_map(OpenNebula::UserPool, 'TEMPLATE/X509_DN', oda) end |
#load_vm(vm_id, oda) ⇒ OpenNebula::VirtualMachine
Load virtual machine with specified ID
64 65 66 67 68 69 |
# File 'lib/one_worker.rb', line 64 def load_vm(vm_id, oda) oda.vm(vm_id) rescue => e logger.error("Couldn't retrieve data for vm with id: #{vm_id}. #{e.message}. Skipping.") return nil end |
#mixin(vm) ⇒ NilClass, String
Look for 'os_tpl' OCCI mixin to better identifie virtual machine's image
149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/one_worker.rb', line 149 def mixin(vm) mixin_locations = %w(USER_TEMPLATE/OCCI_COMPUTE_MIXINS USER_TEMPLATE/OCCI_MIXIN TEMPLATE/OCCI_MIXIN) mixin_locations.each do |mixin_location| vm.each mixin_location do |mixin| mixin = mixin.text.split mixin.select! { |line| line.include? '/occi/infrastructure/os_tpl#' } return mixin.first unless mixin.empty? end end nil # nothing found end |
#parse(value, regex, substitute = 'NULL') ⇒ Object
225 226 227 |
# File 'lib/one_worker.rb', line 225 def parse(value, regex, substitute = 'NULL') regex =~ value ? value : substitute end |
#perform(vms, output) ⇒ Object
Sidekiq specific method, specifies the purpose of the worker
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/one_worker.rb', line 188 def perform(vms, output) OneacctExporter::Log.setup_log_level(logger) vms = vms.split('|') oda = OneDataAccessor.new(logger) user_map = create_user_map(oda) image_map = create_image_map(oda) data = [] vms.each do |vm_id| vm = load_vm(vm_id, oda) next unless vm logger.debug("Processing vm with id: #{vm_id}.") vm_data = process_vm(vm, user_map, image_map) next unless vm_data logger.debug("Adding vm with data: #{vm_data} for export.") data << vm_data end write_data(data, output) end |
#process_vm(vm, user_map, image_map) ⇒ Hash
Obtain and parse required data from vm
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/one_worker.rb', line 74 def process_vm(vm, user_map, image_map) data = common_data.clone data['vm_uuid'] = parse(vm['ID'], STRING) unless vm['STIME'] logger.error('Skipping a malformed record. '\ "VM with id #{data['vm_uuid']} has no StartTime.") return nil end data['start_time'] = Time.at(parse(vm['STIME'], NUMBER).to_i) start_time = data['start_time'].to_i if start_time == 0 logger.error('Skipping a malformed record. '\ "VM with id #{data['vm_uuid']} has malformed StartTime.") return nil end data['end_time'] = parse(vm['ETIME'], NON_ZERO) end_time = data['end_time'].to_i data['end_time'] = Time.at(end_time) if end_time != 0 if end_time != 0 && start_time > end_time logger.error('Skipping malformed record. '\ "VM with id #{data['vm_uuid']} has wrong time entries.") return nil end data['machine_name'] = parse(vm['DEPLOY_ID'], STRING, "one-#{data['vm_uuid']}") data['user_id'] = parse(vm['UID'], STRING) data['group_id'] = parse(vm['GID'], STRING) data['user_name'] = parse(vm['USER_TEMPLATE/USER_X509_DN'], STRING, nil) data['user_name'] = parse(user_map[data['user_id']], STRING) unless data['user_name'] data['fqan'] = parse(vm['GNAME'], STRING, nil) if vm['STATE'] data['status'] = parse(STATES[vm['STATE'].to_i], STRING) else data['status'] = 'NULL' end unless vm['HISTORY_RECORDS/HISTORY[1]'] logger.warn('Skipping malformed record. '\ "VM with id #{data['vm_uuid']} has no history records.") return nil end rstime = sum_rstime(vm) return nil unless rstime data['duration'] = parse(rstime.to_s, NON_ZERO) suspend = (end_time - start_time) - data['duration'].to_i unless end_time == 0 data['suspend'] = parse(suspend.to_s, NUMBER) vcpu = vm['TEMPLATE/VCPU'] data['cpu_count'] = parse(vcpu, NON_ZERO, '1') net_tx = parse(vm['NET_TX'], NUMBER, 0) data['network_inbound'] = (net_tx.to_i / B_IN_GB).round net_rx = parse(vm['NET_RX'], NUMBER, 0) data['network_outbound'] = (net_rx.to_i / B_IN_GB).round data['memory'] = parse(vm['MEMORY'], NUMBER, '0') data['image_name'] = parse(image_map[vm['TEMPLATE/DISK[1]/IMAGE_ID']], STRING, nil) data['image_name'] = parse(mixin(vm), STRING) unless data['image_name'] data end |
#sum_rstime(vm) ⇒ Integer
Sums RSTIME (time when virtual machine was actually running)
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/one_worker.rb', line 168 def sum_rstime(vm) rstime = 0 vm.each 'HISTORY_RECORDS/HISTORY' do |h| next unless h['RSTIME'] && h['RETIME'] && h['RETIME'] != '0' && h['RSTIME'] != '0' if h['RSTIME'].to_i > h['RETIME'].to_i logger.warn('Skipping malformed record. '\ "VM with id #{vm['ID']} has wrong CpuDuration.") rstime = nil break end rstime += h['RETIME'].to_i - h['RSTIME'].to_i end rstime end |
#write_data(data, output) ⇒ Object
Write processed data into output directory
215 216 217 218 219 220 221 222 223 |
# File 'lib/one_worker.rb', line 215 def write_data(data, output) logger.debug('Creating writer...') ow = OneWriter.new(data, output, logger) ow.write rescue => e msg = "Cannot write result to #{output}: #{e.message}" logger.error(msg) raise msg end |