From b659ea0101ed6af8dc5dc227d8e71aafcda12d75 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Thu, 14 Dec 2023 10:26:23 +0100 Subject: [PATCH] represent SubjectAltNames as cryptography.x509.GeneralName objects, not strings. (#6537) This fixes #6536 --- CHANGELOG.md | 3 + mitmproxy/addons/tlsconfig.py | 24 +++++-- mitmproxy/certs.py | 87 +++++++++++++++-------- mitmproxy/tools/console/flowdetailview.py | 2 +- mitmproxy/tools/web/app.py | 2 +- test/mitmproxy/addons/test_tlsconfig.py | 25 ++++--- test/mitmproxy/test_certs.py | 74 ++++++++++++------- 7 files changed, 146 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c8af83cb..bbf47091e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,9 @@ ([#6492](https://github.com/mitmproxy/mitmproxy/pull/6492), @xBZZZZ) * Enhance documentation and add alert log messages when stream_large_bodies and modify_body are set ([#6514](https://github.com/mitmproxy/mitmproxy/pull/6514), @rosydawn6) +* Subject Alternative Names are now represented as `cryptography.x509.GeneralNames` instead of `list[str]` + across the codebase. This fixes a regression introduced in mitmproxy 10.1.1 related to punycode domain encoding. + ([#6537](https://github.com/mitmproxy/mitmproxy/pull/6537), @mhils) ## 14 November 2023: mitmproxy 10.1.5 diff --git a/mitmproxy/addons/tlsconfig.py b/mitmproxy/addons/tlsconfig.py index e044d1d1c..63f876a0d 100644 --- a/mitmproxy/addons/tlsconfig.py +++ b/mitmproxy/addons/tlsconfig.py @@ -8,6 +8,7 @@ from typing import TypedDict from aioquic.h3.connection import H3_ALPN from aioquic.tls import CipherSuite +from cryptography import x509 from OpenSSL import crypto from OpenSSL import SSL @@ -488,31 +489,42 @@ class TlsConfig: This function determines the Common Name (CN), Subject Alternative Names (SANs) and Organization Name our certificate should have and then fetches a matching cert from the certstore. """ - altnames: list[str] = [] + altnames: list[x509.GeneralName] = [] organization: str | None = None # Use upstream certificate if available. if ctx.options.upstream_cert and conn_context.server.certificate_list: upstream_cert = conn_context.server.certificate_list[0] if upstream_cert.cn: - altnames.append(upstream_cert.cn) + altnames.append(_ip_or_dns_name(upstream_cert.cn)) altnames.extend(upstream_cert.altnames) if upstream_cert.organization: organization = upstream_cert.organization # Add SNI or our local IP address. if conn_context.client.sni: - altnames.append(conn_context.client.sni) + altnames.append(_ip_or_dns_name(conn_context.client.sni)) else: - altnames.append(conn_context.client.sockname[0]) + altnames.append(_ip_or_dns_name(conn_context.client.sockname[0])) # If we already know of a server address, include that in the SANs as well. if conn_context.server.address: - altnames.append(conn_context.server.address[0]) + altnames.append(_ip_or_dns_name(conn_context.server.address[0])) # only keep first occurrence of each hostname altnames = list(dict.fromkeys(altnames)) # RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity. # In other words, the Common Name is irrelevant then. - return self.certstore.get_cert(altnames[0], altnames, organization) + cn = next((str(x.value) for x in altnames), None) + return self.certstore.get_cert(cn, altnames, organization) + + +def _ip_or_dns_name(val: str) -> x509.GeneralName: + """Convert a string into either an x509.IPAddress or x509.DNSName object.""" + try: + ip = ipaddress.ip_address(val) + except ValueError: + return x509.DNSName(val) + else: + return x509.IPAddress(ip) diff --git a/mitmproxy/certs.py b/mitmproxy/certs.py index c5bba3ebc..858b46de4 100644 --- a/mitmproxy/certs.py +++ b/mitmproxy/certs.py @@ -3,6 +3,8 @@ import datetime import ipaddress import os import sys +import warnings +from collections.abc import Iterable from dataclasses import dataclass from pathlib import Path from typing import cast @@ -58,7 +60,8 @@ class Cert(serializable.Serializable): return self.fingerprint() == other.fingerprint() def __repr__(self): - return f"" + altnames = [str(x.value) for x in self.altnames] + return f"" def __hash__(self): return self._cert.__hash__() @@ -147,20 +150,18 @@ class Cert(serializable.Serializable): return None @property - def altnames(self) -> list[str]: + def altnames(self) -> x509.GeneralNames: """ Get all SubjectAlternativeName DNS altnames. """ try: - ext = self._cert.extensions.get_extension_for_class( + sans = self._cert.extensions.get_extension_for_class( x509.SubjectAlternativeName ).value except x509.ExtensionNotFound: - return [] + return x509.GeneralNames([]) else: - return ext.get_values_for_type(x509.DNSName) + [ - str(x) for x in ext.get_values_for_type(x509.IPAddress) - ] + return x509.GeneralNames(sans) def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]: @@ -224,11 +225,41 @@ def create_ca( return private_key, cert +def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames: + """ + SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames. + This function converts the old format to the new one. + """ + if isinstance(sans, x509.GeneralNames): + return sans + elif ( + isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str) + ): # pragma: no cover + warnings.warn( + "Passing SANs as a list of strings is deprecated.", + DeprecationWarning, + stacklevel=2, + ) + + ss: list[x509.GeneralName] = [] + for x in cast(list[str], sans): + try: + ip = ipaddress.ip_address(x) + except ValueError: + x = x.encode("idna").decode() + ss.append(x509.DNSName(x)) + else: + ss.append(x509.IPAddress(ip)) + return x509.GeneralNames(ss) + else: + return x509.GeneralNames(sans) + + def dummy_cert( privkey: rsa.RSAPrivateKey, cacert: x509.Certificate, commonname: str | None, - sans: list[str], + sans: Iterable[x509.GeneralName], organization: str | None = None, ) -> Cert: """ @@ -264,18 +295,10 @@ def dummy_cert( builder = builder.subject_name(x509.Name(subject)) builder = builder.serial_number(x509.random_serial_number()) - ss: list[x509.GeneralName] = [] - for x in sans: - try: - ip = ipaddress.ip_address(x) - except ValueError: - x = x.encode("idna").decode() - ss.append(x509.DNSName(x)) - else: - ss.append(x509.IPAddress(ip)) # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty. builder = builder.add_extension( - x509.SubjectAlternativeName(ss), critical=not is_valid_commonname + x509.SubjectAlternativeName(_fix_legacy_sans(sans)), + critical=not is_valid_commonname, ) # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1 @@ -301,7 +324,7 @@ class CertStoreEntry: TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs) -TGeneratedCertId = tuple[Optional[str], tuple[str, ...]] # (common_name, sans) +TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans) TCertId = Union[TCustomCertId, TGeneratedCertId] DHParams = NewType("DHParams", bytes) @@ -485,26 +508,31 @@ class CertStore: if entry.cert.cn: self.certs[entry.cert.cn] = entry for i in entry.cert.altnames: - self.certs[i] = entry + self.certs[str(i.value)] = entry for i in names: self.certs[i] = entry @staticmethod - def asterisk_forms(dn: str) -> list[str]: + def asterisk_forms(dn: str | x509.GeneralName) -> list[str]: """ Return all asterisk forms for a domain. For example, for www.example.com this will return [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted. """ - parts = dn.split(".") - ret = [dn] - for i in range(1, len(parts)): - ret.append("*." + ".".join(parts[i:])) - return ret + if isinstance(dn, str): + parts = dn.split(".") + ret = [dn] + for i in range(1, len(parts)): + ret.append("*." + ".".join(parts[i:])) + return ret + elif isinstance(dn, x509.DNSName): + return CertStore.asterisk_forms(dn.value) + else: + return [str(dn.value)] def get_cert( self, commonname: str | None, - sans: list[str], + sans: Iterable[x509.GeneralName], organization: str | None = None, ) -> CertStoreEntry: """ @@ -515,6 +543,7 @@ class CertStore: organization: Organization name for the generated certificate. """ + sans = _fix_legacy_sans(sans) potential_keys: list[TCertId] = [] if commonname: @@ -522,7 +551,7 @@ class CertStore: for s in sans: potential_keys.extend(self.asterisk_forms(s)) potential_keys.append("*") - potential_keys.append((commonname, tuple(sans))) + potential_keys.append((commonname, sans)) name = next(filter(lambda key: key in self.certs, potential_keys), None) if name: @@ -540,7 +569,7 @@ class CertStore: chain_file=self.default_chain_file, chain_certs=self.default_chain_certs, ) - self.certs[(commonname, tuple(sans))] = entry + self.certs[(commonname, sans)] = entry self.expire(entry) return entry diff --git a/mitmproxy/tools/console/flowdetailview.py b/mitmproxy/tools/console/flowdetailview.py index b8b274554..e3a28b91a 100644 --- a/mitmproxy/tools/console/flowdetailview.py +++ b/mitmproxy/tools/console/flowdetailview.py @@ -80,7 +80,7 @@ def flowdetails(state, flow: mitmproxy.flow.Flow): ] if c.altnames: - parts.append(("Alt names", ", ".join(c.altnames))) + parts.append(("Alt names", ", ".join(str(x.value) for x in c.altnames))) text.extend(common.format_keyvals(parts, indent=4)) if cc is not None: diff --git a/mitmproxy/tools/web/app.py b/mitmproxy/tools/web/app.py index f031d6787..db68b866a 100644 --- a/mitmproxy/tools/web/app.py +++ b/mitmproxy/tools/web/app.py @@ -50,7 +50,7 @@ def cert_to_json(certs: Sequence[certs.Cert]) -> dict | None: "serial": str(cert.serial), "subject": cert.subject, "issuer": cert.issuer, - "altnames": cert.altnames, + "altnames": [str(x.value) for x in cert.altnames], } diff --git a/test/mitmproxy/addons/test_tlsconfig.py b/test/mitmproxy/addons/test_tlsconfig.py index 64144e12e..d209fd91e 100644 --- a/test/mitmproxy/addons/test_tlsconfig.py +++ b/test/mitmproxy/addons/test_tlsconfig.py @@ -1,3 +1,4 @@ +import ipaddress import ssl import time from pathlib import Path @@ -128,20 +129,24 @@ class TestTlsConfig: ctx.server.certificate_list = [certs.Cert.from_pem(f.read())] entry = ta.get_cert(ctx) assert entry.cert.cn == "example.mitmproxy.org" - assert entry.cert.altnames == [ - "example.mitmproxy.org", - "server-address.example", - "127.0.0.1", - ] + assert entry.cert.altnames == x509.GeneralNames( + [ + x509.DNSName("example.mitmproxy.org"), + x509.IPAddress(ipaddress.ip_address("127.0.0.1")), + x509.DNSName("server-address.example"), + ] + ) # And now we also incorporate SNI. ctx.client.sni = "sni.example" entry = ta.get_cert(ctx) - assert entry.cert.altnames == [ - "example.mitmproxy.org", - "sni.example", - "server-address.example", - ] + assert entry.cert.altnames == x509.GeneralNames( + [ + x509.DNSName("example.mitmproxy.org"), + x509.DNSName("sni.example"), + x509.DNSName("server-address.example"), + ] + ) with open(tdata.path("mitmproxy/data/invalid-subject.pem"), "rb") as f: ctx.server.certificate_list = [certs.Cert.from_pem(f.read())] diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index d3c33727c..d3850fdaf 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -1,3 +1,4 @@ +import ipaddress import os from datetime import datetime from datetime import timezone @@ -73,16 +74,16 @@ class TestCertStore: assert len(ca.default_chain_certs) == 2 def test_sans(self, tstore): - c1 = tstore.get_cert("foo.com", ["*.bar.com"]) + c1 = tstore.get_cert("foo.com", [x509.DNSName("*.bar.com")]) tstore.get_cert("foo.bar.com", []) # assert c1 == c2 c3 = tstore.get_cert("bar.com", []) assert not c1 == c3 def test_sans_change(self, tstore): - tstore.get_cert("foo.com", ["*.bar.com"]) - entry = tstore.get_cert("foo.bar.com", ["*.baz.com"]) - assert "*.baz.com" in entry.cert.altnames + tstore.get_cert("foo.com", [x509.DNSName("*.bar.com")]) + entry = tstore.get_cert("foo.bar.com", [x509.DNSName("*.baz.com")]) + assert x509.DNSName("*.baz.com") in entry.cert.altnames def test_expire(self, tstore): tstore.STORE_CAP = 3 @@ -90,29 +91,29 @@ class TestCertStore: tstore.get_cert("two.com", []) tstore.get_cert("three.com", []) - assert ("one.com", ()) in tstore.certs - assert ("two.com", ()) in tstore.certs - assert ("three.com", ()) in tstore.certs + assert ("one.com", x509.GeneralNames([])) in tstore.certs + assert ("two.com", x509.GeneralNames([])) in tstore.certs + assert ("three.com", x509.GeneralNames([])) in tstore.certs tstore.get_cert("one.com", []) - assert ("one.com", ()) in tstore.certs - assert ("two.com", ()) in tstore.certs - assert ("three.com", ()) in tstore.certs + assert ("one.com", x509.GeneralNames([])) in tstore.certs + assert ("two.com", x509.GeneralNames([])) in tstore.certs + assert ("three.com", x509.GeneralNames([])) in tstore.certs tstore.get_cert("four.com", []) - assert ("one.com", ()) not in tstore.certs - assert ("two.com", ()) in tstore.certs - assert ("three.com", ()) in tstore.certs - assert ("four.com", ()) in tstore.certs + assert ("one.com", x509.GeneralNames([])) not in tstore.certs + assert ("two.com", x509.GeneralNames([])) in tstore.certs + assert ("three.com", x509.GeneralNames([])) in tstore.certs + assert ("four.com", x509.GeneralNames([])) in tstore.certs def test_overrides(self, tmp_path): ca1 = certs.CertStore.from_store(tmp_path / "ca1", "test", 2048) ca2 = certs.CertStore.from_store(tmp_path / "ca2", "test", 2048) assert not ca1.default_ca.serial == ca2.default_ca.serial - dc = ca2.get_cert("foo.com", ["sans.example.com"]) + dc = ca2.get_cert("foo.com", [x509.DNSName("sans.example.com")]) dcp = tmp_path / "dc" dcp.write_bytes(dc.cert.to_pem()) ca1.add_cert_file("foo.com", dcp) @@ -133,6 +134,23 @@ class TestCertStore: # TODO: How do we actually attempt to read that file as another user? assert os.stat(filename).st_mode & 0o77 == 0 + @pytest.mark.parametrize( + "input,output", + [ + ( + "subdomain.example.com", + ["subdomain.example.com", "*.example.com", "*.com"], + ), + ( + x509.DNSName("subdomain.example.com"), + ["subdomain.example.com", "*.example.com", "*.com"], + ), + (x509.IPAddress(ipaddress.ip_address("127.0.0.1")), ["127.0.0.1"]), + ], + ) + def test_asterisk_forms(self, input, output): + assert certs.CertStore.asterisk_forms(input) == output + class TestDummyCert: def test_with_ca(self, tstore): @@ -140,17 +158,25 @@ class TestDummyCert: tstore.default_privatekey, tstore.default_ca._cert, "foo.com", - ["one.com", "two.com", "*.three.com", "127.0.0.1", "bücher.example"], + [ + x509.DNSName("one.com"), + x509.DNSName("two.com"), + x509.DNSName("*.three.com"), + x509.IPAddress(ipaddress.ip_address("127.0.0.1")), + x509.DNSName("bücher.example".encode("idna").decode("ascii")), + ], "Foo Ltd.", ) assert r.cn == "foo.com" - assert r.altnames == [ - "one.com", - "two.com", - "*.three.com", - "xn--bcher-kva.example", - "127.0.0.1", - ] + assert r.altnames == x509.GeneralNames( + [ + x509.DNSName("one.com"), + x509.DNSName("two.com"), + x509.DNSName("*.three.com"), + x509.IPAddress(ipaddress.ip_address("127.0.0.1")), + x509.DNSName("xn--bcher-kva.example"), + ] + ) assert r.organization == "Foo Ltd." r = certs.dummy_cert( @@ -158,7 +184,7 @@ class TestDummyCert: ) assert r.cn is None assert r.organization is None - assert r.altnames == [] + assert r.altnames == x509.GeneralNames([]) class TestCert: