Class: Sigstore::Verifier

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/sigstore/verifier.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Loggable

included, #logger

Constructor Details

#initialize(rekor_client:, fulcio_cert_chain:, timestamp_authorities:, rekor_keyring:, ct_keyring:) ⇒ Verifier

Returns a new instance of Verifier.



31
32
33
34
35
36
37
# File 'lib/sigstore/verifier.rb', line 31

# Stores the collaborators and trust material handed in by the caller.
# Nothing is validated or copied here; each argument is kept as-is in the
# instance variable of the same name.
#
# @param rekor_client [Object] kept in @rekor_client
# @param fulcio_cert_chain [Object] kept in @fulcio_cert_chain
# @param timestamp_authorities [Object] kept in @timestamp_authorities
# @param rekor_keyring [Object] kept in @rekor_keyring
# @param ct_keyring [Object] kept in @ct_keyring
def initialize(rekor_client:, fulcio_cert_chain:, timestamp_authorities:, rekor_keyring:, ct_keyring:)
  @rekor_client, @fulcio_cert_chain, @timestamp_authorities =
    rekor_client, fulcio_cert_chain, timestamp_authorities
  @rekor_keyring, @ct_keyring = rekor_keyring, ct_keyring
end

Instance Attribute Details

#rekor_client ⇒ Object (readonly)

Returns the value of attribute rekor_client.



29
30
31
# File 'lib/sigstore/verifier.rb', line 29

# Read-only accessor for the value stored in @rekor_client.
#
# @return [Object] whatever was assigned to @rekor_client
def rekor_client = @rekor_client

Class Method Details

.for_trust_root(trust_root:) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/sigstore/verifier.rb', line 39

# Builds a verifier from the pieces exposed by +trust_root+.
#
# The Rekor client is pointed at `trust_root.tlog_for_signing.base_url`, and
# the Rekor and CT keyrings are built from `trust_root.rekor_keys` and
# `trust_root.ctfe_keys` respectively. The Fulcio chain and timestamp
# authorities are passed through unchanged.
#
# @param trust_root [Object] must respond to the readers named above
# @return [Object] the result of calling `new` with the assembled arguments
def self.for_trust_root(trust_root:)
  # Values are read in the same order in which they are handed to `new`.
  rekor_client = Rekor::Client.new(url: trust_root.tlog_for_signing.base_url)
  fulcio_cert_chain = trust_root.fulcio_cert_chain
  timestamp_authorities = trust_root.timestamp_authorities
  rekor_keyring = Internal::Keyring.new(keys: trust_root.rekor_keys)
  ct_keyring = Internal::Keyring.new(keys: trust_root.ctfe_keys)

  new(rekor_client:, fulcio_cert_chain:, timestamp_authorities:, rekor_keyring:, ct_keyring:)
end

.production(trust_root: TrustedRoot.production) ⇒ Object



49
50
51
# File 'lib/sigstore/verifier.rb', line 49

# Convenience constructor: delegates to .for_trust_root.
#
# @param trust_root [Object] defaults to `TrustedRoot.production`; the default
#   is only evaluated when the caller does not pass a value
# @return [Object] whatever .for_trust_root returns
def self.production(trust_root: TrustedRoot.production) = for_trust_root(trust_root: trust_root)

.staging(trust_root: TrustedRoot.staging) ⇒ Object



53
54
55
# File 'lib/sigstore/verifier.rb', line 53

# Convenience constructor: delegates to .for_trust_root.
#
# @param trust_root [Object] defaults to `TrustedRoot.staging`; the default
#   is only evaluated when the caller does not pass a value
# @return [Object] whatever .for_trust_root returns
def self.staging(trust_root: TrustedRoot.staging) = for_trust_root(trust_root: trust_root)

Instance Method Details

#verify(input:, policy:, offline:) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/sigstore/verifier.rb', line 57

# Verifies the bundle carried by +input+ against this verifier's trust
# material and the caller-supplied +policy+.
#
# Order of operations, as implemented below:
#   1. collect signing timestamps (timestamp verification data, then the
#      Rekor entry's integrated time),
#   2. locate the Rekor entry and check its inclusion proof / inclusion promise,
#   3. validate the leaf certificate against the Fulcio chain once per timestamp,
#   4. verify the SCTs embedded in the leaf certificate,
#   5. check key usage, extended key usage and the policy,
#   6. verify the signature itself (message signature or DSSE envelope).
#
# @param input [Object] this method reads `input.sbundle` and
#   `input.hashed_input`, and passes `input` on to `verify_in_toto`
# @param policy [#verify] called with the leaf certificate; its result must
#   respond to `verified?` and is returned as-is when not verified
# @param offline [Boolean] forwarded to `find_rekor_entry`; when truthy, a
#   Rekor entry without an inclusion-proof checkpoint is logged and skipped
#   instead of being reported as a failure
# @return [VerificationSuccess] when every check passes
# @return [VerificationFailure, Object] on a failed check: a VerificationFailure
#   built here, the result of `verify_scts`, or the unverified policy result
# @raise [RuntimeError] if the timestamps do not all yield the same certificate chain
# @raise [Error::InvalidCertificate] if the store context reports no chain
# @raise [Error::InvalidBundle] on unknown bundle content or unparsable in-toto JSON
# @raise [Sigstore::Error::Unimplemented] on an unsupported DSSE payload type
# NOTE(review): helpers called here (merkle/checkpoint/SET verification,
# verify_in_toto, ...) may raise as well — confirm against their definitions.
def verify(input:, policy:, offline:)
  # First, establish a time for the signature. This timestamp is required to validate the certificate chain,
  # so this step comes first.

  bundle = input.sbundle
  materials = bundle.verification_material

  # 1)
  # If the verification policy uses the Timestamping Service, the Verifier MUST verify the timestamping response
  # using the Timestamping Service root key material, as described in Spec: Timestamping Service, with the raw bytes
  # of the signature as the timestamped data. The Verifier MUST then extract a timestamp from the timestamping
  # response. If verification or timestamp parsing fails, the Verifier MUST abort.

  # Falls back to an empty list when the helper returns nil/false.
  timestamps = extract_timestamp_from_verification_data(materials.timestamp_verification_data) || []

  # 2)
  # If the verification policy uses timestamps from the Transparency Service, the Verifier MUST verify the signature
  # on the Transparency Service LogEntry as described in Spec: Transparency Service against the pre-distributed root
  # key material from the transparency service. The Verifier SHOULD NOT (yet) attempt to parse the body.
  # The Verifier MUST then parse the integratedTime as a Unix timestamp (seconds since January 1, 1970 UTC).
  # If verification or timestamp parsing fails, the Verifier MUST abort.

  begin
    # TODO: should this instead be an input to the verify method?
    # See https://docs.google.com/document/d/1kbhK2qyPPk8SLavHzYSDM8-Ueul9_oxIMVFuWMWKz0E/edit?disco=AAABQVV-gT0
    entry = find_rekor_entry(bundle, input.hashed_input, offline:)
  rescue Sigstore::Error::MissingRekorEntry
    return VerificationFailure.new("Rekor entry not found")
  else
    # Runs only when find_rekor_entry did not raise.
    if entry.inclusion_proof&.checkpoint
      Internal::Merkle.verify_merkle_inclusion(entry)
      Rekor::Checkpoint.verify_checkpoint(@rekor_keyring, entry)
    elsif !offline
      # Online verification requires an inclusion proof with a checkpoint.
      return VerificationFailure.new("Missing Rekor inclusion proof")
    else
      logger.warn "inclusion proof not present in bundle: skipping due to offline verification"
    end
  end

  # An inclusion promise, when present, is checked against the Rekor keyring.
  Internal::SET.verify_set(keyring: @rekor_keyring, entry:) if entry.inclusion_promise

  # The Rekor integrated time is always appended, so `timestamps` holds at least one element from here on.
  timestamps << Time.at(entry.integrated_time).utc

  # TODO: implement this step
  # NOTE(review): the TODO above does not say which step is still missing — confirm against the spec.

  # Trust store containing every certificate of the configured Fulcio chain.
  store = OpenSSL::X509::Store.new

  @fulcio_cert_chain.each do |cert|
    store.add_cert(cert.openssl)
  end

  # 3)
  # The Verifier MUST perform certification path validation (RFC 5280 §6) of the certificate chain with the
  # pre-distributed Fulcio root certificate(s) as a trust anchor, but with a fake “current time.”
  # If a timestamp from the timestamping service is available, the Verifier MUST perform path validation using the
  # timestamp from the Timestamping Service. If a timestamp from the Transparency Service is available, the Verifier
  # MUST perform path validation using the timestamp from the Transparency Service. If both are available, the
  # Verifier performs path validation twice. If either fails, verification fails.
  chains = timestamps.map do |ts|
    store_ctx = OpenSSL::X509::StoreContext.new(store, bundle.leaf_certificate.openssl)
    store_ctx.time = ts

    # `return` inside this block exits `verify` itself, not just the block.
    unless store_ctx.verify
      return VerificationFailure.new(
        "failed to validate certificate from fulcio cert chain: #{store_ctx.error_string}"
      )
    end

    chain = store_ctx.chain || raise(Error::InvalidCertificate, "no valid cert chain found")
    chain.shift # remove the cert itself
    chain.map! { Internal::X509::Certificate.new(_1) }
  end

  # Chains are compared by the DER encoding of their certificates; every timestamp is expected to
  # produce the same chain, so exactly one distinct chain must remain.
  chains.uniq! { |chain| chain.map(&:to_der) }
  unless chains.size == 1
    raise "expected exactly one certificate chain, got #{chains.size} chains:\n" +
          chains.map do |chain|
            chain.map(&:to_text).join("\n")
          end.join("\n\n")
  end

  # 4)
  # Unless performing online verification (see §Alternative Workflows), the Verifier MUST extract the
  # SignedCertificateTimestamp embedded in the leaf certificate, and verify it as in RFC 9162 §8.1.3,
  # using the verification key from the Certificate Transparency Log.
  chain = chains.first
  # verify_scts returns nil when every SCT verifies, hence the nil-guard before `verified?`.
  if (result = verify_scts(bundle.leaf_certificate, chain)) && !result.verified?
    return result
  end

  # 5)
  # The Verifier MUST then check the certificate against the verification policy.

  usage_ext = bundle.leaf_certificate.extension(Internal::X509::Extension::KeyUsage)
  return VerificationFailure.new("Key usage is not of type `digital signature`") unless usage_ext.digital_signature

  extended_key_usage = bundle.leaf_certificate.extension(Internal::X509::Extension::ExtendedKeyUsage)
  unless extended_key_usage.code_signing?
    return VerificationFailure.new("Extended key usage is not of type `code signing`")
  end

  # The policy's own result object is handed back to the caller when it is not verified.
  policy_check = policy.verify(bundle.leaf_certificate)
  return policy_check unless policy_check.verified?

  # 6)
  # By this point, the Verifier has already verified the signature by the Transparency Service (§Establishing a Time
  # for the Signature). The Verifier MUST parse body: body is a base64-encoded JSON document with keys apiVersion
  # and kind. The Verifier implementation contains a list of known Transparency Service formats (by apiVersion and
  # kind); if no type is found, abort. The Verifier MUST parse body as the given type.
  #
  # Then, the Verifier MUST check the following; exactly how to do this will be specified by each type in Spec:
  # Sigstore Registries (§Signature Metadata Formats):
  #
  #  * The signature from the parsed body is the same as the provided signature.
  #  * The key or certificate from the parsed body is the same as in the input certificate.
  #  * The “subject” of the parsed body matches the artifact.

  signing_key = bundle.leaf_certificate.public_key

  case bundle.content
  when :message_signature
    # Raw signature over the digest of the hashed input.
    verified = verify_raw(signing_key, bundle.message_signature.signature, input.hashed_input.digest)
    return VerificationFailure.new("Signature verification failed") unless verified
  when :dsse_envelope
    # Low-precedence `or` is used for control flow: bail out when verify_dsse returns a falsy value.
    verify_dsse(bundle.dsse_envelope, signing_key) or
      return VerificationFailure.new("DSSE envelope verification failed")

    case bundle.dsse_envelope.payloadType
    when "application/vnd.in-toto+json"
      in_toto = begin
        JSON.parse(bundle.dsse_envelope.payload)
      rescue JSON::ParserError
        raise Error::InvalidBundle, "invalid JSON for in-toto statement in DSSE payload"
      end
      # The return value is not inspected here; presumably verify_in_toto raises on a mismatch — TODO confirm.
      verify_in_toto(input, in_toto)
    else
      raise Sigstore::Error::Unimplemented,
            "unsupported DSSE payload type: #{bundle.dsse_envelope.payloadType.inspect}"
    end
  else
    raise Error::InvalidBundle, "unknown content type: #{bundle.content}"
  end

  VerificationSuccess.new
end

#verify_scts(leaf_certificate, chain) ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/sigstore/verifier.rb', line 257

# Checks every signed certificate timestamp (SCT) embedded in the leaf
# certificate using `verify_sct` and the CT keyring held in @ct_keyring.
# Checking stops at the first SCT that does not verify.
#
# @param leaf_certificate [Object] must respond to `extension`; the returned
#   extension must respond to `signed_certificate_timestamps`
# @param chain [Object] passed through unchanged to `verify_sct`
# @return [nil] when every SCT verifies
# @return [VerificationFailure] when an SCT fails to verify
# @raise [Error::InvalidCertificate] when the SCT list is empty
def verify_scts(leaf_certificate, chain)
  sct_extension_class = Internal::X509::Extension::PrecertificateSignedCertificateTimestamps
  scts = leaf_certificate.extension(sct_extension_class).signed_certificate_timestamps
  raise Error::InvalidCertificate, "no SCTs found" if scts.empty?

  # `all?` short-circuits on the first SCT that fails verification.
  every_sct_ok = scts.all? { |sct| verify_sct(sct, leaf_certificate, chain, @ct_keyring) }
  return nil if every_sct_ok

  VerificationFailure.new("SCT verification failed")
end