mirror of
https://github.com/langchain-ai/h2.git
synced 2026-07-01 22:24:10 -04:00
reject header names and values containing unpermitted characters \r, \n, or \0x00
This commit is contained in:
+1
-1
@@ -6,7 +6,7 @@ dev
|
||||
|
||||
**API Changes (Backward Incompatible)**
|
||||
|
||||
-
|
||||
- Reject header names and values containing unpermitted characters `\r`, `\n`, or `\0x00`.
|
||||
|
||||
**API Changes (Backward Compatible)**
|
||||
|
||||
|
||||
@@ -24,6 +24,12 @@ UPPER_RE = re.compile(b"[A-Z]")
|
||||
SIGIL = ord(b":")
|
||||
INFORMATIONAL_START = ord(b"1")
|
||||
|
||||
HEADER_UNPERMITTED_CHARACTERS = frozenset([
|
||||
b"\r",
|
||||
b"\n",
|
||||
b"\x00",
|
||||
])
|
||||
|
||||
|
||||
# A set of headers that are hop-by-hop or connection-specific and thus
|
||||
# forbidden in HTTP/2. This list comes from RFC 7540 § 8.1.2.2.
|
||||
@@ -201,6 +207,9 @@ def validate_headers(headers: Iterable[Header], hdr_validation_flags: HeaderVali
|
||||
# For example, we avoid tuple unpacking in loops because it represents a
|
||||
# fixed cost that we don't want to spend, instead indexing into the header
|
||||
# tuples.
|
||||
headers = _reject_unpermitted_characters(
|
||||
headers, hdr_validation_flags,
|
||||
)
|
||||
headers = _reject_empty_header_names(
|
||||
headers, hdr_validation_flags,
|
||||
)
|
||||
@@ -225,6 +234,22 @@ def validate_headers(headers: Iterable[Header], hdr_validation_flags: HeaderVali
|
||||
return _check_path_header(headers, hdr_validation_flags)
|
||||
|
||||
|
||||
def _reject_unpermitted_characters(headers: Iterable[Header],
|
||||
hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
|
||||
"""
|
||||
Raises a ProtocolError if any header names or values contain unpermitted characters.
|
||||
See RFC 7540, section 10.3 and 8.1.2.6.
|
||||
"""
|
||||
for header in headers:
|
||||
for c in HEADER_UNPERMITTED_CHARACTERS:
|
||||
if c in header[0]:
|
||||
msg = f"Unpermitted character '{c}' in header name: {header[0]!r}"
|
||||
raise ProtocolError(msg)
|
||||
if c in header[1]:
|
||||
msg = f"Unpermitted character '{c}' in header value: {header[1]!r}"
|
||||
raise ProtocolError(msg)
|
||||
yield header
|
||||
|
||||
|
||||
def _reject_empty_header_names(headers: Iterable[Header],
|
||||
hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]:
|
||||
|
||||
@@ -48,6 +48,12 @@ class TestInvalidFrameSequences:
|
||||
[*base_request_headers, ("name ", "name with trailing space")],
|
||||
[*base_request_headers, ("name", " value with leading space")],
|
||||
[*base_request_headers, ("name", "value with trailing space ")],
|
||||
[*base_request_headers, ("unpermitted-\r-characters", "value")],
|
||||
[*base_request_headers, ("unpermitted-\n-characters", "value")],
|
||||
[*base_request_headers, ("unpermitted-\x00-characters", "value")],
|
||||
[*base_request_headers, ("unpermitted-characters", "some \r value")],
|
||||
[*base_request_headers, ("unpermitted-characters", "some \n value")],
|
||||
[*base_request_headers, ("unpermitted-characters", "some \x00 value")],
|
||||
[header for header in base_request_headers
|
||||
if header[0] != ":authority"],
|
||||
[(":protocol", "websocket"), *base_request_headers],
|
||||
@@ -665,7 +671,7 @@ class TestFilter:
|
||||
|
||||
def test_inbound_header_name_length_full_frame_decode(self, frame_factory) -> None:
|
||||
f = frame_factory.build_headers_frame([])
|
||||
f.data = b"\x00\x00\x05\x00\x00\x00\x00\x04"
|
||||
f.data = b"\x00\x00\x01\x04"
|
||||
data = f.serialize()
|
||||
|
||||
c = h2.connection.H2Connection(config=h2.config.H2Configuration(client_side=False))
|
||||
|
||||
Reference in New Issue
Block a user