Module: PWN::Plugins::OpenAPI
- Defined in:
- lib/pwn/plugins/open_api.rb
Overview
Module to interact with OpenAPI specifications, merging multiple specs while resolving schema dependencies and ensuring OpenAPI compliance.
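For orientation, here is a minimal usage sketch. It assumes the gem is loaded via require 'pwn'; the spec file names and output path are hypothetical placeholders, and only the keyword arguments documented under generate_spec below are used.

    require 'pwn'

    # Hypothetical inputs: 'users.yaml' and 'billing.yaml' stand in for your own
    # OpenAPI documents; the output path is likewise only an example.
    merged_json_path = PWN::Plugins::OpenAPI.generate_spec(
      spec_paths: ['users.yaml', 'billing.yaml'],
      base_url: 'http://fqdn.com',
      output_json_path: '/tmp/merged_openapi.json',
      target_version: '3.0.3',
      debug: true
    )
    puts merged_json_path # generate_spec returns the path to the merged JSON file

The method merges the listed specs into a single document, records any repairs it had to make under x-validation-fixes, and writes the result to output_json_path.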
Class Method Summary
- .authors ⇒ Object
- .generate_spec(opts = {}) ⇒ Object
  Supported Method Parameters:
  openapi_spec = PWN::Plugins::OpenAPI.generate_spec(
    spec_paths: 'required - array of OpenAPI file paths to merge',
    base_url: 'required - base URL for OpenAPI endpoints (e.g., http://fqdn.com)',
    output_json_path: 'required - path to save the merged OpenAPI JSON file',
    target_version: 'optional - target OpenAPI version (default: 3.0.3)',
    debug: 'optional - boolean to enable debug logging (default: false)'
  )
- .help ⇒ Object
Class Method Details
.authors ⇒ Object
# File 'lib/pwn/plugins/open_api.rb', line 875

public_class_method def self.authors
  "AUTHOR(S):
    0day Inc. <[email protected]>
  "
end
.generate_spec(opts = {}) ⇒ Object
Supported Method Parameters: openapi_spec = PWN::Plugins::OpenAPI.generate_spec(
spec_paths: 'required - array of OpenAPI file paths to merge',
base_url: 'required - base URL for OpenAPI endpoints (e.g., http://fqdn.com)',
output_json_path: 'required - path to save the merged OpenAPI JSON file',
target_version: 'optional - target OpenAPI version (default: 3.0.3)',
debug: 'optional - boolean to enable debug logging (default: false)'
)
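Since generate_spec returns the path of the merged JSON file (see the source below), the merged document can be inspected with the standard library alone. A short sketch, assuming a prior call wrote to the hypothetical path /tmp/merged_openapi.json:

    require 'json'

    merged = JSON.parse(File.read('/tmp/merged_openapi.json'))
    puts merged['openapi']                      # target version, e.g. "3.0.3"
    puts merged['servers'].map { |s| s['url'] } # normalized server URLs
    puts merged['paths'].keys                   # merged, prefix-stripped endpoints
    # Repairs applied during merging, if any were needed
    puts merged['x-validation-fixes'].inspect if merged.key?('x-validation-fixes')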
# File 'lib/pwn/plugins/open_api.rb', line 23

def self.generate_spec(opts = {})
  spec_paths = opts[:spec_paths] ||= []
  raise ArgumentError, 'spec_paths must be a non-empty array' if spec_paths.empty?

  # Normalize spec_paths to absolute paths
  spec_paths = spec_paths.map { |p| File.expand_path(p) }

  base_url = opts[:base_url]
  raise ArgumentError, 'base_url is required' if base_url.nil? || base_url.empty?

  target_version = opts[:target_version] ||= '3.0.3'
  raise ArgumentError, "Unsupported OpenAPI version: #{target_version}" unless %w[3.0.0 3.0.1 3.0.2 3.0.3 3.1.0].include?(target_version)

  output_json_path = opts[:output_json_path]
  raise ArgumentError, 'output_json_path is required' if output_json_path.nil? || output_json_path.empty?

  debug = opts[:debug] || false
  validation_fixes = []

  begin
    spec_path_root = File.dirname(spec_paths.first)
    Dir.chdir(spec_path_root)

    # Parse base_url to extract host and default base path
    normalized_base_url, default_base_path = normalize_url(url: base_url)
    default_base_path ||= '' # Fallback if base_url has no path
    log("Using normalized base URL: #{normalized_base_url}, default base path: #{default_base_path}", debug: debug)

    # Load and parse all OpenAPI files
    specs = {}
    spec_paths.each do |path|
      raise Errno::ENOENT, "OpenAPI file not found: #{path}" unless File.exist?(path)

      begin
        case File.extname(path).downcase
        when '.yaml', '.yml'
          specs[path] = YAML.safe_load_file(path, permitted_classes: [Symbol, Date, Time], aliases: true)
        when '.json'
          specs[path] = JSON.parse(File.read(path))
        else
          raise "Unsupported file type: #{path} - only .yaml, .yml, and .json files"
        end
      rescue YAML::SyntaxError, JSON::ParserError => e
        raise "Error parsing OpenAPI file #{path}: #{e.message}"
      end
    end

    specs.each do |path, spec|
      # Pre-validate input specs
      if spec['paths'].is_a?(Hash)
        spec['paths'].each do |endpoint, path_item|
          next unless path_item.is_a?(Hash)

          path_item.each do |method, operation|
            next unless operation.is_a?(Hash) && operation['parameters'].is_a?(Array)

            param_names = operation['parameters'].map { |p| p['name'] }.compact
            duplicates = param_names.tally.select { |_, count| count > 1 }.keys
            raise "Duplicate parameters found in #{path} for path '#{endpoint}' (method: #{method}): #{duplicates.join(', ')}" unless duplicates.empty?

            operation['parameters'].each do |param|
              next unless param['in'] == 'path'

              raise "Path parameter #{param['name']} in #{path} (path: #{endpoint}, method: #{method}) must have a schema" unless param['schema'].is_a?(Hash)
            end
          end
        end
      end

      # Clean up null schemas in each spec
      clean_null_schemas(spec, path, '', validation_fixes, debug)

      # Fix invalid header definitions
      if spec['components']&.key?('headers')
        spec['components']['headers'].each do |header_name, header|
          next unless header.is_a?(Hash)

          if header.key?('name') || header.key?('in')
            validation_fixes << { path: "/components/headers/#{header_name}", error: "Invalid properties 'name' or 'in' in header", fix: "Removed 'name' and 'in' from header definition" }
            log("Fixing header '#{header_name}' in #{path}: Removing invalid 'name' and 'in' properties", debug: debug)
            header.delete('name')
            header.delete('in')
          end
          next unless header['schema'].nil?

          validation_fixes << { path: "/components/headers/#{header_name}", error: 'Header schema is null', fix: 'Added default schema { type: string }' }
          log("Fixing header '#{header_name}' in #{path}: Replacing null schema with default { type: string }", debug: debug)
          header['schema'] = { 'type' => 'string' }
        end
      end

      # Fix schema items for arrays (e.g., mediaServers)
      next unless spec['components']&.key?('schemas')

      spec['components']['schemas'].each do |schema_name, schema|
        fix_array_items(schema, path, "/components/schemas/#{schema_name}", validation_fixes, debug)
      end
    end

    # Determine dependencies based on $ref
    dependencies = {}
    specs.each do |path, spec|
      dependencies[path] = [] # Initialize empty array for all paths
      refs = extract_refs(spec: spec, spec_paths: spec_paths)
      refs.each do |ref|
        dep_path = resolve_ref_path(ref: ref, spec_paths: spec_paths, referencing_file: path)
        dependencies[path] << dep_path if specs.key?(dep_path) && dep_path != path
      end
    end

    # Sort files by dependencies
    ordered_paths, cycle_info = topological_sort(dependencies: dependencies, spec_paths: spec_paths)
    if cycle_info
      log("Cyclic dependencies detected: #{cycle_info.join(' -> ')}. Processing files in provided order.", debug: debug)
      ordered_paths = spec_paths
    end

    # Initialize merged specification with a single server
    merged_spec = {
      'openapi' => target_version,
      'info' => { 'title' => 'Merged OpenAPI Specification', 'version' => '1.0.0' },
      'servers' => [{ 'url' => normalized_base_url, 'description' => 'Default server' }],
      'paths' => {},
      'components' => { 'schemas' => {}, 'headers' => {} },
      'tags' => [],
      'security' => []
    }

    # Collect base paths from server URLs
    server_base_paths = {}
    ordered_paths.each do |path|
      spec = specs[path]
      unless spec.is_a?(Hash)
        log("Skipping #{path}: Invalid OpenAPI specification", debug: debug)
        next
      end

      log("Warning: #{path} uses OpenAPI version #{spec['openapi']}, which may not be compatible with target version #{target_version}", debug: debug) if spec['openapi'] && !spec['openapi'].start_with?(target_version.split('.')[0..1].join('.'))

      if spec['definitions'] && target_version.start_with?('3.')
        log("Migrating OpenAPI 2.0 'definitions' to 'components/schemas' for #{path}", debug: debug)
        spec['components'] ||= {}
        spec['components']['schemas'] = spec.delete('definitions')
      end

      resolved_spec = resolve_refs(spec: spec, specs: specs, spec_paths: spec_paths, referencing_file: path, debug: debug, target_version: target_version)

      # Process server URLs
      selected_server = nil
      server_base_path = nil
      absolute_url = nil
      if resolved_spec['servers']
        servers = resolved_spec['servers'].is_a?(Array) ? resolved_spec['servers'] : [resolved_spec['servers']]
        # Prioritize server with non-empty path
        selected_server = servers.find { |s| s.is_a?(Hash) && s['url'] && !URI.parse(s['url']).path.empty? } || servers.find { |s| s.is_a?(Hash) && s['description'] } || servers.first
        server_url = selected_server.is_a?(Hash) ? selected_server['url'] : selected_server
        if server_url.is_a?(String)
          absolute_url, server_base_path = normalize_url(url: server_url, base_url: normalized_base_url)
          server_base_path ||= default_base_path
          log("Selected server URL: #{server_url}, normalized: #{absolute_url}, base path: #{server_base_path} for #{path}", debug: debug)
          server_obj = selected_server.is_a?(Hash) ? selected_server.merge('url' => absolute_url) : { 'url' => absolute_url }
          unless merged_spec['servers'].any? { |s| s['url'] == absolute_url }
            merged_spec['servers'] << server_obj
            # Update default_base_path if servers length > 1
            if merged_spec['servers'].length > 1
              last_server_url = merged_spec['servers'].last['url']
              new_base_path = URI.parse(last_server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
              default_base_path = new_base_path || default_base_path
              log("Updated default_base_path to '#{default_base_path}' based on last server: #{last_server_url}", debug: debug)
            end
          end
        else
          log("No valid server URL in #{path}, using default base path: #{default_base_path}", debug: debug)
          absolute_url = normalized_base_url
          server_base_path = default_base_path
        end
      else
        # Check dependencies for server URLs
        (dependencies[path] || []).each do |dep_path|
          dep_spec = specs[dep_path]
          next unless dep_spec['servers']

          dep_servers = dep_spec['servers'].is_a?(Array) ? dep_spec['servers'] : [dep_spec['servers']]
          dep_server = dep_servers.find { |s| s.is_a?(Hash) && s['url'] && !URI.parse(s['url']).path.empty? }
          next unless dep_server

          dep_server_url = dep_server['url']
          absolute_url, server_base_path = normalize_url(url: dep_server_url, base_url: normalized_base_url)
          server_base_path ||= default_base_path
          log("Using dependency server URL: #{dep_server_url}, normalized: #{absolute_url}, base path: #{server_base_path} for #{path}", debug: debug)
          server_obj = dep_server.merge('url' => absolute_url)
          unless merged_spec['servers'].any? { |s| s['url'] == absolute_url }
            merged_spec['servers'] << server_obj
            # Update default_base_path if servers length > 1
            if merged_spec['servers'].length > 1
              last_server_url = merged_spec['servers'].last['url']
              new_base_path = URI.parse(last_server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
              default_base_path = new_base_path || default_base_path
              log("Updated default_base_path to '#{default_base_path}' based on last server: #{last_server_url}", debug: debug)
            end
          end
          break
        end
        unless absolute_url
          log("No servers defined in #{path} or dependencies, using default base path: #{default_base_path}", debug: debug)
          absolute_url = normalized_base_url
          server_base_path = default_base_path
        end
      end
      server_base_paths[path] = server_base_path

      # Normalize paths
      if resolved_spec['paths'].is_a?(Hash)
        resolved_spec['paths'] = validate_path_parameters(
          resolved_spec['paths'],
          path,
          server_base_path: server_base_path,
          debug: debug
        )
      end

      merged_spec['openapi'] = [resolved_spec['openapi'], target_version].max if resolved_spec['openapi']

      if resolved_spec['info'].is_a?(Hash)
        merged_spec['info'] = deep_merge(hash1: merged_spec['info'], hash2: resolved_spec['info'])
        raise "Missing required info.title in #{path}" unless merged_spec['info']['title']
        raise "Missing required info.version in #{path}" unless merged_spec['info']['version']
      end

      if resolved_spec['paths'].is_a?(Hash)
        resolved_paths = resolved_spec['paths'].transform_keys do |endpoint|
          effective_base_path = server_base_paths[path]
          # Strip redundant base path before combining
          normalized_endpoint = endpoint.to_s.sub(%r{^/+}, '').sub(%r{/+$}, '')
          if effective_base_path && !effective_base_path.empty?
            prefix_pattern = Regexp.new("^#{Regexp.escape(effective_base_path)}/")
            while normalized_endpoint.match?(prefix_pattern)
              normalized_endpoint = normalized_endpoint.sub(prefix_pattern, '')
              log("Stripped '#{effective_base_path}' from endpoint '#{endpoint}' to '#{normalized_endpoint}' during merge in #{path}", debug: debug)
            end
          end
          normalized_endpoint = '/' if normalized_endpoint.empty?
          combined_path = combine_paths(effective_base_path, normalized_endpoint)
          log("Merging path '#{endpoint}' as '#{combined_path}' from #{path}", debug: debug)
          combined_path
        end
        merged_spec['paths'].merge!(resolved_paths) do |api_endpoint, _existing, new|
          log("Path '#{api_endpoint}' in #{path} conflicts with existing path. Overwriting.", debug: debug)
          new
        end
      end

      merged_spec['components'] = deep_merge(hash1: merged_spec['components'], hash2: resolved_spec['components']) if resolved_spec['components'].is_a?(Hash)

      if resolved_spec['tags'].is_a?(Array)
        resolved_spec['tags'].each do |tag|
          merged_spec['tags'] << tag unless merged_spec['tags'].include?(tag)
        end
      end

      next unless resolved_spec['security'].is_a?(Array)

      resolved_spec['security'].each do |security|
        merged_spec['security'] << security unless merged_spec['security'].include?(security)
      end
    end

    # Filter servers to keep only those with paths matching the first folder in paths
    if merged_spec['paths'].any?
      path_first_folders = merged_spec['paths'].keys.map do |path|
        path_segments = path.sub(%r{^/+}, '').split('/')
        path_segments.first if path_segments.any?
      end.compact.uniq
      log("First folders in paths: #{path_first_folders}", debug: debug)
      if path_first_folders.any?
        merged_spec['servers'] = merged_spec['servers'].select do |server|
          server_url = server['url']
          server_path = URI.parse(server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
          server_path && path_first_folders.include?(server_path)
        end
        log("Filtered servers to: #{merged_spec['servers'].map { |s| s['url'] }}", debug: debug)
      end
    end

    # Ensure at least one server remains
    if merged_spec['servers'].empty?
      merged_spec['servers'] = [{ 'url' => normalized_base_url, 'description' => 'Default server' }]
      log("No servers matched path prefixes. Reverted to default: #{normalized_base_url}", debug: debug)
    end

    # Remove server path prefixes from path keys
    merged_spec = remove_server_path_prefixes(merged_spec, debug: debug)

    # Clean up null schemas in the merged spec
    clean_null_schemas(merged_spec, 'merged_spec', '', validation_fixes, debug)

    merged_spec, schema_validation_errors = validate_openapi_spec(
      merged_spec: merged_spec,
      target_version: target_version,
      debug: debug
    )

    unless validation_fixes.empty? && schema_validation_errors.empty?
      merged_spec['x-validation-fixes'] = validation_fixes + schema_validation_errors
      log("Added validation fixes to spec: #{merged_spec['x-validation-fixes'].map { |f| f[:error] }.join(', ')}", debug: debug)
    end

    FileUtils.mkdir_p(File.dirname(output_json_path))
    File.write(output_json_path, JSON.pretty_generate(merged_spec))
    log("Merged OpenAPI specification written to: #{output_json_path}", debug: debug)

    # { individual_specs: specs, merged_spec: merged_spec }
    output_json_path
  rescue Errno::ENOENT => e
    raise "Error accessing file: #{e.message}"
  rescue StandardError => e
    raise "Unexpected error: #{e.message}"
  end
end
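The dependency ordering above is handled by the module's own topological_sort helper, with extract_refs and resolve_ref_path supplying the edges. As a rough illustration of the same idea only, the sketch below orders spec files with Ruby's stdlib TSort over a hard-coded dependency map; it is not the module's implementation, and it raises on cycles rather than falling back to the provided order.

    require 'tsort'

    # Illustrative only: the dependency hash is hard-coded, whereas the module
    # derives it from $ref values extracted out of each spec.
    class SpecGraph
      include TSort

      def initialize(dependencies)
        @dependencies = dependencies
      end

      def tsort_each_node(&block)
        @dependencies.each_key(&block)
      end

      def tsort_each_child(node, &block)
        @dependencies.fetch(node, []).each(&block)
      end
    end

    deps = {
      'api.yaml'     => ['schemas.yaml'], # api.yaml $refs schemas.yaml
      'schemas.yaml' => []
    }
    puts SpecGraph.new(deps).tsort # => schemas.yaml, then api.yaml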
.help ⇒ Object
# File 'lib/pwn/plugins/open_api.rb', line 881

public_class_method def self.help
  puts "USAGE:
    openapi_spec = #{self}.generate_spec(
      spec_paths: 'required - array of OpenAPI file paths to merge',
      base_url: 'required - base URL for OpenAPI endpoints (e.g., http://fqdn.com)',
      output_json_path: 'required - path to save the merged OpenAPI JSON file',
      target_version: 'optional - target OpenAPI version (default: 3.0.3)',
      debug: 'optional - boolean to enable debug logging (default: false)'
    )

    #{self}.authors
  "
end