homelab-k3s/ansible_env/lib/python3.12/site-packages/cryptography/x509/base.py
2024-10-29 23:33:00 +03:00

1227 lines
36 KiB
Python

# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import datetime
import os
import typing
import warnings
from cryptography import utils
from cryptography.hazmat.bindings._rust import x509 as rust_x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
dsa,
ec,
ed448,
ed25519,
padding,
rsa,
x448,
x25519,
)
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
CertificatePublicKeyTypes,
)
from cryptography.x509.extensions import (
Extension,
Extensions,
ExtensionType,
_make_sequence_methods,
)
from cryptography.x509.name import Name, _ASN1Type
from cryptography.x509.oid import ObjectIdentifier
_EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
# This must be kept in sync with sign.rs's list of allowable types in
# identify_hash_type
_AllowedHashTypes = typing.Union[
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
hashes.SHA3_224,
hashes.SHA3_256,
hashes.SHA3_384,
hashes.SHA3_512,
]
class AttributeNotFound(Exception):
def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
super().__init__(msg)
self.oid = oid
def _reject_duplicate_extension(
extension: Extension[ExtensionType],
extensions: list[Extension[ExtensionType]],
) -> None:
# This is quadratic in the number of extensions
for e in extensions:
if e.oid == extension.oid:
raise ValueError("This extension has already been set.")
def _reject_duplicate_attribute(
oid: ObjectIdentifier,
attributes: list[tuple[ObjectIdentifier, bytes, int | None]],
) -> None:
# This is quadratic in the number of attributes
for attr_oid, _, _ in attributes:
if attr_oid == oid:
raise ValueError("This attribute has already been set.")
def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
"""Normalizes a datetime to a naive datetime in UTC.
time -- datetime to normalize. Assumed to be in UTC if not timezone
aware.
"""
if time.tzinfo is not None:
offset = time.utcoffset()
offset = offset if offset else datetime.timedelta()
return time.replace(tzinfo=None) - offset
else:
return time
class Attribute:
def __init__(
self,
oid: ObjectIdentifier,
value: bytes,
_type: int = _ASN1Type.UTF8String.value,
) -> None:
self._oid = oid
self._value = value
self._type = _type
@property
def oid(self) -> ObjectIdentifier:
return self._oid
@property
def value(self) -> bytes:
return self._value
def __repr__(self) -> str:
return f"<Attribute(oid={self.oid}, value={self.value!r})>"
def __eq__(self, other: object) -> bool:
if not isinstance(other, Attribute):
return NotImplemented
return (
self.oid == other.oid
and self.value == other.value
and self._type == other._type
)
def __hash__(self) -> int:
return hash((self.oid, self.value, self._type))
class Attributes:
def __init__(
self,
attributes: typing.Iterable[Attribute],
) -> None:
self._attributes = list(attributes)
__len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")
def __repr__(self) -> str:
return f"<Attributes({self._attributes})>"
def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
for attr in self:
if attr.oid == oid:
return attr
raise AttributeNotFound(f"No {oid} attribute was found", oid)
class Version(utils.Enum):
v1 = 0
v3 = 2
class InvalidVersion(Exception):
def __init__(self, msg: str, parsed_version: int) -> None:
super().__init__(msg)
self.parsed_version = parsed_version
class Certificate(metaclass=abc.ABCMeta):
@abc.abstractmethod
def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
"""
Returns bytes using digest passed.
"""
@property
@abc.abstractmethod
def serial_number(self) -> int:
"""
Returns certificate serial number
"""
@property
@abc.abstractmethod
def version(self) -> Version:
"""
Returns the certificate version
"""
@abc.abstractmethod
def public_key(self) -> CertificatePublicKeyTypes:
"""
Returns the public key
"""
@property
@abc.abstractmethod
def public_key_algorithm_oid(self) -> ObjectIdentifier:
"""
Returns the ObjectIdentifier of the public key.
"""
@property
@abc.abstractmethod
def not_valid_before(self) -> datetime.datetime:
"""
Not before time (represented as UTC datetime)
"""
@property
@abc.abstractmethod
def not_valid_before_utc(self) -> datetime.datetime:
"""
Not before time (represented as a non-naive UTC datetime)
"""
@property
@abc.abstractmethod
def not_valid_after(self) -> datetime.datetime:
"""
Not after time (represented as UTC datetime)
"""
@property
@abc.abstractmethod
def not_valid_after_utc(self) -> datetime.datetime:
"""
Not after time (represented as a non-naive UTC datetime)
"""
@property
@abc.abstractmethod
def issuer(self) -> Name:
"""
Returns the issuer name object.
"""
@property
@abc.abstractmethod
def subject(self) -> Name:
"""
Returns the subject name object.
"""
@property
@abc.abstractmethod
def signature_hash_algorithm(
self,
) -> hashes.HashAlgorithm | None:
"""
Returns a HashAlgorithm corresponding to the type of the digest signed
in the certificate.
"""
@property
@abc.abstractmethod
def signature_algorithm_oid(self) -> ObjectIdentifier:
"""
Returns the ObjectIdentifier of the signature algorithm.
"""
@property
@abc.abstractmethod
def signature_algorithm_parameters(
self,
) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
"""
Returns the signature algorithm parameters.
"""
@property
@abc.abstractmethod
def extensions(self) -> Extensions:
"""
Returns an Extensions object.
"""
@property
@abc.abstractmethod
def signature(self) -> bytes:
"""
Returns the signature bytes.
"""
@property
@abc.abstractmethod
def tbs_certificate_bytes(self) -> bytes:
"""
Returns the tbsCertificate payload bytes as defined in RFC 5280.
"""
@property
@abc.abstractmethod
def tbs_precertificate_bytes(self) -> bytes:
"""
Returns the tbsCertificate payload bytes with the SCT list extension
stripped.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
@abc.abstractmethod
def __hash__(self) -> int:
"""
Computes a hash.
"""
@abc.abstractmethod
def public_bytes(self, encoding: serialization.Encoding) -> bytes:
"""
Serializes the certificate to PEM or DER format.
"""
@abc.abstractmethod
def verify_directly_issued_by(self, issuer: Certificate) -> None:
"""
This method verifies that certificate issuer name matches the
issuer subject name and that the certificate is signed by the
issuer's private key. No other validation is performed.
"""
# Runtime isinstance checks need this since the rust class is not a subclass.
Certificate.register(rust_x509.Certificate)
class RevokedCertificate(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def serial_number(self) -> int:
"""
Returns the serial number of the revoked certificate.
"""
@property
@abc.abstractmethod
def revocation_date(self) -> datetime.datetime:
"""
Returns the date of when this certificate was revoked.
"""
@property
@abc.abstractmethod
def revocation_date_utc(self) -> datetime.datetime:
"""
Returns the date of when this certificate was revoked as a non-naive
UTC datetime.
"""
@property
@abc.abstractmethod
def extensions(self) -> Extensions:
"""
Returns an Extensions object containing a list of Revoked extensions.
"""
# Runtime isinstance checks need this since the rust class is not a subclass.
RevokedCertificate.register(rust_x509.RevokedCertificate)
class _RawRevokedCertificate(RevokedCertificate):
def __init__(
self,
serial_number: int,
revocation_date: datetime.datetime,
extensions: Extensions,
):
self._serial_number = serial_number
self._revocation_date = revocation_date
self._extensions = extensions
@property
def serial_number(self) -> int:
return self._serial_number
@property
def revocation_date(self) -> datetime.datetime:
warnings.warn(
"Properties that return a naïve datetime object have been "
"deprecated. Please switch to revocation_date_utc.",
utils.DeprecatedIn42,
stacklevel=2,
)
return self._revocation_date
@property
def revocation_date_utc(self) -> datetime.datetime:
return self._revocation_date.replace(tzinfo=datetime.timezone.utc)
@property
def extensions(self) -> Extensions:
return self._extensions
class CertificateRevocationList(metaclass=abc.ABCMeta):
@abc.abstractmethod
def public_bytes(self, encoding: serialization.Encoding) -> bytes:
"""
Serializes the CRL to PEM or DER format.
"""
@abc.abstractmethod
def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
"""
Returns bytes using digest passed.
"""
@abc.abstractmethod
def get_revoked_certificate_by_serial_number(
self, serial_number: int
) -> RevokedCertificate | None:
"""
Returns an instance of RevokedCertificate or None if the serial_number
is not in the CRL.
"""
@property
@abc.abstractmethod
def signature_hash_algorithm(
self,
) -> hashes.HashAlgorithm | None:
"""
Returns a HashAlgorithm corresponding to the type of the digest signed
in the certificate.
"""
@property
@abc.abstractmethod
def signature_algorithm_oid(self) -> ObjectIdentifier:
"""
Returns the ObjectIdentifier of the signature algorithm.
"""
@property
@abc.abstractmethod
def signature_algorithm_parameters(
self,
) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
"""
Returns the signature algorithm parameters.
"""
@property
@abc.abstractmethod
def issuer(self) -> Name:
"""
Returns the X509Name with the issuer of this CRL.
"""
@property
@abc.abstractmethod
def next_update(self) -> datetime.datetime | None:
"""
Returns the date of next update for this CRL.
"""
@property
@abc.abstractmethod
def next_update_utc(self) -> datetime.datetime | None:
"""
Returns the date of next update for this CRL as a non-naive UTC
datetime.
"""
@property
@abc.abstractmethod
def last_update(self) -> datetime.datetime:
"""
Returns the date of last update for this CRL.
"""
@property
@abc.abstractmethod
def last_update_utc(self) -> datetime.datetime:
"""
Returns the date of last update for this CRL as a non-naive UTC
datetime.
"""
@property
@abc.abstractmethod
def extensions(self) -> Extensions:
"""
Returns an Extensions object containing a list of CRL extensions.
"""
@property
@abc.abstractmethod
def signature(self) -> bytes:
"""
Returns the signature bytes.
"""
@property
@abc.abstractmethod
def tbs_certlist_bytes(self) -> bytes:
"""
Returns the tbsCertList payload bytes as defined in RFC 5280.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
@abc.abstractmethod
def __len__(self) -> int:
"""
Number of revoked certificates in the CRL.
"""
@typing.overload
def __getitem__(self, idx: int) -> RevokedCertificate: ...
@typing.overload
def __getitem__(self, idx: slice) -> list[RevokedCertificate]: ...
@abc.abstractmethod
def __getitem__(
self, idx: int | slice
) -> RevokedCertificate | list[RevokedCertificate]:
"""
Returns a revoked certificate (or slice of revoked certificates).
"""
@abc.abstractmethod
def __iter__(self) -> typing.Iterator[RevokedCertificate]:
"""
Iterator over the revoked certificates
"""
@abc.abstractmethod
def is_signature_valid(
self, public_key: CertificateIssuerPublicKeyTypes
) -> bool:
"""
Verifies signature of revocation list against given public key.
"""
CertificateRevocationList.register(rust_x509.CertificateRevocationList)
class CertificateSigningRequest(metaclass=abc.ABCMeta):
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
@abc.abstractmethod
def __hash__(self) -> int:
"""
Computes a hash.
"""
@abc.abstractmethod
def public_key(self) -> CertificatePublicKeyTypes:
"""
Returns the public key
"""
@property
@abc.abstractmethod
def subject(self) -> Name:
"""
Returns the subject name object.
"""
@property
@abc.abstractmethod
def signature_hash_algorithm(
self,
) -> hashes.HashAlgorithm | None:
"""
Returns a HashAlgorithm corresponding to the type of the digest signed
in the certificate.
"""
@property
@abc.abstractmethod
def signature_algorithm_oid(self) -> ObjectIdentifier:
"""
Returns the ObjectIdentifier of the signature algorithm.
"""
@property
@abc.abstractmethod
def signature_algorithm_parameters(
self,
) -> None | padding.PSS | padding.PKCS1v15 | ec.ECDSA:
"""
Returns the signature algorithm parameters.
"""
@property
@abc.abstractmethod
def extensions(self) -> Extensions:
"""
Returns the extensions in the signing request.
"""
@property
@abc.abstractmethod
def attributes(self) -> Attributes:
"""
Returns an Attributes object.
"""
@abc.abstractmethod
def public_bytes(self, encoding: serialization.Encoding) -> bytes:
"""
Encodes the request to PEM or DER format.
"""
@property
@abc.abstractmethod
def signature(self) -> bytes:
"""
Returns the signature bytes.
"""
@property
@abc.abstractmethod
def tbs_certrequest_bytes(self) -> bytes:
"""
Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC
2986.
"""
@property
@abc.abstractmethod
def is_signature_valid(self) -> bool:
"""
Verifies signature of signing request.
"""
@abc.abstractmethod
def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes:
"""
Get the attribute value for a given OID.
"""
# Runtime isinstance checks need this since the rust class is not a subclass.
CertificateSigningRequest.register(rust_x509.CertificateSigningRequest)
load_pem_x509_certificate = rust_x509.load_pem_x509_certificate
load_der_x509_certificate = rust_x509.load_der_x509_certificate
load_pem_x509_certificates = rust_x509.load_pem_x509_certificates
load_pem_x509_csr = rust_x509.load_pem_x509_csr
load_der_x509_csr = rust_x509.load_der_x509_csr
load_pem_x509_crl = rust_x509.load_pem_x509_crl
load_der_x509_crl = rust_x509.load_der_x509_crl
class CertificateSigningRequestBuilder:
def __init__(
self,
subject_name: Name | None = None,
extensions: list[Extension[ExtensionType]] = [],
attributes: list[tuple[ObjectIdentifier, bytes, int | None]] = [],
):
"""
Creates an empty X.509 certificate request (v1).
"""
self._subject_name = subject_name
self._extensions = extensions
self._attributes = attributes
def subject_name(self, name: Name) -> CertificateSigningRequestBuilder:
"""
Sets the certificate requestor's distinguished name.
"""
if not isinstance(name, Name):
raise TypeError("Expecting x509.Name object.")
if self._subject_name is not None:
raise ValueError("The subject name may only be set once.")
return CertificateSigningRequestBuilder(
name, self._extensions, self._attributes
)
def add_extension(
self, extval: ExtensionType, critical: bool
) -> CertificateSigningRequestBuilder:
"""
Adds an X.509 extension to the certificate request.
"""
if not isinstance(extval, ExtensionType):
raise TypeError("extension must be an ExtensionType")
extension = Extension(extval.oid, critical, extval)
_reject_duplicate_extension(extension, self._extensions)
return CertificateSigningRequestBuilder(
self._subject_name,
[*self._extensions, extension],
self._attributes,
)
def add_attribute(
self,
oid: ObjectIdentifier,
value: bytes,
*,
_tag: _ASN1Type | None = None,
) -> CertificateSigningRequestBuilder:
"""
Adds an X.509 attribute with an OID and associated value.
"""
if not isinstance(oid, ObjectIdentifier):
raise TypeError("oid must be an ObjectIdentifier")
if not isinstance(value, bytes):
raise TypeError("value must be bytes")
if _tag is not None and not isinstance(_tag, _ASN1Type):
raise TypeError("tag must be _ASN1Type")
_reject_duplicate_attribute(oid, self._attributes)
if _tag is not None:
tag = _tag.value
else:
tag = None
return CertificateSigningRequestBuilder(
self._subject_name,
self._extensions,
[*self._attributes, (oid, value, tag)],
)
def sign(
self,
private_key: CertificateIssuerPrivateKeyTypes,
algorithm: _AllowedHashTypes | None,
backend: typing.Any = None,
*,
rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
) -> CertificateSigningRequest:
"""
Signs the request using the requestor's private key.
"""
if self._subject_name is None:
raise ValueError("A CertificateSigningRequest must have a subject")
if rsa_padding is not None:
if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
raise TypeError("Padding must be PSS or PKCS1v15")
if not isinstance(private_key, rsa.RSAPrivateKey):
raise TypeError("Padding is only supported for RSA keys")
return rust_x509.create_x509_csr(
self, private_key, algorithm, rsa_padding
)
class CertificateBuilder:
_extensions: list[Extension[ExtensionType]]
def __init__(
self,
issuer_name: Name | None = None,
subject_name: Name | None = None,
public_key: CertificatePublicKeyTypes | None = None,
serial_number: int | None = None,
not_valid_before: datetime.datetime | None = None,
not_valid_after: datetime.datetime | None = None,
extensions: list[Extension[ExtensionType]] = [],
) -> None:
self._version = Version.v3
self._issuer_name = issuer_name
self._subject_name = subject_name
self._public_key = public_key
self._serial_number = serial_number
self._not_valid_before = not_valid_before
self._not_valid_after = not_valid_after
self._extensions = extensions
def issuer_name(self, name: Name) -> CertificateBuilder:
"""
Sets the CA's distinguished name.
"""
if not isinstance(name, Name):
raise TypeError("Expecting x509.Name object.")
if self._issuer_name is not None:
raise ValueError("The issuer name may only be set once.")
return CertificateBuilder(
name,
self._subject_name,
self._public_key,
self._serial_number,
self._not_valid_before,
self._not_valid_after,
self._extensions,
)
def subject_name(self, name: Name) -> CertificateBuilder:
"""
Sets the requestor's distinguished name.
"""
if not isinstance(name, Name):
raise TypeError("Expecting x509.Name object.")
if self._subject_name is not None:
raise ValueError("The subject name may only be set once.")
return CertificateBuilder(
self._issuer_name,
name,
self._public_key,
self._serial_number,
self._not_valid_before,
self._not_valid_after,
self._extensions,
)
def public_key(
self,
key: CertificatePublicKeyTypes,
) -> CertificateBuilder:
"""
Sets the requestor's public key (as found in the signing request).
"""
if not isinstance(
key,
(
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
x25519.X25519PublicKey,
x448.X448PublicKey,
),
):
raise TypeError(
"Expecting one of DSAPublicKey, RSAPublicKey,"
" EllipticCurvePublicKey, Ed25519PublicKey,"
" Ed448PublicKey, X25519PublicKey, or "
"X448PublicKey."
)
if self._public_key is not None:
raise ValueError("The public key may only be set once.")
return CertificateBuilder(
self._issuer_name,
self._subject_name,
key,
self._serial_number,
self._not_valid_before,
self._not_valid_after,
self._extensions,
)
def serial_number(self, number: int) -> CertificateBuilder:
"""
Sets the certificate serial number.
"""
if not isinstance(number, int):
raise TypeError("Serial number must be of integral type.")
if self._serial_number is not None:
raise ValueError("The serial number may only be set once.")
if number <= 0:
raise ValueError("The serial number should be positive.")
# ASN.1 integers are always signed, so most significant bit must be
# zero.
if number.bit_length() >= 160: # As defined in RFC 5280
raise ValueError(
"The serial number should not be more than 159 bits."
)
return CertificateBuilder(
self._issuer_name,
self._subject_name,
self._public_key,
number,
self._not_valid_before,
self._not_valid_after,
self._extensions,
)
def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder:
"""
Sets the certificate activation time.
"""
if not isinstance(time, datetime.datetime):
raise TypeError("Expecting datetime object.")
if self._not_valid_before is not None:
raise ValueError("The not valid before may only be set once.")
time = _convert_to_naive_utc_time(time)
if time < _EARLIEST_UTC_TIME:
raise ValueError(
"The not valid before date must be on or after"
" 1950 January 1)."
)
if self._not_valid_after is not None and time > self._not_valid_after:
raise ValueError(
"The not valid before date must be before the not valid after "
"date."
)
return CertificateBuilder(
self._issuer_name,
self._subject_name,
self._public_key,
self._serial_number,
time,
self._not_valid_after,
self._extensions,
)
def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder:
"""
Sets the certificate expiration time.
"""
if not isinstance(time, datetime.datetime):
raise TypeError("Expecting datetime object.")
if self._not_valid_after is not None:
raise ValueError("The not valid after may only be set once.")
time = _convert_to_naive_utc_time(time)
if time < _EARLIEST_UTC_TIME:
raise ValueError(
"The not valid after date must be on or after"
" 1950 January 1."
)
if (
self._not_valid_before is not None
and time < self._not_valid_before
):
raise ValueError(
"The not valid after date must be after the not valid before "
"date."
)
return CertificateBuilder(
self._issuer_name,
self._subject_name,
self._public_key,
self._serial_number,
self._not_valid_before,
time,
self._extensions,
)
def add_extension(
self, extval: ExtensionType, critical: bool
) -> CertificateBuilder:
"""
Adds an X.509 extension to the certificate.
"""
if not isinstance(extval, ExtensionType):
raise TypeError("extension must be an ExtensionType")
extension = Extension(extval.oid, critical, extval)
_reject_duplicate_extension(extension, self._extensions)
return CertificateBuilder(
self._issuer_name,
self._subject_name,
self._public_key,
self._serial_number,
self._not_valid_before,
self._not_valid_after,
[*self._extensions, extension],
)
def sign(
self,
private_key: CertificateIssuerPrivateKeyTypes,
algorithm: _AllowedHashTypes | None,
backend: typing.Any = None,
*,
rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
) -> Certificate:
"""
Signs the certificate using the CA's private key.
"""
if self._subject_name is None:
raise ValueError("A certificate must have a subject name")
if self._issuer_name is None:
raise ValueError("A certificate must have an issuer name")
if self._serial_number is None:
raise ValueError("A certificate must have a serial number")
if self._not_valid_before is None:
raise ValueError("A certificate must have a not valid before time")
if self._not_valid_after is None:
raise ValueError("A certificate must have a not valid after time")
if self._public_key is None:
raise ValueError("A certificate must have a public key")
if rsa_padding is not None:
if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
raise TypeError("Padding must be PSS or PKCS1v15")
if not isinstance(private_key, rsa.RSAPrivateKey):
raise TypeError("Padding is only supported for RSA keys")
return rust_x509.create_x509_certificate(
self, private_key, algorithm, rsa_padding
)
class CertificateRevocationListBuilder:
_extensions: list[Extension[ExtensionType]]
_revoked_certificates: list[RevokedCertificate]
def __init__(
self,
issuer_name: Name | None = None,
last_update: datetime.datetime | None = None,
next_update: datetime.datetime | None = None,
extensions: list[Extension[ExtensionType]] = [],
revoked_certificates: list[RevokedCertificate] = [],
):
self._issuer_name = issuer_name
self._last_update = last_update
self._next_update = next_update
self._extensions = extensions
self._revoked_certificates = revoked_certificates
def issuer_name(
self, issuer_name: Name
) -> CertificateRevocationListBuilder:
if not isinstance(issuer_name, Name):
raise TypeError("Expecting x509.Name object.")
if self._issuer_name is not None:
raise ValueError("The issuer name may only be set once.")
return CertificateRevocationListBuilder(
issuer_name,
self._last_update,
self._next_update,
self._extensions,
self._revoked_certificates,
)
def last_update(
self, last_update: datetime.datetime
) -> CertificateRevocationListBuilder:
if not isinstance(last_update, datetime.datetime):
raise TypeError("Expecting datetime object.")
if self._last_update is not None:
raise ValueError("Last update may only be set once.")
last_update = _convert_to_naive_utc_time(last_update)
if last_update < _EARLIEST_UTC_TIME:
raise ValueError(
"The last update date must be on or after 1950 January 1."
)
if self._next_update is not None and last_update > self._next_update:
raise ValueError(
"The last update date must be before the next update date."
)
return CertificateRevocationListBuilder(
self._issuer_name,
last_update,
self._next_update,
self._extensions,
self._revoked_certificates,
)
def next_update(
self, next_update: datetime.datetime
) -> CertificateRevocationListBuilder:
if not isinstance(next_update, datetime.datetime):
raise TypeError("Expecting datetime object.")
if self._next_update is not None:
raise ValueError("Last update may only be set once.")
next_update = _convert_to_naive_utc_time(next_update)
if next_update < _EARLIEST_UTC_TIME:
raise ValueError(
"The last update date must be on or after 1950 January 1."
)
if self._last_update is not None and next_update < self._last_update:
raise ValueError(
"The next update date must be after the last update date."
)
return CertificateRevocationListBuilder(
self._issuer_name,
self._last_update,
next_update,
self._extensions,
self._revoked_certificates,
)
def add_extension(
self, extval: ExtensionType, critical: bool
) -> CertificateRevocationListBuilder:
"""
Adds an X.509 extension to the certificate revocation list.
"""
if not isinstance(extval, ExtensionType):
raise TypeError("extension must be an ExtensionType")
extension = Extension(extval.oid, critical, extval)
_reject_duplicate_extension(extension, self._extensions)
return CertificateRevocationListBuilder(
self._issuer_name,
self._last_update,
self._next_update,
[*self._extensions, extension],
self._revoked_certificates,
)
def add_revoked_certificate(
self, revoked_certificate: RevokedCertificate
) -> CertificateRevocationListBuilder:
"""
Adds a revoked certificate to the CRL.
"""
if not isinstance(revoked_certificate, RevokedCertificate):
raise TypeError("Must be an instance of RevokedCertificate")
return CertificateRevocationListBuilder(
self._issuer_name,
self._last_update,
self._next_update,
self._extensions,
[*self._revoked_certificates, revoked_certificate],
)
def sign(
self,
private_key: CertificateIssuerPrivateKeyTypes,
algorithm: _AllowedHashTypes | None,
backend: typing.Any = None,
*,
rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
) -> CertificateRevocationList:
if self._issuer_name is None:
raise ValueError("A CRL must have an issuer name")
if self._last_update is None:
raise ValueError("A CRL must have a last update time")
if self._next_update is None:
raise ValueError("A CRL must have a next update time")
if rsa_padding is not None:
if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
raise TypeError("Padding must be PSS or PKCS1v15")
if not isinstance(private_key, rsa.RSAPrivateKey):
raise TypeError("Padding is only supported for RSA keys")
return rust_x509.create_x509_crl(
self, private_key, algorithm, rsa_padding
)
class RevokedCertificateBuilder:
def __init__(
self,
serial_number: int | None = None,
revocation_date: datetime.datetime | None = None,
extensions: list[Extension[ExtensionType]] = [],
):
self._serial_number = serial_number
self._revocation_date = revocation_date
self._extensions = extensions
def serial_number(self, number: int) -> RevokedCertificateBuilder:
if not isinstance(number, int):
raise TypeError("Serial number must be of integral type.")
if self._serial_number is not None:
raise ValueError("The serial number may only be set once.")
if number <= 0:
raise ValueError("The serial number should be positive")
# ASN.1 integers are always signed, so most significant bit must be
# zero.
if number.bit_length() >= 160: # As defined in RFC 5280
raise ValueError(
"The serial number should not be more than 159 bits."
)
return RevokedCertificateBuilder(
number, self._revocation_date, self._extensions
)
def revocation_date(
self, time: datetime.datetime
) -> RevokedCertificateBuilder:
if not isinstance(time, datetime.datetime):
raise TypeError("Expecting datetime object.")
if self._revocation_date is not None:
raise ValueError("The revocation date may only be set once.")
time = _convert_to_naive_utc_time(time)
if time < _EARLIEST_UTC_TIME:
raise ValueError(
"The revocation date must be on or after 1950 January 1."
)
return RevokedCertificateBuilder(
self._serial_number, time, self._extensions
)
def add_extension(
self, extval: ExtensionType, critical: bool
) -> RevokedCertificateBuilder:
if not isinstance(extval, ExtensionType):
raise TypeError("extension must be an ExtensionType")
extension = Extension(extval.oid, critical, extval)
_reject_duplicate_extension(extension, self._extensions)
return RevokedCertificateBuilder(
self._serial_number,
self._revocation_date,
[*self._extensions, extension],
)
def build(self, backend: typing.Any = None) -> RevokedCertificate:
if self._serial_number is None:
raise ValueError("A revoked certificate must have a serial number")
if self._revocation_date is None:
raise ValueError(
"A revoked certificate must have a revocation date"
)
return _RawRevokedCertificate(
self._serial_number,
self._revocation_date,
Extensions(self._extensions),
)
def random_serial_number() -> int:
return int.from_bytes(os.urandom(20), "big") >> 1