mirror of
https://github.com/mitmproxy/mitmproxy.git
synced 2024-11-26 23:00:40 +00:00
Migrate to ruff (#6433)
This commit is contained in:
parent
7f18545f3c
commit
f55ee1d44f
29
.github/workflows/autofix.yml
vendored
29
.github/workflows/autofix.yml
vendored
@ -15,32 +15,9 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: install-pinned/pyupgrade@af7d65f31bddb01097a24da6c8fb694441f51cba
|
||||
- name: Run pyupgrade
|
||||
run: |
|
||||
shopt -s globstar
|
||||
export GLOBIGNORE='mitmproxy/contrib/**'
|
||||
pyupgrade --exit-zero-even-if-changed --keep-runtime-typing --py310-plus **/*.py
|
||||
|
||||
- uses: install-pinned/reorder_python_imports@9766e7ba4f33497b107014571afe3b5f4c2d946b
|
||||
- name: Run reorder-python-imports
|
||||
run: |
|
||||
shopt -s globstar
|
||||
export GLOBIGNORE='mitmproxy/contrib/**'
|
||||
reorder-python-imports --exit-zero-even-if-changed --py310-plus **/*.py
|
||||
|
||||
- uses: install-pinned/yesqa@e45b0928dd14d5c2a22695e32de8936530ba7a49
|
||||
- name: Run yesqa
|
||||
run: |
|
||||
shopt -s globstar
|
||||
export GLOBIGNORE='mitmproxy/contrib/**'
|
||||
yesqa **/*.py || true
|
||||
|
||||
- uses: install-pinned/autoflake@46b4898323be58db319656fe2758f3fd5ddfee32
|
||||
- run: autoflake --in-place --remove-all-unused-imports --exclude mitmproxy/contrib -r .
|
||||
|
||||
- uses: install-pinned/black@cd5d66326da7f8f47500e431cc8cb6d239eea3b5
|
||||
- run: black .
|
||||
- uses: install-pinned/ruff@f3fd6b89f77ff37a32e69cd2da72e0fc6e21f837
|
||||
- run: ruff --fix-only .
|
||||
- run: ruff format .
|
||||
|
||||
- name: Run prettier
|
||||
run: |
|
||||
|
2
.github/workflows/main.yml
vendored
2
.github/workflows/main.yml
vendored
@ -19,7 +19,7 @@ jobs:
|
||||
lint:
|
||||
uses: mhils/workflows/.github/workflows/python-tox.yml@main
|
||||
with:
|
||||
cmd: tox -e flake8
|
||||
cmd: tox -e lint
|
||||
|
||||
filename-matching:
|
||||
uses: mhils/workflows/.github/workflows/python-tox.yml@main
|
||||
|
@ -91,7 +91,7 @@ Keeping to a consistent code style throughout the project makes it easier to con
|
||||
We enforce the following check for all PRs:
|
||||
|
||||
```shell
|
||||
tox -e flake8
|
||||
tox -e lint
|
||||
```
|
||||
|
||||
If a linting error is detected, the automated pull request checks will fail and block merging.
|
||||
|
@ -3,7 +3,6 @@ import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
here = Path(__file__).parent
|
||||
|
||||
for script in sorted((here / "scripts").glob("*.py")):
|
||||
|
@ -2,7 +2,6 @@
|
||||
import screenplays
|
||||
from clidirector import CliDirector
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
director = CliDirector()
|
||||
screenplays.record_user_interface(director)
|
||||
|
@ -1,7 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from mitmproxy import flowfilter
|
||||
|
||||
|
||||
print('<table class="table filtertable"><tbody>')
|
||||
for i in flowfilter.help:
|
||||
print("<tr><th>%s</th><td>%s</td></tr>" % i)
|
||||
|
@ -4,6 +4,7 @@ protobuf messages based on a user defined rule set.
|
||||
|
||||
"""
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.contentviews.grpc import ProtoParser
|
||||
from mitmproxy.contentviews.grpc import ViewConfig
|
||||
from mitmproxy.contentviews.grpc import ViewGrpcProtobuf
|
||||
@ -102,7 +103,7 @@ class ViewGrpcWithRules(ViewGrpcProtobuf):
|
||||
view = ViewGrpcWithRules()
|
||||
|
||||
|
||||
def load(l):
|
||||
def load(loader: Loader):
|
||||
contentviews.add(view)
|
||||
|
||||
|
||||
|
@ -8,6 +8,7 @@ The content view API is explained in the mitmproxy.contentviews module.
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import http
|
||||
from mitmproxy.addonmanager import Loader
|
||||
|
||||
|
||||
class ViewSwapCase(contentviews.View):
|
||||
@ -42,7 +43,7 @@ class ViewSwapCase(contentviews.View):
|
||||
view = ViewSwapCase()
|
||||
|
||||
|
||||
def load(l):
|
||||
def load(loader: Loader):
|
||||
contentviews.add(view)
|
||||
|
||||
|
||||
|
@ -7,6 +7,7 @@ import logging
|
||||
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import http
|
||||
from mitmproxy.addonmanager import Loader
|
||||
|
||||
|
||||
class Filter:
|
||||
@ -16,8 +17,8 @@ class Filter:
|
||||
if "flowfilter" in updated:
|
||||
self.filter = flowfilter.parse(".")
|
||||
|
||||
def load(self, l):
|
||||
l.add_option("flowfilter", str, "", "Check that flow matches filter.")
|
||||
def load(self, loader: Loader):
|
||||
loader.add_option("flowfilter", str, "", "Check that flow matches filter.")
|
||||
|
||||
def response(self, flow: http.HTTPFlow) -> None:
|
||||
if flowfilter.match(self.filter, flow):
|
||||
|
@ -1,14 +1,17 @@
|
||||
"""Post messages to mitmproxy's event log."""
|
||||
import logging
|
||||
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.log import ALERT
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def load(l):
|
||||
logging.info("This is some informative text.")
|
||||
logging.warning("This is a warning.")
|
||||
logging.error("This is an error.")
|
||||
logging.log(
|
||||
|
||||
def load(loader: Loader):
|
||||
logger.info("This is some informative text.")
|
||||
logger.warning("This is a warning.")
|
||||
logger.error("This is an error.")
|
||||
logger.log(
|
||||
ALERT,
|
||||
"This is an alert. It has the same urgency as info, but will also pop up in the status bar.",
|
||||
)
|
||||
|
@ -8,7 +8,6 @@ import asyncio
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import http
|
||||
|
||||
|
||||
# Simple example: Inject a message as a response to an event
|
||||
|
||||
|
||||
|
@ -2,7 +2,6 @@ from mitmproxy import http
|
||||
from mitmproxy.connection import Server
|
||||
from mitmproxy.net.server_spec import ServerSpec
|
||||
|
||||
|
||||
# This scripts demonstrates how mitmproxy can switch to a second/different upstream proxy
|
||||
# in upstream proxy mode.
|
||||
#
|
||||
|
@ -7,7 +7,6 @@ import mitmproxy
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.certs import Cert
|
||||
|
||||
|
||||
# Certificate for client connection is generated in dummy_cert() in certs.py. Monkeypatching
|
||||
# the function to generate test cases for SSL Pinning.
|
||||
|
||||
|
@ -5,7 +5,6 @@ from mitmproxy import ctx
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.http import HTTPFlow
|
||||
|
||||
|
||||
"""
|
||||
This extension implements support for domain fronting.
|
||||
|
||||
|
@ -18,7 +18,6 @@ import json
|
||||
|
||||
from mitmproxy import http
|
||||
|
||||
|
||||
PATH_TO_COOKIES = "./cookies.json" # insert your path to the cookie file here
|
||||
FILTER_COOKIES = {
|
||||
"mycookie",
|
||||
|
@ -72,9 +72,9 @@ class NTLMUpstreamAuth:
|
||||
def running(self):
|
||||
def extract_flow_from_context(context: Context) -> http.HTTPFlow:
|
||||
if context and context.layers:
|
||||
for l in context.layers:
|
||||
if isinstance(l, HttpLayer):
|
||||
for _, stream in l.streams.items():
|
||||
for x in context.layers:
|
||||
if isinstance(x, HttpLayer):
|
||||
for _, stream in x.streams.items():
|
||||
return (
|
||||
stream.flow if isinstance(stream, HttpStream) else None
|
||||
)
|
||||
|
@ -16,7 +16,7 @@ Usage:
|
||||
"""
|
||||
|
||||
|
||||
def load(l):
|
||||
def load(_):
|
||||
import pydevd_pycharm
|
||||
|
||||
pydevd_pycharm.settrace(
|
||||
|
@ -24,6 +24,7 @@ from enum import Enum
|
||||
from mitmproxy import connection
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import tls
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.utils import human
|
||||
|
||||
|
||||
@ -78,8 +79,8 @@ class ProbabilisticStrategy(TlsStrategy):
|
||||
class MaybeTls:
|
||||
strategy: TlsStrategy
|
||||
|
||||
def load(self, l):
|
||||
l.add_option(
|
||||
def load(self, loader: Loader):
|
||||
loader.add_option(
|
||||
"tls_strategy",
|
||||
int,
|
||||
0,
|
||||
|
@ -185,8 +185,8 @@ class AddonManager:
|
||||
raise exceptions.AddonManagerError(
|
||||
"An addon called '%s' already exists." % name
|
||||
)
|
||||
l = Loader(self.master)
|
||||
self.invoke_addon_sync(addon, LoadHook(l))
|
||||
loader = Loader(self.master)
|
||||
self.invoke_addon_sync(addon, LoadHook(loader))
|
||||
for a in traverse([addon]):
|
||||
name = _get_name(a)
|
||||
self.lookup[name] = a
|
||||
|
@ -13,7 +13,6 @@ from mitmproxy.log import ALERT
|
||||
from mitmproxy.net.http import status_codes
|
||||
from mitmproxy.utils import emoji
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CONF_DIR = "~/.mitmproxy"
|
||||
|
@ -30,9 +30,9 @@ from mitmproxy.websocket import WebSocketMessage
|
||||
|
||||
|
||||
def indent(n: int, text: str) -> str:
|
||||
l = str(text).strip().splitlines()
|
||||
lines = str(text).strip().splitlines()
|
||||
pad = " " * n
|
||||
return "\n".join(pad + i for i in l)
|
||||
return "\n".join(pad + i for i in lines)
|
||||
|
||||
|
||||
CONTENTVIEW_STYLES: dict[str, dict[str, str | bool]] = {
|
||||
|
@ -16,6 +16,7 @@ from mitmproxy import flowfilter
|
||||
from mitmproxy import http
|
||||
from mitmproxy import types
|
||||
from mitmproxy import version
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.connection import Server
|
||||
from mitmproxy.coretypes.multidict import _MultiDict
|
||||
from mitmproxy.log import ALERT
|
||||
@ -73,8 +74,8 @@ class SaveHar:
|
||||
}
|
||||
}
|
||||
|
||||
def load(self, l):
|
||||
l.add_option(
|
||||
def load(self, loader: Loader):
|
||||
loader.add_option(
|
||||
"hardump",
|
||||
str,
|
||||
"",
|
||||
|
@ -314,7 +314,9 @@ class TlsConfig:
|
||||
except ValueError:
|
||||
host_name = server.sni.encode("idna")
|
||||
tls_start.ssl_conn.set_tlsext_host_name(host_name)
|
||||
ok = SSL._lib.X509_VERIFY_PARAM_set1_host(param, host_name, len(host_name)) # type: ignore
|
||||
ok = SSL._lib.X509_VERIFY_PARAM_set1_host( # type: ignore
|
||||
param, host_name, len(host_name)
|
||||
) # type: ignore
|
||||
SSL._openssl_assert(ok == 1) # type: ignore
|
||||
else:
|
||||
# RFC 6066: Literal IPv4 and IPv6 addresses are not permitted in "HostName",
|
||||
|
@ -35,7 +35,6 @@ from mitmproxy.log import ALERT
|
||||
from mitmproxy.utils import human
|
||||
from mitmproxy.utils import signals
|
||||
|
||||
|
||||
# The underlying sorted list implementation expects the sort key to be stable
|
||||
# for the lifetime of the object. However, if we sort by size, for instance,
|
||||
# the sort order changes as the flow progresses through its lifecycle. We
|
||||
|
@ -360,7 +360,9 @@ class CertStore:
|
||||
|
||||
# we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
|
||||
# expected format.
|
||||
bio = OpenSSL.SSL._lib.BIO_new_file(str(path).encode(sys.getfilesystemencoding()), b"r") # type: ignore
|
||||
bio = OpenSSL.SSL._lib.BIO_new_file( # type: ignore
|
||||
str(path).encode(sys.getfilesystemencoding()), b"r"
|
||||
)
|
||||
if bio != OpenSSL.SSL._ffi.NULL: # type: ignore
|
||||
bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free) # type: ignore
|
||||
dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams( # type: ignore
|
||||
|
@ -13,6 +13,9 @@ metadata depend on the protocol in use. Known attributes can be found in
|
||||
"""
|
||||
import traceback
|
||||
|
||||
from ..tcp import TCPMessage
|
||||
from ..udp import UDPMessage
|
||||
from ..websocket import WebSocketMessage
|
||||
from . import auto
|
||||
from . import css
|
||||
from . import dns
|
||||
@ -32,9 +35,6 @@ from . import raw
|
||||
from . import urlencoded
|
||||
from . import wbxml
|
||||
from . import xml_html
|
||||
from ..tcp import TCPMessage
|
||||
from ..udp import UDPMessage
|
||||
from ..websocket import WebSocketMessage
|
||||
from .base import format_dict
|
||||
from .base import format_text
|
||||
from .base import KEY_MAX
|
||||
|
@ -13,9 +13,7 @@ def format_graphql(data):
|
||||
return """{header}
|
||||
---
|
||||
{query}
|
||||
""".format(
|
||||
header=json.dumps(header_data, indent=2), query=query
|
||||
)
|
||||
""".format(header=json.dumps(header_data, indent=2), query=query)
|
||||
|
||||
|
||||
def format_query_list(data: list[Any]):
|
||||
|
@ -9,8 +9,8 @@ from aioquic.buffer import BufferReadError
|
||||
from aioquic.h3.connection import parse_settings
|
||||
from aioquic.h3.connection import Setting
|
||||
|
||||
from . import base
|
||||
from ..proxy.layers.http import is_h3_alpn
|
||||
from . import base
|
||||
from .hex import ViewHexDump
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import tcp
|
||||
|
@ -105,9 +105,7 @@ def parse_ico(data: bytes) -> Metadata:
|
||||
parts.append(
|
||||
(
|
||||
f"Image {i + 1}",
|
||||
"Size: {} x {}\n"
|
||||
"{: >18}Bits per pixel: {}\n"
|
||||
"{: >18}PNG: {}".format(
|
||||
"Size: {} x {}\n" "{: >18}Bits per pixel: {}\n" "{: >18}PNG: {}".format(
|
||||
256 if not image.width else image.width,
|
||||
256 if not image.height else image.height,
|
||||
"",
|
||||
|
@ -22,26 +22,26 @@ def format_msgpack(
|
||||
|
||||
indent = ("text", " " * indent_count)
|
||||
|
||||
if type(data) is str:
|
||||
if isinstance(data, str):
|
||||
token = [("Token_Literal_String", f'"{data}"')]
|
||||
output[-1] += token
|
||||
|
||||
# Need to return if single value, but return is discarded in dict/list loop
|
||||
return output
|
||||
|
||||
elif type(data) is float or type(data) is int:
|
||||
token = [("Token_Literal_Number", repr(data))]
|
||||
output[-1] += token
|
||||
|
||||
return output
|
||||
|
||||
elif type(data) is bool:
|
||||
elif isinstance(data, bool):
|
||||
token = [("Token_Keyword_Constant", repr(data))]
|
||||
output[-1] += token
|
||||
|
||||
return output
|
||||
|
||||
elif type(data) is dict:
|
||||
elif isinstance(data, float | int):
|
||||
token = [("Token_Literal_Number", repr(data))]
|
||||
output[-1] += token
|
||||
|
||||
return output
|
||||
|
||||
elif isinstance(data, dict):
|
||||
output[-1] += [("text", "{")]
|
||||
for key in data:
|
||||
output.append(
|
||||
@ -61,7 +61,7 @@ def format_msgpack(
|
||||
|
||||
return output
|
||||
|
||||
elif type(data) is list:
|
||||
elif isinstance(data, list):
|
||||
output[-1] += [("text", "[")]
|
||||
|
||||
for count, item in enumerate(data):
|
||||
|
@ -1,5 +1,5 @@
|
||||
from . import base
|
||||
from .. import http
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewQuery(base.View):
|
||||
|
@ -8,7 +8,8 @@ from functools import cache
|
||||
from typing import TypeVar
|
||||
|
||||
try:
|
||||
from types import UnionType, NoneType
|
||||
from types import NoneType
|
||||
from types import UnionType
|
||||
except ImportError: # pragma: no cover
|
||||
|
||||
class UnionType: # type: ignore
|
||||
|
@ -1013,9 +1013,9 @@ class Request(Message):
|
||||
on generating the boundary.
|
||||
"""
|
||||
boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
|
||||
self.headers[
|
||||
"content-type"
|
||||
] = ct = f"multipart/form-data; boundary={boundary}"
|
||||
self.headers["content-type"] = (
|
||||
ct
|
||||
) = f"multipart/form-data; boundary={boundary}"
|
||||
self.content = multipart.encode_multipart(ct, value)
|
||||
|
||||
@property
|
||||
|
@ -3,5 +3,4 @@ from .io import FlowReader
|
||||
from .io import FlowWriter
|
||||
from .io import read_flows_from_paths
|
||||
|
||||
|
||||
__all__ = ["FlowWriter", "FlowReader", "FilteredFlowWriter", "read_flows_from_paths"]
|
||||
|
@ -8,7 +8,6 @@ from mitmproxy import connection
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -210,11 +210,11 @@ def parse(data_type: int, data: bytes) -> TSerializable:
|
||||
raise ValueError(f"not a tnetstring: invalid null literal: {data!r}")
|
||||
return None
|
||||
if data_type == ord(b"]"):
|
||||
l = []
|
||||
lst = []
|
||||
while data:
|
||||
item, data = pop(data)
|
||||
l.append(item) # type: ignore
|
||||
return l
|
||||
lst.append(item) # type: ignore
|
||||
return lst
|
||||
if data_type == ord(b"}"):
|
||||
d = {}
|
||||
while data:
|
||||
|
@ -1,7 +1,6 @@
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
|
||||
_LABEL_SIZE = struct.Struct("!B")
|
||||
_POINTER_OFFSET = struct.Struct("!H")
|
||||
_POINTER_INDICATOR = 0b11000000
|
||||
|
@ -9,7 +9,6 @@ from .read import read_request_head
|
||||
from .read import read_response_head
|
||||
from .read import validate_headers
|
||||
|
||||
|
||||
__all__ = [
|
||||
"read_request_head",
|
||||
"read_response_head",
|
||||
|
@ -37,8 +37,8 @@ def parse(url):
|
||||
|
||||
# Size of Ascii character after encoding is 1 byte which is same as its size
|
||||
# But non-Ascii character's size after encoding will be more than its size
|
||||
def ascii_check(l):
|
||||
if len(l) == len(str(l).encode()):
|
||||
def ascii_check(x):
|
||||
if len(x) == len(str(x).encode()):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
@ -415,9 +415,9 @@ class OptManager:
|
||||
|
||||
o = self._options[optname]
|
||||
|
||||
def mkf(l, s):
|
||||
l = l.replace("_", "-")
|
||||
f = ["--%s" % l]
|
||||
def mkf(x, s):
|
||||
x = x.replace("_", "-")
|
||||
f = ["--%s" % x]
|
||||
if s:
|
||||
f.append("-" + s)
|
||||
return f
|
||||
|
@ -15,16 +15,6 @@ import h2.settings
|
||||
import h2.stream
|
||||
import h2.utilities
|
||||
|
||||
from . import RequestData
|
||||
from . import RequestEndOfMessage
|
||||
from . import RequestHeaders
|
||||
from . import RequestProtocolError
|
||||
from . import RequestTrailers
|
||||
from . import ResponseData
|
||||
from . import ResponseEndOfMessage
|
||||
from . import ResponseHeaders
|
||||
from . import ResponseProtocolError
|
||||
from . import ResponseTrailers
|
||||
from ...commands import CloseConnection
|
||||
from ...commands import Log
|
||||
from ...commands import RequestWakeup
|
||||
@ -37,6 +27,16 @@ from ...events import Start
|
||||
from ...events import Wakeup
|
||||
from ...layer import CommandGenerator
|
||||
from ...utils import expect
|
||||
from . import RequestData
|
||||
from . import RequestEndOfMessage
|
||||
from . import RequestHeaders
|
||||
from . import RequestProtocolError
|
||||
from . import RequestTrailers
|
||||
from . import ResponseData
|
||||
from . import ResponseEndOfMessage
|
||||
from . import ResponseHeaders
|
||||
from . import ResponseProtocolError
|
||||
from . import ResponseTrailers
|
||||
from ._base import format_error
|
||||
from ._base import HttpConnection
|
||||
from ._base import HttpEvent
|
||||
|
@ -9,7 +9,6 @@ import h2.exceptions
|
||||
import h2.settings
|
||||
import h2.stream
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -64,9 +64,7 @@ class QuicTlsSettings:
|
||||
"""The certificate to use for the connection."""
|
||||
certificate_chain: list[x509.Certificate] = field(default_factory=list)
|
||||
"""A list of additional certificates to send to the peer."""
|
||||
certificate_private_key: dsa.DSAPrivateKey | ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey | None = (
|
||||
None
|
||||
)
|
||||
certificate_private_key: dsa.DSAPrivateKey | ec.EllipticCurvePrivateKey | rsa.RSAPrivateKey | None = None
|
||||
"""The certificate's private key."""
|
||||
cipher_suites: list[CipherSuite] | None = None
|
||||
"""An optional list of allowed/advertised cipher suites."""
|
||||
|
@ -312,7 +312,9 @@ class TLSLayer(tunnel.TunnelLayer):
|
||||
("SSL routines", "", "certificate verify failed"), # OpenSSL 3+
|
||||
]:
|
||||
verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore
|
||||
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode() # type: ignore
|
||||
error = SSL._ffi.string( # type: ignore
|
||||
SSL._lib.X509_verify_cert_error_string(verify_result) # type: ignore
|
||||
).decode()
|
||||
err = f"Certificate verify failed: {error}"
|
||||
elif last_err in [
|
||||
("SSL routines", "ssl3_read_bytes", "tlsv1 alert unknown ca"),
|
||||
|
@ -42,7 +42,6 @@ from mitmproxy.utils import asyncio_utils
|
||||
from mitmproxy.utils import human
|
||||
from mitmproxy.utils.data import pkg_data
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -519,9 +518,9 @@ if __name__ == "__main__": # pragma: no cover
|
||||
]
|
||||
|
||||
def next_layer(nl: layer.NextLayer):
|
||||
l = layer_stack.pop(0)(nl.context)
|
||||
l.debug = " " * len(nl.context.layers)
|
||||
nl.layer = l
|
||||
layr = layer_stack.pop(0)(nl.context)
|
||||
layr.debug = " " * len(nl.context.layers)
|
||||
nl.layer = layr
|
||||
|
||||
def request(flow: http.HTTPFlow):
|
||||
if "cached" in flow.request.path:
|
||||
|
@ -1,4 +1,3 @@
|
||||
from mitmproxy.tools.console import master
|
||||
|
||||
|
||||
__all__ = ["master"]
|
||||
|
@ -31,15 +31,15 @@ def is_keypress(k):
|
||||
return True
|
||||
|
||||
|
||||
def highlight_key(str, key, textattr="text", keyattr="key"):
|
||||
l = []
|
||||
parts = str.split(key, 1)
|
||||
def highlight_key(text, key, textattr="text", keyattr="key"):
|
||||
lst = []
|
||||
parts = text.split(key, 1)
|
||||
if parts[0]:
|
||||
l.append((textattr, parts[0]))
|
||||
l.append((keyattr, key))
|
||||
lst.append((textattr, parts[0]))
|
||||
lst.append((keyattr, key))
|
||||
if parts[1]:
|
||||
l.append((textattr, parts[1]))
|
||||
return l
|
||||
lst.append((textattr, parts[1]))
|
||||
return lst
|
||||
|
||||
|
||||
KEY_MAX = 30
|
||||
|
@ -30,7 +30,6 @@ from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import window
|
||||
|
||||
|
||||
T = TypeVar("T", str, bytes)
|
||||
|
||||
|
||||
|
@ -95,7 +95,7 @@ class Palette:
|
||||
low: Mapping[str, Sequence[str]]
|
||||
|
||||
def palette(self, transparent: bool):
|
||||
l: list[Sequence[str | None]] = []
|
||||
lst: list[Sequence[str | None]] = []
|
||||
highback, lowback = None, None
|
||||
if not transparent:
|
||||
if self.high and self.high.get("background"):
|
||||
@ -104,7 +104,7 @@ class Palette:
|
||||
|
||||
for i in self._fields:
|
||||
if transparent and i == "background":
|
||||
l.append(["background", "default", "default"])
|
||||
lst.append(["background", "default", "default"])
|
||||
else:
|
||||
v: list[str | None] = [i]
|
||||
low = list(self.low[i])
|
||||
@ -120,8 +120,8 @@ class Palette:
|
||||
elif highback and self.low[i][1] == "default":
|
||||
high = [None, low[0], highback]
|
||||
v.extend(high)
|
||||
l.append(tuple(v))
|
||||
return l
|
||||
lst.append(tuple(v))
|
||||
return lst
|
||||
|
||||
|
||||
def gen_gradient(palette, cols):
|
||||
|
@ -139,8 +139,9 @@ def check():
|
||||
else:
|
||||
new_options = [r]
|
||||
print(
|
||||
"{} is deprecated.\n"
|
||||
"Please use `{}` instead.".format(option, "` or `".join(new_options))
|
||||
"{} is deprecated.\n" "Please use `{}` instead.".format(
|
||||
option, "` or `".join(new_options)
|
||||
)
|
||||
)
|
||||
|
||||
for option in DEPRECATED.splitlines():
|
||||
|
@ -1852,11 +1852,13 @@ emoji = {
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import requests
|
||||
import io
|
||||
import re
|
||||
import string
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
from mitmproxy.tools.console.common import SYMBOL_MARK
|
||||
|
||||
CHAR_MARKERS = list(string.ascii_letters) + list(string.digits)
|
||||
|
@ -20,7 +20,6 @@ from typing import Generic
|
||||
from typing import ParamSpec
|
||||
from typing import TypeVar
|
||||
|
||||
|
||||
P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
@ -4,7 +4,6 @@ import re
|
||||
from collections.abc import Iterable
|
||||
from typing import overload
|
||||
|
||||
|
||||
# https://mypy.readthedocs.io/en/stable/more_types.html#function-overloading
|
||||
|
||||
|
||||
@ -59,7 +58,8 @@ def always_str(str_or_bytes: None | str | bytes, *decode_args) -> None | str:
|
||||
# (http://unicode.org/charts/PDF/U2400.pdf), but that turned out to render badly
|
||||
# with monospace fonts. We are back to "." therefore.
|
||||
_control_char_trans = {
|
||||
x: ord(".") for x in range(32) # x + 0x2400 for unicode control group pictures
|
||||
x: ord(".")
|
||||
for x in range(32) # x + 0x2400 for unicode control group pictures
|
||||
}
|
||||
_control_char_trans[127] = ord(".") # 0x2421
|
||||
_control_char_trans_newline = _control_char_trans.copy()
|
||||
|
@ -6,8 +6,12 @@ import sys
|
||||
from typing import IO
|
||||
|
||||
if os.name == "nt":
|
||||
from ctypes import byref, windll # type: ignore
|
||||
from ctypes.wintypes import BOOL, DWORD, HANDLE, LPDWORD
|
||||
from ctypes import byref # type: ignore
|
||||
from ctypes import windll # type: ignore
|
||||
from ctypes.wintypes import BOOL
|
||||
from ctypes.wintypes import DWORD
|
||||
from ctypes.wintypes import HANDLE
|
||||
from ctypes.wintypes import LPDWORD
|
||||
|
||||
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
|
||||
STD_OUTPUT_HANDLE = -11
|
||||
|
@ -157,13 +157,25 @@ ignore_errors = true
|
||||
module = "test.*"
|
||||
ignore_errors = true
|
||||
|
||||
[tool.black]
|
||||
extend_exclude = "mitmproxy/contrib/"
|
||||
[tool.ruff]
|
||||
select = ["E", "F", "I"]
|
||||
extend-exclude = ["mitmproxy/contrib/"]
|
||||
ignore = ["F541", "E501"]
|
||||
|
||||
|
||||
[tool.ruff.isort]
|
||||
# these rules are a bit weird, but they mimic our existing reorder_python_imports style.
|
||||
# if we break compatibility here, consider removing all customization + enforce absolute imports.
|
||||
force-single-line = true
|
||||
order-by-type = false
|
||||
section-order = ["future", "standard-library", "third-party", "local-folder","first-party"]
|
||||
no-lines-before = ["first-party"]
|
||||
known-first-party = ["test", "mitmproxy"]
|
||||
|
||||
[tool.tox]
|
||||
legacy_tox_ini = """
|
||||
[tox]
|
||||
envlist = py, flake8, mypy
|
||||
envlist = py, lint, mypy
|
||||
skipsdist = True
|
||||
toxworkdir={env:TOX_WORK_DIR:.tox}
|
||||
|
||||
@ -179,12 +191,11 @@ commands =
|
||||
--full-cov=mitmproxy/ \
|
||||
{posargs}
|
||||
|
||||
[testenv:flake8]
|
||||
[testenv:lint]
|
||||
deps =
|
||||
flake8>=3.8.4,<6.1
|
||||
flake8-tidy-imports>=4.2.0,<5
|
||||
ruff>=0.1.3,<0.2
|
||||
commands =
|
||||
flake8 --jobs 8 mitmproxy examples test release {posargs}
|
||||
ruff .
|
||||
|
||||
[testenv:filename_matching]
|
||||
deps =
|
||||
|
@ -1,10 +1,3 @@
|
||||
[flake8]
|
||||
max-line-length = 140
|
||||
max-complexity = 25
|
||||
ignore = E203,E251,E252,C901,W292,W503,W504,W605,E741,E126,F541
|
||||
exclude = mitmproxy/contrib/*,test/mitmproxy/data/*,release/build/*
|
||||
addons = file,open,basestring,xrange,unicode,long,cmp
|
||||
|
||||
[tool:full_coverage]
|
||||
exclude =
|
||||
mitmproxy/tools/
|
||||
|
@ -48,9 +48,9 @@ def run_tests(src, test, fail):
|
||||
e = 0
|
||||
else:
|
||||
cov = [
|
||||
l
|
||||
for l in stdout.getvalue().split("\n")
|
||||
if (src in l) or ("was never imported" in l)
|
||||
line
|
||||
for line in stdout.getvalue().split("\n")
|
||||
if (src in line) or ("was never imported" in line)
|
||||
]
|
||||
if len(cov) == 1:
|
||||
print("FAIL:", cov[0])
|
||||
|
@ -41,7 +41,6 @@ from mitmproxy.test.tflow import tserver_conn
|
||||
from mitmproxy.test.tutils import tdnsreq
|
||||
from mitmproxy.utils import data
|
||||
|
||||
|
||||
tlsdata = data.Data(__name__)
|
||||
|
||||
|
||||
@ -100,7 +99,9 @@ async def test_start_stop(caplog_async):
|
||||
)
|
||||
assert repr(ps) == "Proxyserver(1 active conns)"
|
||||
|
||||
await ps.setup_servers() # assert this can always be called without side effects
|
||||
await (
|
||||
ps.setup_servers()
|
||||
) # assert this can always be called without side effects
|
||||
tctx.configure(ps, server=False)
|
||||
await caplog_async.await_log("stopped")
|
||||
if ps.servers.is_updating:
|
||||
|
@ -14,7 +14,6 @@ from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.tools import main
|
||||
|
||||
|
||||
# We want this to be speedy for testing
|
||||
script.ReloadInterval = 0.1
|
||||
|
||||
|
@ -303,16 +303,16 @@ def test_resolve():
|
||||
v.set_filter(f)
|
||||
v[0].marked = True
|
||||
|
||||
def m(l):
|
||||
return [i.request.method for i in l]
|
||||
def methods(flows):
|
||||
return [i.request.method for i in flows]
|
||||
|
||||
assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
|
||||
assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
|
||||
assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
|
||||
assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
|
||||
assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
|
||||
assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
|
||||
assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"]
|
||||
assert methods(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
|
||||
assert methods(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
|
||||
assert methods(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
|
||||
assert methods(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
|
||||
assert methods(tctx.command(v.resolve, "@marked")) == ["GET"]
|
||||
assert methods(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
|
||||
assert methods(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"]
|
||||
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid filter expression"):
|
||||
tctx.command(v.resolve, "~")
|
||||
|
@ -189,6 +189,7 @@ def test_parse_jpeg(filename, metadata, tdata):
|
||||
assert metadata == image_parser.parse_jpeg(f.read())
|
||||
|
||||
|
||||
# fmt: off
|
||||
@pytest.mark.parametrize(
|
||||
"filename, metadata",
|
||||
{
|
||||
@ -219,3 +220,4 @@ def test_parse_jpeg(filename, metadata, tdata):
|
||||
def test_ico(filename, metadata, tdata):
|
||||
with open(tdata.path(filename), "rb") as f:
|
||||
assert metadata == image_parser.parse_ico(f.read())
|
||||
# fmt: on
|
||||
|
@ -70,7 +70,7 @@ class TEnum(enum.Enum):
|
||||
|
||||
@dataclass
|
||||
class TLiteral(SerializableDataclass):
|
||||
l: Literal["foo", "bar"]
|
||||
lit: Literal["foo", "bar"]
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -130,7 +130,7 @@ class TestSerializableDataclass:
|
||||
},
|
||||
),
|
||||
(BuiltinChildren, {"a": None, "b": None, "c": None, "d": [], "e": None}),
|
||||
(TLiteral, {"l": "foo"}),
|
||||
(TLiteral, {"lit": "foo"}),
|
||||
],
|
||||
)
|
||||
def test_roundtrip(self, cls, state):
|
||||
@ -182,9 +182,9 @@ class TestSerializableDataclass:
|
||||
Unsupported.from_state({"a": "foo"})
|
||||
|
||||
def test_literal(self):
|
||||
assert TLiteral.from_state({"l": "foo"}).get_state() == {"l": "foo"}
|
||||
assert TLiteral.from_state({"lit": "foo"}).get_state() == {"lit": "foo"}
|
||||
with pytest.raises(ValueError):
|
||||
TLiteral.from_state({"l": "unknown"})
|
||||
TLiteral.from_state({"lit": "unknown"})
|
||||
|
||||
def test_peername(self):
|
||||
assert Addr.from_state({"peername": ("addr", 42)}).get_state() == {
|
||||
|
@ -20,7 +20,7 @@ def configure(updated):
|
||||
event_log.append("scriptconfigure")
|
||||
|
||||
|
||||
def load(l):
|
||||
def load(loader):
|
||||
event_log.append("scriptload")
|
||||
|
||||
|
||||
|
@ -8,7 +8,6 @@ from mitmproxy.io.har import fix_headers
|
||||
from mitmproxy.io.har import request_to_flow
|
||||
from mitmproxy.tools.web.app import flow_to_json
|
||||
|
||||
|
||||
data_dir = Path(__file__).parent.parent / "data"
|
||||
|
||||
|
||||
|
@ -38,10 +38,10 @@ def get_random_object(random=random, depth=0):
|
||||
what = random.randint(0, 1)
|
||||
if what == 0:
|
||||
n = random.randint(0, 10)
|
||||
l = []
|
||||
lst = []
|
||||
for _ in range(n):
|
||||
l.append(get_random_object(random, depth + 1))
|
||||
return l
|
||||
lst.append(get_random_object(random, depth + 1))
|
||||
return lst
|
||||
if what == 1:
|
||||
n = random.randint(0, 10)
|
||||
d = {}
|
||||
|
@ -9,7 +9,10 @@ from mitmproxy.net.dns import domain_names
|
||||
def test_unpack_from_with_compression():
|
||||
assert domain_names.unpack_from_with_compression(
|
||||
b"\xFF\x03www\x07example\x03org\x00", 1, domain_names.cache()
|
||||
) == ("www.example.org", 17)
|
||||
) == (
|
||||
"www.example.org",
|
||||
17,
|
||||
)
|
||||
with pytest.raises(
|
||||
struct.error, match=re.escape("unpack encountered domain name loop")
|
||||
):
|
||||
|
@ -5,7 +5,6 @@ import pytest
|
||||
|
||||
from mitmproxy.net.http import cookies
|
||||
|
||||
|
||||
cookie_pairs = [
|
||||
["=uno", [["", "uno"]]],
|
||||
["", []],
|
||||
|
@ -52,9 +52,7 @@ def test_parse():
|
||||
|
||||
|
||||
def test_ascii_check():
|
||||
test_url = (
|
||||
"https://xyz.tax-edu.net?flag=selectCourse&lc_id=42825&lc_name=茅莽莽猫氓猫氓".encode()
|
||||
)
|
||||
test_url = "https://xyz.tax-edu.net?flag=selectCourse&lc_id=42825&lc_name=茅莽莽猫氓猫氓".encode()
|
||||
scheme, host, port, full_path = url.parse(test_url)
|
||||
assert scheme == b"https"
|
||||
assert host == b"xyz.tax-edu.net"
|
||||
|
@ -30,7 +30,6 @@ from mitmproxy.proxy.layers import quic
|
||||
from mitmproxy.proxy.layers.http._http3 import Http3Client
|
||||
from test.mitmproxy.proxy import tutils
|
||||
|
||||
|
||||
example_request_headers = [
|
||||
(b":method", b"GET"),
|
||||
(b":scheme", b"http"),
|
||||
|
@ -113,8 +113,8 @@ def test_upstream_https(tctx):
|
||||
<< SendData(tctx2.client, serverhello)
|
||||
)
|
||||
assert (
|
||||
# forward serverhello to proxy1
|
||||
proxy1
|
||||
# forward serverhello to proxy1
|
||||
>> DataReceived(upstream, serverhello())
|
||||
<< SendData(upstream, request)
|
||||
)
|
||||
@ -277,8 +277,8 @@ def test_reverse_proxy_tcp_over_tls(
|
||||
)
|
||||
if connection_strategy == "lazy":
|
||||
(
|
||||
# only now we open a connection
|
||||
playbook
|
||||
# only now we open a connection
|
||||
<< OpenConnection(tctx.server)
|
||||
>> reply(None)
|
||||
)
|
||||
|
@ -31,7 +31,6 @@ from mitmproxy.udp import UDPMessage
|
||||
from mitmproxy.utils import data
|
||||
from test.mitmproxy.proxy import tutils
|
||||
|
||||
|
||||
tlsdata = data.Data(__name__)
|
||||
|
||||
|
||||
|
@ -7,6 +7,7 @@ from mitmproxy import exceptions
|
||||
from mitmproxy import hooks
|
||||
from mitmproxy import master
|
||||
from mitmproxy import options
|
||||
from mitmproxy.addonmanager import Loader
|
||||
from mitmproxy.proxy.layers.http import HttpRequestHook
|
||||
from mitmproxy.proxy.layers.http import HttpResponseHook
|
||||
from mitmproxy.test import taddons
|
||||
@ -54,8 +55,8 @@ class AsyncTHalt:
|
||||
|
||||
|
||||
class AOption:
|
||||
def load(self, l):
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
def load(self, loader: Loader):
|
||||
loader.add_option("custom_option", bool, False, "help")
|
||||
|
||||
|
||||
class AOldAPI:
|
||||
@ -155,22 +156,22 @@ async def test_mixed_async_sync(caplog):
|
||||
|
||||
async def test_loader(caplog):
|
||||
with taddons.context() as tctx:
|
||||
l = addonmanager.Loader(tctx.master)
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
assert "custom_option" in l.master.options
|
||||
loader = addonmanager.Loader(tctx.master)
|
||||
loader.add_option("custom_option", bool, False, "help")
|
||||
assert "custom_option" in loader.master.options
|
||||
|
||||
# calling this again with the same signature is a no-op.
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
loader.add_option("custom_option", bool, False, "help")
|
||||
assert not caplog.text
|
||||
|
||||
# a different signature should emit a warning though.
|
||||
l.add_option("custom_option", bool, True, "help")
|
||||
loader.add_option("custom_option", bool, True, "help")
|
||||
assert "Over-riding existing option" in caplog.text
|
||||
|
||||
def cmd(a: str) -> str:
|
||||
return "foo"
|
||||
|
||||
l.add_command("test.command", cmd)
|
||||
loader.add_command("test.command", cmd)
|
||||
|
||||
|
||||
async def test_simple(caplog):
|
||||
|
@ -10,7 +10,6 @@ from cryptography.x509 import NameOID
|
||||
from ..conftest import skip_windows
|
||||
from mitmproxy import certs
|
||||
|
||||
|
||||
# class TestDNTree:
|
||||
# def test_simple(self):
|
||||
# d = certs.DNTree()
|
||||
|
@ -25,7 +25,9 @@ class TestResourceRecord:
|
||||
assert (
|
||||
str(dns.ResourceRecord.PTR("test", "some.other.host")) == "some.other.host"
|
||||
)
|
||||
assert str(dns.ResourceRecord.TXT("test", "unicode text 😀")) == "unicode text 😀"
|
||||
assert (
|
||||
str(dns.ResourceRecord.TXT("test", "unicode text 😀")) == "unicode text 😀"
|
||||
)
|
||||
assert (
|
||||
str(
|
||||
dns.ResourceRecord(
|
||||
|
@ -44,10 +44,10 @@ class TestSerialize:
|
||||
|
||||
sio.seek(0)
|
||||
r = mitmproxy.io.FlowReader(sio)
|
||||
l = list(r.stream())
|
||||
assert len(l) == 1
|
||||
lst = list(r.stream())
|
||||
assert len(lst) == 1
|
||||
|
||||
f2 = l[0]
|
||||
f2 = lst[0]
|
||||
assert f2.get_state() == f.get_state()
|
||||
assert f2.request.data == f.request.data
|
||||
assert f2.marked
|
||||
|
@ -1,6 +1,5 @@
|
||||
from mitmproxy import tls
|
||||
|
||||
|
||||
CLIENT_HELLO_NO_EXTENSIONS = bytes.fromhex(
|
||||
"03015658a756ab2c2bff55f636814deac086b7ca56b65058c7893ffc6074f5245f70205658a75475103a152637"
|
||||
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
|
||||
|
@ -4,9 +4,9 @@ from collections.abc import Sequence
|
||||
|
||||
import pytest
|
||||
|
||||
from . import test_command
|
||||
import mitmproxy.exceptions
|
||||
import mitmproxy.types
|
||||
from . import test_command
|
||||
from mitmproxy import command
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.test import taddons
|
||||
|
@ -188,9 +188,8 @@ def test_parse():
|
||||
cmd: cmd
|
||||
"""
|
||||
)
|
||||
assert (
|
||||
kmc.parse(
|
||||
"""
|
||||
assert kmc.parse(
|
||||
"""
|
||||
- key: key1
|
||||
ctx: [one, two]
|
||||
help: one
|
||||
@ -198,13 +197,11 @@ def test_parse():
|
||||
foo bar
|
||||
foo bar
|
||||
"""
|
||||
)
|
||||
== [
|
||||
{
|
||||
"key": "key1",
|
||||
"ctx": ["one", "two"],
|
||||
"help": "one",
|
||||
"cmd": "foo bar foo bar\n",
|
||||
}
|
||||
]
|
||||
)
|
||||
) == [
|
||||
{
|
||||
"key": "key1",
|
||||
"ctx": ["one", "two"],
|
||||
"help": "one",
|
||||
"cmd": "foo bar foo bar\n",
|
||||
}
|
||||
]
|
||||
|
@ -2,7 +2,6 @@ import asyncio
|
||||
|
||||
from mitmproxy.tools import main
|
||||
|
||||
|
||||
shutdown_script = "mitmproxy/data/addonscripts/shutdown.py"
|
||||
|
||||
|
||||
|
@ -487,8 +487,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
|
||||
def test_options(self):
|
||||
j = get_json(self.fetch("/options"))
|
||||
assert type(j) == dict
|
||||
assert type(j["anticache"]) == dict
|
||||
assert isinstance(j, dict)
|
||||
assert isinstance(j["anticache"], dict)
|
||||
|
||||
def test_option_update(self):
|
||||
assert self.put_json("/options", {"anticache": True}).code == 200
|
||||
|
Loading…
Reference in New Issue
Block a user