Class: Kriterion::Worker
- Inherits:
-
Object
- Object
- Kriterion::Worker
- Includes:
- Logs
- Defined in:
- lib/kriterion/worker.rb
Constant Summary
Constants included from Logs
Instance Attribute Summary collapse
-
#backend ⇒ Object
readonly
Returns the value of attribute backend.
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_uri ⇒ Object
readonly
Returns the value of attribute queue_uri.
-
#standards ⇒ Object
readonly
Returns the value of attribute standards.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ Worker
constructor
A new instance of Worker.
- #process_report(report) ⇒ Object
- #run ⇒ Object
Methods included from Logs
Constructor Details
#initialize(opts = {}) ⇒ Worker
Returns a new instance of Worker.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/kriterion/worker.rb', line 26 def initialize(opts = {}) logger.level = if opts[:debug] Kriterion::Logs::DEBUG else Kriterion::Logs::INFO end # Set up connections @uri = opts[:uri] @queue = opts[:queue] @queue_uri = URI("#{@uri}/q/#{@queue}") @metrics = Kriterion::Metrics.new # Set up the backend # TODO: Clean this up and make fully dynamic backend_name = opts[:backend] || 'mongodb' case backend_name when 'mongodb' require 'kriterion/backend/mongodb' Kriterion::Backend.set( Kriterion::Backend::MongoDB.new( hostname: opts[:mongo_hostname], port: opts[:mongo_port], database: opts[:mongo_database], metrics: metrics ) ) end @backend = Kriterion::Backend.get # TODO: Work out how workers are going to get the list of standards frmo the API runner # TODO: Remove placeholder code standards_dir = File.('standards', Kriterion::ROOT) @standards = Kriterion.standards([standards_dir]) end |
Instance Attribute Details
#backend ⇒ Object (readonly)
Returns the value of attribute backend.
23 24 25 |
# File 'lib/kriterion/worker.rb', line 23 def backend @backend end |
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
24 25 26 |
# File 'lib/kriterion/worker.rb', line 24 def metrics @metrics end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
20 21 22 |
# File 'lib/kriterion/worker.rb', line 20 def queue @queue end |
#queue_uri ⇒ Object (readonly)
Returns the value of attribute queue_uri.
21 22 23 |
# File 'lib/kriterion/worker.rb', line 21 def queue_uri @queue_uri end |
#standards ⇒ Object (readonly)
Returns the value of attribute standards.
22 23 24 |
# File 'lib/kriterion/worker.rb', line 22 def standards @standards end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
19 20 21 |
# File 'lib/kriterion/worker.rb', line 19 def uri @uri end |
Instance Method Details
#process_report(report) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/kriterion/worker.rb', line 63 def process_report(report) report = Kriterion::Report.new(report) # Check if the report contains any relevant resources standard_names = standards.keys relevant_resources = report.(standard_names) return nil if relevant_resources.empty? logger.info "Processing report with #{relevant_resources.count} relevant resources" # Purge all old events relevant to this node, they will be re-added backend.purge_events! report.certname # Process the report affected_standards = relevant_resources.group_by do |resource| # Select the standard tag stds = (resource. & standard_names) raise 'Found a resource that was relevant to more than one standard. This is not yet supported' if stds.length > 1 stds[0] end affected_standards.each do |name, resources| standard = backend.get_standard(name, recurse: true) unless standard # If the standard doesn't yet exist in the backed, add it standard = Kriterion::Standard.new(@standards[name]) logger.debug "Adding starndard #{standard.name} to backend" backend.add_standard(standard) # TODO: See if there is a better way to deal with this, the reason I'm # doing this is that I want to make sure that there is not difference # between a newly created object and one that came from the database standard = backend.get_standard(name, recurse: true) end resources.each do |resource| # Get the section tag section_tag = resource..select do |t| standard.item_syntax.match(t) end # TODO: Make this work raise 'Found a resource relevant to multiple sections' if section_tag.length > 1 section_tag = section_tag.first # Go though all sections and subsections and create them if required captures = standard.item_syntax.match(section_tag).captures - [nil] # Convert the captures to a list of sections, but excluse the last one # because that will be the name of the item parent_sections = captures_to_sections(standard, captures[0..-2]) # If there are no captures then this is a direct child of a standard if captures.nil? section = standard else section = parent_sections.reduce(standard) do |previous, current| # If the section already exists return it if previous.find_section(current) previous.find_section(current) else # This is a new section that does not yet exist in the database, # we therefore need to get the details and all them all in current_section_name = if previous.is_a? Kriterion::Standard current elsif previous.is_a? Kriterion::Section [ previous.name, current ].join(standard.section_separator) end # Get the details from the standards database (name, description # etc.) current_section = @standards[name]['sections'].select do |s| s['name'] == current_section_name end[0] if current_section.nil? previous else # Create the new section object current_section['standard'] = standard.name current_section['parent_type'] = previous.type current_section['parent_uuid'] = previous.uuid current_section = Kriterion::Section.new(current_section) # Add the section to the backend backend.add_section(current_section) current_section end end end end # Create and add the item if it doesn't yet exist item = case section.items.select { |i| i.id == section_tag }.count when 1 # The item already exists, return it section.items.select { |i| i.id == section_tag }[0] when 0 # The item does not exist, create it, add to the database, # then return it item_details = @standards[name]['items'].select do |i| i['id'].upcase == section_tag.upcase end[0] item_details['parent_uuid'] = section.uuid item_details['parent_type'] = section.type item_details['section_path'] = captures backend.add_item(Kriterion::Item.new(item_details)) else raise "Found muliple sections with the id #{section_tag}" end # Add extra contextual data to that resource resource.parent_uuid = item.uuid # Add the new resource to the backend if it doesn't exist unless item.resources.include? resource item.resources << resource backend.add_resource(resource) end # Inform the database that this node is unchanged if we have no events if resource.events.empty? backend.add_unchanged_node(resource, report.certname) end # Add all events to the database resource.events = resource.events.map do |event| event = Kriterion::Event.new(event) event.certname = report.certname event.resource = resource.resource backend.add_event(event) event end metrics[:update_compliance] += Benchmark.realtime do # Finally update the compliance details for this resource and its # parent item backend.update_compliance! resource backend.update_compliance! item # Find all of the parent sections and update the compliance on them # Don't recalculate the compliance of the standard yet, wait until # the end. item.parent_names(standard.section_separator).each do |parent| # TODO: Complete this so that it updates the compliance of # everything. It's probably better if we re-query this stuff from # the database to reduce the chances of race conditions result = backend.find_sections( name: parent, standard: standard.name ) result.each { |r| backend.update_compliance! r } end end end # Reload the standard as new sections may have been added standard = backend.get_standard(name, recurse: true) metrics[:update_compliance] += Benchmark.realtime do # Recalculate the compliance of a given standard once it is done backend.update_compliance! standard end end end |
#run ⇒ Object
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/kriterion/worker.rb', line 235 def run while true do # Connect and check if there is anythong on the queue # TODO: Change this so that they listen properly logger.debug "GET #{queue_uri}" begin response = Net::HTTP.get_response(queue_uri) case response.code when '204' logger.debug 'Queue empty, sleeping...' sleep 3 when '200' logger.debug 'Got a report, parsing...' report = JSON.parse(JSON.parse(response.body)['value']) logger.info "Processing report: #{report['host']} #{report['time']}" metrics[:total_processing] += Benchmark.realtime do process_report(report) end metrics.print end rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError, Errno::ECONNREFUSED, SocketError => e logger.error "Error while running: #{e}" logger.info 'Sleeping...' sleep 3 end end end |