Module: PWN::Plugins::OpenAPI

Defined in:
lib/pwn/plugins/open_api.rb

Overview

Module to interact with OpenAPI specifications, merging multiple specs while resolving schema dependencies and ensuring OpenAPI compliance.

Class Method Summary collapse

Class Method Details

.authorsObject



875
876
877
878
879
# File 'lib/pwn/plugins/open_api.rb', line 875

public_class_method def self.authors
  "AUTHOR(S):
    0day Inc. <[email protected]>
  "
end

.generate_spec(opts = {}) ⇒ Object

Supported Method Parameters: openapi_spec = PWN::Plugins::OpenAPI.generate_spec(

spec_paths: 'required - array of OpenAPI file paths to merge',
base_url: 'required - base URL for OpenAPI endpoints (e.g., http://fqdn.com)',
output_json_path: 'optional - path to save the merged OpenAPI JSON file',
target_version: 'optional - target OpenAPI version (default: 3.0.3)',
debug: 'optional - boolean to enable debug logging (default: false)'

)

Raises:

  • (ArgumentError)


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/pwn/plugins/open_api.rb', line 23

def self.generate_spec(opts = {})
  spec_paths = opts[:spec_paths] ||= []
  raise ArgumentError, 'spec_paths must be a non-empty array' if spec_paths.empty?

  # Normalize spec_paths to absolute paths
  spec_paths = spec_paths.map { |p| File.expand_path(p) }

  base_url = opts[:base_url]
  raise ArgumentError, 'base_url is required' if base_url.nil? || base_url.empty?

  target_version = opts[:target_version] ||= '3.0.3'
  raise ArgumentError, "Unsupported OpenAPI version: #{target_version}" unless %w[3.0.0 3.0.1 3.0.2 3.0.3 3.1.0].include?(target_version)

  output_json_path = opts[:output_json_path]
  raise ArgumentError, 'output_json_path is required' if output_json_path.nil? || output_json_path.empty?

  debug = opts[:debug] || false
  validation_fixes = []

  begin
    spec_path_root = File.dirname(spec_paths.first)
    Dir.chdir(spec_path_root)

    # Parse base_url to extract host and default base path
    normalized_base_url, default_base_path = normalize_url(url: base_url)
    default_base_path ||= '' # Fallback if base_url has no path
    log("Using normalized base URL: #{normalized_base_url}, default base path: #{default_base_path}", debug: debug)

    # Load and parse all OpenAPI files
    specs = {}
    spec_paths.each do |path|
      raise Errno::ENOENT, "OpenAPI file not found: #{path}" unless File.exist?(path)

      begin
        case File.extname(path).downcase
        when '.yaml', '.yml'
          specs[path] = YAML.safe_load_file(path, permitted_classes: [Symbol, Date, Time], aliases: true)
        when '.json'
          specs[path] = JSON.parse(File.read(path))
        else
          raise "Unsupported file type: #{path} - only .yaml, .yml, and .json files"
        end
      rescue YAML::SyntaxError, JSON::ParserError => e
        raise "Error parsing OpenAPI file #{path}: #{e.message}"
      end
    end

    specs.each do |path, spec|
      # Pre-validate input specs
      if spec['paths'].is_a?(Hash)
        spec['paths'].each do |endpoint, path_item|
          next unless path_item.is_a?(Hash)

          path_item.each do |method, operation|
            next unless operation.is_a?(Hash) && operation['parameters'].is_a?(Array)

            param_names = operation['parameters'].map { |p| p['name'] }.compact
            duplicates = param_names.tally.select { |_, count| count > 1 }.keys
            raise "Duplicate parameters found in #{path} for path '#{endpoint}' (method: #{method}): #{duplicates.join(', ')}" unless duplicates.empty?

            operation['parameters'].each do |param|
              next unless param['in'] == 'path'

              raise "Path parameter #{param['name']} in #{path} (path: #{endpoint}, method: #{method}) must have a schema" unless param['schema'].is_a?(Hash)
            end
          end
        end
      end

      # Clean up null schemas in each spec
      clean_null_schemas(spec, path, '', validation_fixes, debug)

      # Fix invalid header definitions
      if spec['components']&.key?('headers')
        spec['components']['headers'].each do |header_name, header|
          next unless header.is_a?(Hash)

          if header.key?('name') || header.key?('in')
            validation_fixes << {
              path: "/components/headers/#{header_name}",
              error: "Invalid properties 'name' or 'in' in header",
              fix: "Removed 'name' and 'in' from header definition"
            }
            log("Fixing header '#{header_name}' in #{path}: Removing invalid 'name' and 'in' properties", debug: debug)
            header.delete('name')
            header.delete('in')
          end
          next unless header['schema'].nil?

          validation_fixes << {
            path: "/components/headers/#{header_name}",
            error: 'Header schema is null',
            fix: 'Added default schema { type: string }'
          }
          log("Fixing header '#{header_name}' in #{path}: Replacing null schema with default { type: string }", debug: debug)
          header['schema'] = { 'type' => 'string' }
        end
      end

      # Fix schema items for arrays (e.g., mediaServers)
      next unless spec['components']&.key?('schemas')

      spec['components']['schemas'].each do |schema_name, schema|
        fix_array_items(schema, path, "/components/schemas/#{schema_name}", validation_fixes, debug)
      end
    end

    # Determine dependencies based on $ref
    dependencies = {}
    specs.each do |path, spec|
      dependencies[path] = [] # Initialize empty array for all paths
      refs = extract_refs(spec: spec, spec_paths: spec_paths)
      refs.each do |ref|
        dep_path = resolve_ref_path(ref: ref, spec_paths: spec_paths, referencing_file: path)
        dependencies[path] << dep_path if specs.key?(dep_path) && dep_path != path
      end
    end

    # Sort files by dependencies
    ordered_paths, cycle_info = topological_sort(dependencies: dependencies, spec_paths: spec_paths)
    if cycle_info
      log("Cyclic dependencies detected: #{cycle_info.join(' -> ')}. Processing files in provided order.", debug: debug)
      ordered_paths = spec_paths
    end

    # Initialize merged specification with a single server
    merged_spec = {
      'openapi' => target_version,
      'info' => {
        'title' => 'Merged OpenAPI Specification',
        'version' => '1.0.0'
      },
      'servers' => [{ 'url' => normalized_base_url, 'description' => 'Default server' }],
      'paths' => {},
      'components' => { 'schemas' => {}, 'headers' => {} },
      'tags' => [],
      'security' => []
    }

    # Collect base paths from server URLs
    server_base_paths = {}

    ordered_paths.each do |path|
      spec = specs[path]
      unless spec.is_a?(Hash)
        log("Skipping #{path}: Invalid OpenAPI specification", debug: debug)
        next
      end

      log("Warning: #{path} uses OpenAPI version #{spec['openapi']}, which may not be compatible with target version #{target_version}", debug: debug) if spec['openapi'] && !spec['openapi'].start_with?(target_version.split('.')[0..1].join('.'))

      if spec['definitions'] && target_version.start_with?('3.')
        log("Migrating OpenAPI 2.0 'definitions' to 'components/schemas' for #{path}", debug: debug)
        spec['components'] ||= {}
        spec['components']['schemas'] = spec.delete('definitions')
      end

      resolved_spec = resolve_refs(spec: spec, specs: specs, spec_paths: spec_paths, referencing_file: path, debug: debug, target_version: target_version)

      # Process server URLs
      selected_server = nil
      server_base_path = nil
      absolute_url = nil

      if resolved_spec['servers']
        servers = resolved_spec['servers'].is_a?(Array) ? resolved_spec['servers'] : [resolved_spec['servers']]
        # Prioritize server with non-empty path
        selected_server = servers.find { |s| s.is_a?(Hash) && s['url'] && !URI.parse(s['url']).path.empty? } ||
                          servers.find { |s| s.is_a?(Hash) && s['description'] } ||
                          servers.first

        server_url = selected_server.is_a?(Hash) ? selected_server['url'] : selected_server
        if server_url.is_a?(String)
          absolute_url, server_base_path = normalize_url(url: server_url, base_url: normalized_base_url)
          server_base_path ||= default_base_path
          log("Selected server URL: #{server_url}, normalized: #{absolute_url}, base path: #{server_base_path} for #{path}", debug: debug)
          server_obj = selected_server.is_a?(Hash) ? selected_server.merge('url' => absolute_url) : { 'url' => absolute_url }
          unless merged_spec['servers'].any? { |s| s['url'] == absolute_url }
            merged_spec['servers'] << server_obj
            # Update default_base_path if servers length > 1
            if merged_spec['servers'].length > 1
              last_server_url = merged_spec['servers'].last['url']
              new_base_path = URI.parse(last_server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
              default_base_path = new_base_path || default_base_path
              log("Updated default_base_path to '#{default_base_path}' based on last server: #{last_server_url}", debug: debug)
            end
          end
        else
          log("No valid server URL in #{path}, using default base path: #{default_base_path}", debug: debug)
          absolute_url = normalized_base_url
          server_base_path = default_base_path
        end
      else
        # Check dependencies for server URLs
        (dependencies[path] || []).each do |dep_path|
          dep_spec = specs[dep_path]
          next unless dep_spec['servers']

          dep_servers = dep_spec['servers'].is_a?(Array) ? dep_spec['servers'] : [dep_spec['servers']]
          dep_server = dep_servers.find { |s| s.is_a?(Hash) && s['url'] && !URI.parse(s['url']).path.empty? }
          next unless dep_server

          dep_server_url = dep_server['url']
          absolute_url, server_base_path = normalize_url(url: dep_server_url, base_url: normalized_base_url)
          server_base_path ||= default_base_path
          log("Using dependency server URL: #{dep_server_url}, normalized: #{absolute_url}, base path: #{server_base_path} for #{path}", debug: debug)
          server_obj = dep_server.merge('url' => absolute_url)
          unless merged_spec['servers'].any? { |s| s['url'] == absolute_url }
            merged_spec['servers'] << server_obj
            # Update default_base_path if servers length > 1
            if merged_spec['servers'].length > 1
              last_server_url = merged_spec['servers'].last['url']
              new_base_path = URI.parse(last_server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
              default_base_path = new_base_path || default_base_path
              log("Updated default_base_path to '#{default_base_path}' based on last server: #{last_server_url}", debug: debug)
            end
          end
          break
        end
        unless absolute_url
          log("No servers defined in #{path} or dependencies, using default base path: #{default_base_path}", debug: debug)
          absolute_url = normalized_base_url
          server_base_path = default_base_path
        end
      end
      server_base_paths[path] = server_base_path

      # Normalize paths
      if resolved_spec['paths'].is_a?(Hash)
        resolved_spec['paths'] = validate_path_parameters(
          resolved_spec['paths'],
          path,
          server_base_path: server_base_path,
          debug: debug
        )
      end

      merged_spec['openapi'] = [resolved_spec['openapi'], target_version].max if resolved_spec['openapi']

      if resolved_spec['info'].is_a?(Hash)
        merged_spec['info'] = deep_merge(hash1: merged_spec['info'], hash2: resolved_spec['info'])
        raise "Missing required info.title in #{path}" unless merged_spec['info']['title']
        raise "Missing required info.version in #{path}" unless merged_spec['info']['version']
      end

      if resolved_spec['paths'].is_a?(Hash)
        resolved_paths = resolved_spec['paths'].transform_keys do |endpoint|
          effective_base_path = server_base_paths[path]
          # Strip redundant base path before combining
          normalized_endpoint = endpoint.to_s.sub(%r{^/+}, '').sub(%r{/+$}, '')
          if effective_base_path && !effective_base_path.empty?
            prefix_pattern = Regexp.new("^#{Regexp.escape(effective_base_path)}/")
            while normalized_endpoint.match?(prefix_pattern)
              normalized_endpoint = normalized_endpoint.sub(prefix_pattern, '')
              log("Stripped '#{effective_base_path}' from endpoint '#{endpoint}' to '#{normalized_endpoint}' during merge in #{path}", debug: debug)
            end
          end
          normalized_endpoint = '/' if normalized_endpoint.empty?
          combined_path = combine_paths(effective_base_path, normalized_endpoint)
          log("Merging path '#{endpoint}' as '#{combined_path}' from #{path}", debug: debug)
          combined_path
        end
        merged_spec['paths'].merge!(resolved_paths) do |api_endpoint, _existing, new|
          log("Path '#{api_endpoint}' in #{path} conflicts with existing path. Overwriting.", debug: debug)
          new
        end
      end

      merged_spec['components'] = deep_merge(hash1: merged_spec['components'], hash2: resolved_spec['components']) if resolved_spec['components'].is_a?(Hash)

      if resolved_spec['tags'].is_a?(Array)
        resolved_spec['tags'].each do |tag|
          merged_spec['tags'] << tag unless merged_spec['tags'].include?(tag)
        end
      end

      next unless resolved_spec['security'].is_a?(Array)

      resolved_spec['security'].each do |security|
        merged_spec['security'] << security unless merged_spec['security'].include?(security)
      end
    end

    # Filter servers to keep only those with paths matching the first folder in paths
    if merged_spec['paths'].any?
      path_first_folders = merged_spec['paths'].keys.map do |path|
        path_segments = path.sub(%r{^/+}, '').split('/')
        path_segments.first if path_segments.any?
      end.compact.uniq
      log("First folders in paths: #{path_first_folders}", debug: debug)

      if path_first_folders.any?
        merged_spec['servers'] = merged_spec['servers'].select do |server|
          server_url = server['url']
          server_path = URI.parse(server_url).path&.sub(%r{^/+}, '')&.sub(%r{/+$}, '')
          server_path && path_first_folders.include?(server_path)
        end
        log("Filtered servers to: #{merged_spec['servers'].map { |s| s['url'] }}", debug: debug)
      end
    end

    # Ensure at least one server remains
    if merged_spec['servers'].empty?
      merged_spec['servers'] = [{ 'url' => normalized_base_url, 'description' => 'Default server' }]
      log("No servers matched path prefixes. Reverted to default: #{normalized_base_url}", debug: debug)
    end

    # Remove server path prefixes from path keys
    merged_spec = remove_server_path_prefixes(merged_spec, debug: debug)

    # Clean up null schemas in the merged spec
    clean_null_schemas(merged_spec, 'merged_spec', '', validation_fixes, debug)

    merged_spec, schema_validation_errors = validate_openapi_spec(
      merged_spec: merged_spec,
      target_version: target_version,
      debug: debug
    )

    unless validation_fixes.empty? && schema_validation_errors.empty?
      merged_spec['x-validation-fixes'] = validation_fixes + schema_validation_errors
      log("Added validation fixes to spec: #{merged_spec['x-validation-fixes'].map { |f| f[:error] }.join(', ')}", debug: debug)
    end

    FileUtils.mkdir_p(File.dirname(output_json_path))
    File.write(output_json_path, JSON.pretty_generate(merged_spec))
    log("Merged OpenAPI specification written to: #{output_json_path}", debug: debug)

    # { individual_specs: specs, merged_spec: merged_spec }
    output_json_path
  rescue Errno::ENOENT => e
    raise "Error accessing file: #{e.message}"
  rescue StandardError => e
    raise "Unexpected error: #{e.message}"
  end
end

.helpObject



881
882
883
884
885
886
887
888
889
890
891
892
893
# File 'lib/pwn/plugins/open_api.rb', line 881

public_class_method def self.help
  puts "USAGE:
    openapi_spec = #{self}.generate_spec(
      spec_paths: 'required - array of OpenAPI file paths to merge',
      base_url: 'required - base URL for OpenAPI endpoints (e.g., http://fqdn.com)',
      output_json_path: 'required - path to save the merged OpenAPI JSON file',
      target_version: 'optional - target OpenAPI version (default: 3.0.3)',
      debug: 'optional - boolean to enable debug logging (default: false)'
    )

    #{self}.authors
  "
end