represent SubjectAltNames as cryptography.x509.GeneralName objects, not strings. (#6537)

This fixes #6536
This commit is contained in:
Maximilian Hils 2023-12-14 10:26:23 +01:00 committed by GitHub
parent c2f1aa1600
commit b659ea0101
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 146 additions and 71 deletions

View File

@ -34,6 +34,9 @@
([#6492](https://github.com/mitmproxy/mitmproxy/pull/6492), @xBZZZZ)
* Enhance documentation and add alert log messages when stream_large_bodies and modify_body are set
([#6514](https://github.com/mitmproxy/mitmproxy/pull/6514), @rosydawn6)
* Subject Alternative Names are now represented as `cryptography.x509.GeneralNames` instead of `list[str]`
across the codebase. This fixes a regression introduced in mitmproxy 10.1.1 related to punycode domain encoding.
([#6537](https://github.com/mitmproxy/mitmproxy/pull/6537), @mhils)
## 14 November 2023: mitmproxy 10.1.5

View File

@ -8,6 +8,7 @@ from typing import TypedDict
from aioquic.h3.connection import H3_ALPN
from aioquic.tls import CipherSuite
from cryptography import x509
from OpenSSL import crypto
from OpenSSL import SSL
@ -488,31 +489,42 @@ class TlsConfig:
This function determines the Common Name (CN), Subject Alternative Names (SANs) and Organization Name
our certificate should have and then fetches a matching cert from the certstore.
"""
altnames: list[str] = []
altnames: list[x509.GeneralName] = []
organization: str | None = None
# Use upstream certificate if available.
if ctx.options.upstream_cert and conn_context.server.certificate_list:
upstream_cert = conn_context.server.certificate_list[0]
if upstream_cert.cn:
altnames.append(upstream_cert.cn)
altnames.append(_ip_or_dns_name(upstream_cert.cn))
altnames.extend(upstream_cert.altnames)
if upstream_cert.organization:
organization = upstream_cert.organization
# Add SNI or our local IP address.
if conn_context.client.sni:
altnames.append(conn_context.client.sni)
altnames.append(_ip_or_dns_name(conn_context.client.sni))
else:
altnames.append(conn_context.client.sockname[0])
altnames.append(_ip_or_dns_name(conn_context.client.sockname[0]))
# If we already know of a server address, include that in the SANs as well.
if conn_context.server.address:
altnames.append(conn_context.server.address[0])
altnames.append(_ip_or_dns_name(conn_context.server.address[0]))
# only keep first occurrence of each hostname
altnames = list(dict.fromkeys(altnames))
# RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity.
# In other words, the Common Name is irrelevant then.
return self.certstore.get_cert(altnames[0], altnames, organization)
cn = next((str(x.value) for x in altnames), None)
return self.certstore.get_cert(cn, altnames, organization)
def _ip_or_dns_name(val: str) -> x509.GeneralName:
"""Convert a string into either an x509.IPAddress or x509.DNSName object."""
try:
ip = ipaddress.ip_address(val)
except ValueError:
return x509.DNSName(val)
else:
return x509.IPAddress(ip)

View File

@ -3,6 +3,8 @@ import datetime
import ipaddress
import os
import sys
import warnings
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import cast
@ -58,7 +60,8 @@ class Cert(serializable.Serializable):
return self.fingerprint() == other.fingerprint()
def __repr__(self):
return f"<Cert(cn={self.cn!r}, altnames={self.altnames!r})>"
altnames = [str(x.value) for x in self.altnames]
return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
def __hash__(self):
return self._cert.__hash__()
@ -147,20 +150,18 @@ class Cert(serializable.Serializable):
return None
@property
def altnames(self) -> list[str]:
def altnames(self) -> x509.GeneralNames:
"""
Get all SubjectAlternativeName DNS altnames.
"""
try:
ext = self._cert.extensions.get_extension_for_class(
sans = self._cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
).value
except x509.ExtensionNotFound:
return []
return x509.GeneralNames([])
else:
return ext.get_values_for_type(x509.DNSName) + [
str(x) for x in ext.get_values_for_type(x509.IPAddress)
]
return x509.GeneralNames(sans)
def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
@ -224,11 +225,41 @@ def create_ca(
return private_key, cert
def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
"""
SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames.
This function converts the old format to the new one.
"""
if isinstance(sans, x509.GeneralNames):
return sans
elif (
isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
): # pragma: no cover
warnings.warn(
"Passing SANs as a list of strings is deprecated.",
DeprecationWarning,
stacklevel=2,
)
ss: list[x509.GeneralName] = []
for x in cast(list[str], sans):
try:
ip = ipaddress.ip_address(x)
except ValueError:
x = x.encode("idna").decode()
ss.append(x509.DNSName(x))
else:
ss.append(x509.IPAddress(ip))
return x509.GeneralNames(ss)
else:
return x509.GeneralNames(sans)
def dummy_cert(
privkey: rsa.RSAPrivateKey,
cacert: x509.Certificate,
commonname: str | None,
sans: list[str],
sans: Iterable[x509.GeneralName],
organization: str | None = None,
) -> Cert:
"""
@ -264,18 +295,10 @@ def dummy_cert(
builder = builder.subject_name(x509.Name(subject))
builder = builder.serial_number(x509.random_serial_number())
ss: list[x509.GeneralName] = []
for x in sans:
try:
ip = ipaddress.ip_address(x)
except ValueError:
x = x.encode("idna").decode()
ss.append(x509.DNSName(x))
else:
ss.append(x509.IPAddress(ip))
# RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
builder = builder.add_extension(
x509.SubjectAlternativeName(ss), critical=not is_valid_commonname
x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
critical=not is_valid_commonname,
)
# https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
@ -301,7 +324,7 @@ class CertStoreEntry:
TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs)
TGeneratedCertId = tuple[Optional[str], tuple[str, ...]] # (common_name, sans)
TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans)
TCertId = Union[TCustomCertId, TGeneratedCertId]
DHParams = NewType("DHParams", bytes)
@ -485,26 +508,31 @@ class CertStore:
if entry.cert.cn:
self.certs[entry.cert.cn] = entry
for i in entry.cert.altnames:
self.certs[i] = entry
self.certs[str(i.value)] = entry
for i in names:
self.certs[i] = entry
@staticmethod
def asterisk_forms(dn: str) -> list[str]:
def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
"""
Return all asterisk forms for a domain. For example, for www.example.com this will return
[b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted.
"""
parts = dn.split(".")
ret = [dn]
for i in range(1, len(parts)):
ret.append("*." + ".".join(parts[i:]))
return ret
if isinstance(dn, str):
parts = dn.split(".")
ret = [dn]
for i in range(1, len(parts)):
ret.append("*." + ".".join(parts[i:]))
return ret
elif isinstance(dn, x509.DNSName):
return CertStore.asterisk_forms(dn.value)
else:
return [str(dn.value)]
def get_cert(
self,
commonname: str | None,
sans: list[str],
sans: Iterable[x509.GeneralName],
organization: str | None = None,
) -> CertStoreEntry:
"""
@ -515,6 +543,7 @@ class CertStore:
organization: Organization name for the generated certificate.
"""
sans = _fix_legacy_sans(sans)
potential_keys: list[TCertId] = []
if commonname:
@ -522,7 +551,7 @@ class CertStore:
for s in sans:
potential_keys.extend(self.asterisk_forms(s))
potential_keys.append("*")
potential_keys.append((commonname, tuple(sans)))
potential_keys.append((commonname, sans))
name = next(filter(lambda key: key in self.certs, potential_keys), None)
if name:
@ -540,7 +569,7 @@ class CertStore:
chain_file=self.default_chain_file,
chain_certs=self.default_chain_certs,
)
self.certs[(commonname, tuple(sans))] = entry
self.certs[(commonname, sans)] = entry
self.expire(entry)
return entry

View File

@ -80,7 +80,7 @@ def flowdetails(state, flow: mitmproxy.flow.Flow):
]
if c.altnames:
parts.append(("Alt names", ", ".join(c.altnames)))
parts.append(("Alt names", ", ".join(str(x.value) for x in c.altnames)))
text.extend(common.format_keyvals(parts, indent=4))
if cc is not None:

View File

@ -50,7 +50,7 @@ def cert_to_json(certs: Sequence[certs.Cert]) -> dict | None:
"serial": str(cert.serial),
"subject": cert.subject,
"issuer": cert.issuer,
"altnames": cert.altnames,
"altnames": [str(x.value) for x in cert.altnames],
}

View File

@ -1,3 +1,4 @@
import ipaddress
import ssl
import time
from pathlib import Path
@ -128,20 +129,24 @@ class TestTlsConfig:
ctx.server.certificate_list = [certs.Cert.from_pem(f.read())]
entry = ta.get_cert(ctx)
assert entry.cert.cn == "example.mitmproxy.org"
assert entry.cert.altnames == [
"example.mitmproxy.org",
"server-address.example",
"127.0.0.1",
]
assert entry.cert.altnames == x509.GeneralNames(
[
x509.DNSName("example.mitmproxy.org"),
x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
x509.DNSName("server-address.example"),
]
)
# And now we also incorporate SNI.
ctx.client.sni = "sni.example"
entry = ta.get_cert(ctx)
assert entry.cert.altnames == [
"example.mitmproxy.org",
"sni.example",
"server-address.example",
]
assert entry.cert.altnames == x509.GeneralNames(
[
x509.DNSName("example.mitmproxy.org"),
x509.DNSName("sni.example"),
x509.DNSName("server-address.example"),
]
)
with open(tdata.path("mitmproxy/data/invalid-subject.pem"), "rb") as f:
ctx.server.certificate_list = [certs.Cert.from_pem(f.read())]

View File

@ -1,3 +1,4 @@
import ipaddress
import os
from datetime import datetime
from datetime import timezone
@ -73,16 +74,16 @@ class TestCertStore:
assert len(ca.default_chain_certs) == 2
def test_sans(self, tstore):
c1 = tstore.get_cert("foo.com", ["*.bar.com"])
c1 = tstore.get_cert("foo.com", [x509.DNSName("*.bar.com")])
tstore.get_cert("foo.bar.com", [])
# assert c1 == c2
c3 = tstore.get_cert("bar.com", [])
assert not c1 == c3
def test_sans_change(self, tstore):
tstore.get_cert("foo.com", ["*.bar.com"])
entry = tstore.get_cert("foo.bar.com", ["*.baz.com"])
assert "*.baz.com" in entry.cert.altnames
tstore.get_cert("foo.com", [x509.DNSName("*.bar.com")])
entry = tstore.get_cert("foo.bar.com", [x509.DNSName("*.baz.com")])
assert x509.DNSName("*.baz.com") in entry.cert.altnames
def test_expire(self, tstore):
tstore.STORE_CAP = 3
@ -90,29 +91,29 @@ class TestCertStore:
tstore.get_cert("two.com", [])
tstore.get_cert("three.com", [])
assert ("one.com", ()) in tstore.certs
assert ("two.com", ()) in tstore.certs
assert ("three.com", ()) in tstore.certs
assert ("one.com", x509.GeneralNames([])) in tstore.certs
assert ("two.com", x509.GeneralNames([])) in tstore.certs
assert ("three.com", x509.GeneralNames([])) in tstore.certs
tstore.get_cert("one.com", [])
assert ("one.com", ()) in tstore.certs
assert ("two.com", ()) in tstore.certs
assert ("three.com", ()) in tstore.certs
assert ("one.com", x509.GeneralNames([])) in tstore.certs
assert ("two.com", x509.GeneralNames([])) in tstore.certs
assert ("three.com", x509.GeneralNames([])) in tstore.certs
tstore.get_cert("four.com", [])
assert ("one.com", ()) not in tstore.certs
assert ("two.com", ()) in tstore.certs
assert ("three.com", ()) in tstore.certs
assert ("four.com", ()) in tstore.certs
assert ("one.com", x509.GeneralNames([])) not in tstore.certs
assert ("two.com", x509.GeneralNames([])) in tstore.certs
assert ("three.com", x509.GeneralNames([])) in tstore.certs
assert ("four.com", x509.GeneralNames([])) in tstore.certs
def test_overrides(self, tmp_path):
ca1 = certs.CertStore.from_store(tmp_path / "ca1", "test", 2048)
ca2 = certs.CertStore.from_store(tmp_path / "ca2", "test", 2048)
assert not ca1.default_ca.serial == ca2.default_ca.serial
dc = ca2.get_cert("foo.com", ["sans.example.com"])
dc = ca2.get_cert("foo.com", [x509.DNSName("sans.example.com")])
dcp = tmp_path / "dc"
dcp.write_bytes(dc.cert.to_pem())
ca1.add_cert_file("foo.com", dcp)
@ -133,6 +134,23 @@ class TestCertStore:
# TODO: How do we actually attempt to read that file as another user?
assert os.stat(filename).st_mode & 0o77 == 0
@pytest.mark.parametrize(
"input,output",
[
(
"subdomain.example.com",
["subdomain.example.com", "*.example.com", "*.com"],
),
(
x509.DNSName("subdomain.example.com"),
["subdomain.example.com", "*.example.com", "*.com"],
),
(x509.IPAddress(ipaddress.ip_address("127.0.0.1")), ["127.0.0.1"]),
],
)
def test_asterisk_forms(self, input, output):
assert certs.CertStore.asterisk_forms(input) == output
class TestDummyCert:
def test_with_ca(self, tstore):
@ -140,17 +158,25 @@ class TestDummyCert:
tstore.default_privatekey,
tstore.default_ca._cert,
"foo.com",
["one.com", "two.com", "*.three.com", "127.0.0.1", "bücher.example"],
[
x509.DNSName("one.com"),
x509.DNSName("two.com"),
x509.DNSName("*.three.com"),
x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
x509.DNSName("bücher.example".encode("idna").decode("ascii")),
],
"Foo Ltd.",
)
assert r.cn == "foo.com"
assert r.altnames == [
"one.com",
"two.com",
"*.three.com",
"xn--bcher-kva.example",
"127.0.0.1",
]
assert r.altnames == x509.GeneralNames(
[
x509.DNSName("one.com"),
x509.DNSName("two.com"),
x509.DNSName("*.three.com"),
x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
x509.DNSName("xn--bcher-kva.example"),
]
)
assert r.organization == "Foo Ltd."
r = certs.dummy_cert(
@ -158,7 +184,7 @@ class TestDummyCert:
)
assert r.cn is None
assert r.organization is None
assert r.altnames == []
assert r.altnames == x509.GeneralNames([])
class TestCert: