switch to stdlib logging

mitmproxy previously used a homegrown logging mechanism based around
`mitmproxy.ctx.log` and the `add_log` hook. This worked well for everything
we control, but does not work outside the mitmproxy universe.
For now we have simply ignored logging in e.g. tornado or h2, but with the
upcoming introduction of mitmproxy_wireguard we now have a dependency
on some Rust/PyO3 code for which we definitely want logs, but which also
cannot easily be changed to use our homegrown logging (PyO3 does the heavy
lifting to add interoperability with stdlib logging). Long story short,
we want to introduce a log handler for stdlib logging.

Now there are two ways in which such a handler could operate:

 1. We could build a handler that forwards all stdlib log events
    into our homegrown mechanism.
 2. We embrace stdlib's logging as the correct way to do things,
    and get rid of our homegrown stuff.

This PR follows the second approach by removing the `add_log` hook and
rewriting the `TermLog` and `EventStore` addons to listen for stdlib log records.
This means that all `mitmproxy.ctx.log.info` events are now simply `logging.info` etc.

One upside of this approach is that many parts of the codebase now don't depend
on the existence of `mitmproxy.ctx` and we can use off-the-shelf things like pytest's
`caplog`. We can also now better colorize log output and/or add timestamps.
This commit is contained in:
Maximilian Hils 2022-09-17 15:41:54 +02:00
parent 0afc48b714
commit c69239bb90
120 changed files with 1100 additions and 822 deletions

View File

@ -31,6 +31,10 @@
([#5507](https://github.com/mitmproxy/mitmproxy/pull/5507), @LIU-shuyi)
* Add HTTP/3 binary frame content view.
([#5582](https://github.com/mitmproxy/mitmproxy/pull/5582), @mhils)
* Remove `add_log` event hook. Users should use the builtin `logging` module instead.
([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils)
* Deprecated `mitmproxy.ctx.log` in favor of Python's builtin `logging` module.
([#5590](https://github.com/mitmproxy/mitmproxy/pull/5590), @mhils)
## 28 June 2022: mitmproxy 8.1.1

View File

@ -36,7 +36,7 @@ def category(name: str, desc: str, hooks: list[type[hooks.Hook]]) -> None:
imports.discard("builtins")
if types:
print(f"from typing import {', '.join(sorted(types))}")
print("from mitmproxy import ctx")
print("import logging")
for imp in sorted(imports):
print(f"import {imp}")
print()
@ -58,10 +58,10 @@ def category(name: str, desc: str, hooks: list[type[hooks.Hook]]) -> None:
print(textwrap.indent(f'"""\n{doc}\n"""', " "))
if params:
print(
f' ctx.log(f"{hook.name}: {" ".join("{" + p.name + "=}" for p in params)}")'
f' logging.info(f"{hook.name}: {" ".join("{" + p.name + "=}" for p in params)}")'
)
else:
print(f' ctx.log("{hook.name}")')
print(f' logging.info("{hook.name}")')
print("")

View File

@ -10,6 +10,27 @@ menu:
We try to avoid them, but this page lists breaking changes in the mitmproxy addon API.
## mitmproxy 9.0
#### Logging
We've deprecated mitmproxy's homegrown logging system in favor of Python's builtin `logging` module.
This means that addons should now use standard logging functionality instead of `mitmproxy.ctx.log`:
```python
# Deprecated:
from mitmproxy import ctx
ctx.log.info("hello world")
# New:
import logging
logging.info("hello world")
```
Accordingly, the `add_log` event has been removed. Developers who rely on log entries need to register their own
`logging.Handler` instead. An example of this can be found in the `EventStore` addon.
## mitmproxy 7.0
#### Connection Events

View File

@ -3,7 +3,7 @@ Basic skeleton of a mitmproxy addon.
Run as follows: mitmproxy -s anatomy.py
"""
from mitmproxy import ctx
import logging
class Counter:
@ -12,7 +12,7 @@ class Counter:
def request(self, flow):
self.num = self.num + 1
ctx.log.info("We've seen %d flows" % self.num)
logging.info("We've seen %d flows" % self.num)
addons = [Counter()]

View File

@ -1,10 +1,11 @@
"""Handle flows as command arguments."""
import logging
from collections.abc import Sequence
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import http
from mitmproxy.log import ALERT
class MyAddon:
@ -13,7 +14,7 @@ class MyAddon:
for f in flows:
if isinstance(f, http.HTTPFlow):
f.request.headers["myheader"] = "value"
ctx.log.alert("done")
logging.log(ALERT, "done")
addons = [MyAddon()]

View File

@ -1,11 +1,12 @@
"""Handle file paths as command arguments."""
import logging
from collections.abc import Sequence
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import types
from mitmproxy.log import ALERT
class MyAddon:
@ -24,7 +25,7 @@ class MyAddon:
for cnt, dom in sorted((v, k) for (k, v) in totals.items()):
fp.write(f"{cnt}: {dom}\n")
ctx.log.alert("done")
logging.log(ALERT, "done")
addons = [MyAddon()]

View File

@ -1,6 +1,7 @@
"""Add a custom command to mitmproxy's command prompt."""
import logging
from mitmproxy import command
from mitmproxy import ctx
class MyAddon:
@ -10,7 +11,7 @@ class MyAddon:
@command.command("myaddon.inc")
def inc(self) -> None:
self.num += 1
ctx.log.info(f"num = {self.num}")
logging.info(f"num = {self.num}")
addons = [MyAddon()]

View File

@ -1,8 +1,10 @@
"""
Use mitmproxy's filter pattern in scripts.
"""
from mitmproxy import flowfilter
import logging
from mitmproxy import ctx, http
from mitmproxy import flowfilter
class Filter:
@ -18,8 +20,8 @@ class Filter:
def response(self, flow: http.HTTPFlow) -> None:
if flowfilter.match(self.filter, flow):
ctx.log.info("Flow matches filter:")
ctx.log.info(flow)
logging.info("Flow matches filter:")
logging.info(flow)
addons = [Filter()]

View File

@ -1,8 +1,11 @@
"""Post messages to mitmproxy's event log."""
from mitmproxy import ctx
import logging
from mitmproxy.log import ALERT
def load(l):
ctx.log.info("This is some informative text.")
ctx.log.warn("This is a warning.")
ctx.log.error("This is an error.")
logging.info("This is some informative text.")
logging.warning("This is a warning.")
logging.error("This is an error.")
logging.log(ALERT, "This is an alert. It has the same urgency as info, but will also pop up in the status bar.")

View File

@ -2,18 +2,19 @@
Make events hooks non-blocking using async or @concurrent
"""
import asyncio
import logging
import time
from mitmproxy.script import concurrent
from mitmproxy import ctx
# Hooks can be async, which allows the hook to call async functions and perform async I/O
# without blocking other requests. This is generally preferred for new addons.
async def request(flow):
ctx.log.info(f"handle request: {flow.request.host}{flow.request.path}")
logging.info(f"handle request: {flow.request.host}{flow.request.path}")
await asyncio.sleep(5)
ctx.log.info(f"start request: {flow.request.host}{flow.request.path}")
logging.info(f"start request: {flow.request.host}{flow.request.path}")
# Another option is to use @concurrent, which launches the hook in its own thread.
@ -21,7 +22,6 @@ async def request(flow):
# Rename the function below to request(flow) to try it out.
@concurrent # Remove this to make it synchronous and see what happens
def request_concurrent(flow):
# This is ugly in mitmproxy's UI, but you don't want to use mitmproxy.ctx.log from a different thread.
print(f"handle request: {flow.request.host}{flow.request.path}")
logging.info(f"handle request: {flow.request.host}{flow.request.path}")
time.sleep(5)
print(f"start request: {flow.request.host}{flow.request.path}")
logging.info(f"start request: {flow.request.host}{flow.request.path}")

View File

@ -8,11 +8,13 @@ Usage:
and then send a HTTP request to trigger the shutdown:
curl --proxy localhost:8080 http://example.com/path
"""
import logging
from mitmproxy import ctx, http
def request(flow: http.HTTPFlow) -> None:
# a random condition to make this example a bit more interactive
if flow.request.pretty_url == "http://example.com/path":
ctx.log.info("Shutting down everything...")
logging.info("Shutting down everything...")
ctx.master.shutdown()

View File

@ -10,8 +10,9 @@ Example Invocation:
mitmdump --rawtcp --tcp-hosts ".*" -s examples/tcp-simple.py
"""
import logging
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp
@ -19,6 +20,6 @@ def tcp_message(flow: tcp.TCPFlow):
message = flow.messages[-1]
message.content = message.content.replace(b"foo", b"bar")
ctx.log.info(
logging.info(
f"tcp_message[from_client={message.from_client}), content={strutils.bytes_to_escaped_str(message.content)}]"
)

View File

@ -1,6 +1,8 @@
"""Process individual messages from a WebSocket connection."""
import logging
import re
from mitmproxy import ctx, http
from mitmproxy import http
def websocket_message(flow: http.HTTPFlow):
@ -10,9 +12,9 @@ def websocket_message(flow: http.HTTPFlow):
# was the message sent from the client or server?
if message.from_client:
ctx.log.info(f"Client sent a message: {message.content!r}")
logging.info(f"Client sent a message: {message.content!r}")
else:
ctx.log.info(f"Server sent a message: {message.content!r}")
logging.info(f"Server sent a message: {message.content!r}")
# manipulate the message content
message.content = re.sub(rb"^Hello", b"HAPPY", message.content)

View File

@ -4,8 +4,7 @@ This module is for blocking DNS over HTTPS requests.
It loads a blocklist of IPs and hostnames that are known to serve DNS over HTTPS requests.
It also uses headers, query params, and paths to detect DoH (and block it)
"""
from mitmproxy import ctx
import logging
# known DoH providers' hostnames and IP addresses to block
default_blocklist: dict = {
@ -147,7 +146,7 @@ def _request_has_doh_looking_path(flow):
:return: True if path looks like it's DoH, otherwise False
"""
doh_paths = [
'/dns-query', # used in example in RFC 8484 (see https://tools.ietf.org/html/rfc8484#section-4.1.1)
'/dns-query', # used in example in RFC 8484 (see https://tools.ietf.org/html/rfc8484#section-4.1.1)
]
path = flow.request.path.split('?')[0]
return path in doh_paths
@ -180,6 +179,6 @@ def request(flow):
for check in doh_request_detection_checks:
is_doh = check(flow)
if is_doh:
ctx.log.warn("[DoH Detection] DNS over HTTPS request detected via method \"%s\"" % check.__name__)
logging.warning("[DoH Detection] DNS over HTTPS request detected via method \"%s\"" % check.__name__)
flow.kill()
break

View File

@ -8,6 +8,8 @@ Example usage:
- mitmdump -s custom_next_layer.py
- curl -x localhost:8080 -k https://example.com
"""
import logging
from mitmproxy import ctx
from mitmproxy.proxy import layer, layers
@ -19,7 +21,7 @@ def running():
def next_layer(nextlayer: layer.NextLayer):
ctx.log(
logging.info(
f"{nextlayer.context=}\n"
f"{nextlayer.data_client()[:70]=}\n"
f"{nextlayer.data_server()[:70]=}\n"

View File

@ -8,22 +8,21 @@ filename endwith '.zhar' will be compressed:
mitmdump -s ./har_dump.py --set hardump=./dump.zhar
"""
import json
import base64
import zlib
import json
import logging
import os
from datetime import datetime
from datetime import timezone
import mitmproxy
import zlib
import mitmproxy
from mitmproxy import connection
from mitmproxy import version
from mitmproxy import ctx
from mitmproxy.utils import strutils
from mitmproxy import version
from mitmproxy.net.http import cookies
from mitmproxy.utils import strutils
HAR: dict = {}
@ -166,7 +165,7 @@ def done():
json_dump: str = json.dumps(HAR, indent=2)
if ctx.options.hardump == '-':
mitmproxy.ctx.log(json_dump)
print(json_dump)
else:
raw: bytes = json_dump.encode()
if ctx.options.hardump.endswith('.zhar'):
@ -175,7 +174,7 @@ def done():
with open(os.path.expanduser(ctx.options.hardump), "wb") as f:
f.write(raw)
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
logging.info("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
def format_cookies(cookie_list):

View File

@ -9,12 +9,13 @@
# remember to add your own mitmproxy authorative certs in your browser/os!
# certs docs: https://docs.mitmproxy.org/stable/concepts-certificates/
# filter expressions docs: https://docs.mitmproxy.org/stable/concepts-filters/
import os
import logging
import mimetypes
import os
from pathlib import Path
from mitmproxy import flowfilter
from mitmproxy import ctx, http
from mitmproxy import flowfilter
class HTTPDump:
@ -22,16 +23,16 @@ class HTTPDump:
self.filter = ctx.options.dumper_filter
loader.add_option(
name = "dumper_folder",
typespec = str,
default = "httpdump",
help = "content dump destination folder",
name="dumper_folder",
typespec=str,
default="httpdump",
help="content dump destination folder",
)
loader.add_option(
name = "open_browser",
typespec = bool,
default = True,
help = "open integrated browser at start"
name="open_browser",
typespec=bool,
default=True,
help="open integrated browser at start"
)
def running(self):
@ -66,7 +67,7 @@ class HTTPDump:
if flow.response.content:
with open(filepath, "wb") as f:
f.write(flow.response.content)
ctx.log.info(f"Saved! {filepath}")
logging.info(f"Saved! {filepath}")
addons = [HTTPDump()]

View File

@ -30,10 +30,12 @@ Configuration:
dump_destination: "/user/rastley/output.log"
EOF
"""
from threading import Lock, Thread
from queue import Queue
import base64
import json
import logging
from queue import Queue
from threading import Lock, Thread
import requests
from mitmproxy import ctx
@ -48,6 +50,7 @@ class JSONDumper:
for out-of-the-box Elasticsearch support, and then either writes
the result to a file or sends it to a URL.
"""
def __init__(self):
self.outfile = None
self.transformations = None
@ -88,7 +91,7 @@ class JSONDumper:
('client_conn', 'address'),
),
'ws_messages': (
('messages', ),
('messages',),
),
'headers': (
('request', 'headers'),
@ -207,15 +210,15 @@ class JSONDumper:
if ctx.options.dump_destination.startswith('http'):
self.outfile = None
self.url = ctx.options.dump_destination
ctx.log.info('Sending all data frames to %s' % self.url)
logging.info('Sending all data frames to %s' % self.url)
if ctx.options.dump_username and ctx.options.dump_password:
self.auth = (ctx.options.dump_username, ctx.options.dump_password)
ctx.log.info('HTTP Basic auth enabled.')
logging.info('HTTP Basic auth enabled.')
else:
self.outfile = open(ctx.options.dump_destination, 'a')
self.url = None
self.lock = Lock()
ctx.log.info('Writing all data frames to %s' % ctx.options.dump_destination)
logging.info('Writing all data frames to %s' % ctx.options.dump_destination)
self._init_transformations()

View File

@ -1,8 +1,9 @@
import base64
import binascii
import logging
import socket
from typing import Any, Optional
import binascii
from ntlm_auth import gss_channel_bindings, ntlm
from mitmproxy import addonmanager, http
@ -25,7 +26,7 @@ class NTLMUpstreamAuth:
"""
def load(self, loader: addonmanager.Loader) -> None:
ctx.log.info("NTLMUpstreamAuth loader")
logging.info("NTLMUpstreamAuth loader")
loader.add_option(
name="upstream_ntlm_auth",
typespec=Optional[str],
@ -60,7 +61,7 @@ class NTLMUpstreamAuth:
Valid values are 0-5 (Default: 3)
"""
)
ctx.log.debug("AddOn: NTLM Upstream Authentication - Loaded")
logging.debug("AddOn: NTLM Upstream Authentication - Loaded")
def running(self):
def extract_flow_from_context(context: Context) -> http.HTTPFlow:
@ -73,7 +74,7 @@ class NTLMUpstreamAuth:
def build_connect_flow(context: Context, connect_header: tuple) -> http.HTTPFlow:
flow = extract_flow_from_context(context)
if not flow:
ctx.log.error("failed to build connect flow")
logging.error("failed to build connect flow")
raise
flow.request.content = b"" # we should send empty content for handshake
header_name, header_value = connect_header
@ -96,7 +97,7 @@ class NTLMUpstreamAuth:
try:
token = challenge_message.split(': ')[1]
except IndexError:
ctx.log.error("Failed to extract challenge_message")
logging.error("Failed to extract challenge_message")
raise
return token
@ -130,7 +131,7 @@ class NTLMUpstreamAuth:
HttpUpstreamProxy.receive_handshake_data = patched_receive_handshake_data
def done(self):
ctx.log.info('close ntlm session')
logging.info('close ntlm session')
addons = [
@ -149,8 +150,7 @@ class CustomNTLMContext:
ntlm_compatibility: int = ctx.options.upstream_ntlm_compatibility
username, password = tuple(auth.split(":"))
workstation = socket.gethostname().upper()
ctx.log.debug(f'\nntlm context with the details: "{domain}\\{username}", *****')
self.ctx_log = ctx.log
logging.debug(f'\nntlm context with the details: "{domain}\\{username}", *****')
self.preferred_type = preferred_type
self.ntlm_context = ntlm.NtlmContext(
username=username,
@ -165,7 +165,7 @@ class CustomNTLMContext:
negotiate_message_base_64_in_bytes = base64.b64encode(negotiate_message)
negotiate_message_base_64_ascii = negotiate_message_base_64_in_bytes.decode("ascii")
negotiate_message_base_64_final = f'{self.preferred_type} {negotiate_message_base_64_ascii}'
self.ctx_log.debug(
logging.debug(
f'{self.preferred_type} Authentication, negotiate message: {negotiate_message_base_64_final}'
)
return negotiate_message_base_64_final
@ -175,12 +175,12 @@ class CustomNTLMContext:
try:
challenge_message_ascii_bytes = base64.b64decode(challenge_message, validate=True)
except binascii.Error as err:
self.ctx_log.debug(f'{self.preferred_type} Authentication fail with error {err.__str__()}')
logging.debug(f'{self.preferred_type} Authentication fail with error {err.__str__()}')
return False
authenticate_message = self.ntlm_context.step(challenge_message_ascii_bytes)
negotiate_message_base_64 = '{} {}'.format(self.preferred_type,
base64.b64encode(authenticate_message).decode('ascii'))
self.ctx_log.debug(
logging.debug(
f'{self.preferred_type} Authentication, response to challenge message: {negotiate_message_base_64}'
)
return negotiate_message_base_64

View File

@ -18,16 +18,16 @@ for associating a file with its corresponding flow in the stream saved with
This addon is not compatible with addons that use the same mechanism to
capture streamed data, http-stream-modify.py for instance.
"""
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
from mitmproxy import ctx
from datetime import datetime
from pathlib import Path
import os
class StreamSaver:
TAG = "save_streamed_data: "
def __init__(self, flow, direction):
@ -58,7 +58,8 @@ class StreamSaver:
return data
if not self.fh:
self.path = datetime.fromtimestamp(self.flow.request.timestamp_start).strftime(ctx.options.save_streamed_data)
self.path = datetime.fromtimestamp(self.flow.request.timestamp_start).strftime(
ctx.options.save_streamed_data)
self.path = self.path.replace('%+T', str(self.flow.request.timestamp_start))
self.path = self.path.replace('%+I', str(self.flow.client_conn.id))
self.path = self.path.replace('%+D', self.direction)
@ -70,18 +71,18 @@ class StreamSaver:
if not parent.exists():
parent.mkdir(parents=True, exist_ok=True)
except OSError:
ctx.log.error(f"{self.TAG}Failed to create directory: {parent}")
logging.error(f"{self.TAG}Failed to create directory: {parent}")
try:
self.fh = open(self.path, "wb", buffering=0)
except OSError:
ctx.log.error(f"{self.TAG}Failed to open for writing: {self.path}")
logging.error(f"{self.TAG}Failed to open for writing: {self.path}")
if self.fh:
try:
self.fh.write(data)
except OSError:
ctx.log.error(f"{self.TAG}Failed to write to: {self.path}")
logging.error(f"{self.TAG}Failed to write to: {self.path}")
return data

View File

@ -1,10 +1,9 @@
import logging
import re
from collections.abc import Sequence
from json import dumps
from mitmproxy import command, ctx, flow
from mitmproxy import command, flow
MARKER = ':mag:'
RESULTS_STR = 'Search Results: '
@ -44,7 +43,7 @@ class Search:
try:
self.exp = re.compile(regex)
except re.error as e:
ctx.log.error(e)
logging.error(e)
return
for _flow in flows:

View File

@ -293,22 +293,6 @@ class TestXSSScanner():
assert xss_info == expected_xss_info
assert sqli_info is None
@pytest.fixture(scope='function')
def logger(self, monkeypatch):
class Logger():
def __init__(self):
self.args = []
def info(self, str):
self.args.append(str)
def error(self, str):
self.args.append(str)
logger = Logger()
monkeypatch.setattr("mitmproxy.ctx.log", logger)
yield logger
@pytest.fixture(scope='function')
def get_request_vuln(self, monkeypatch):
monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln)

View File

@ -15,6 +15,7 @@ Example:
// works again, but mitmproxy does not intercept and we do *not* see the contents
"""
import collections
import logging
import random
from abc import ABC, abstractmethod
from enum import Enum
@ -90,18 +91,18 @@ class MaybeTls:
def tls_clienthello(self, data: tls.ClientHelloData):
server_address = data.context.server.peername
if not self.strategy.should_intercept(server_address):
ctx.log(f"TLS passthrough: {human.format_address(server_address)}.")
logging.info(f"TLS passthrough: {human.format_address(server_address)}.")
data.ignore_connection = True
self.strategy.record_skipped(server_address)
def tls_established_client(self, data: tls.TlsData):
server_address = data.context.server.peername
ctx.log(f"TLS handshake successful: {human.format_address(server_address)}")
logging.info(f"TLS handshake successful: {human.format_address(server_address)}")
self.strategy.record_success(server_address)
def tls_failed_client(self, data: tls.TlsData):
server_address = data.context.server.peername
ctx.log(f"TLS handshake failed: {human.format_address(server_address)}")
logging.info(f"TLS handshake failed: {human.format_address(server_address)}")
self.strategy.record_failure(server_address)

View File

@ -34,18 +34,16 @@ Suggested Exploit: <script>alert(0)</script>
Line: 1029zxcs'd"ao<ac>so[sb]po(pc)se;sl/bsl\eq=3847asd
"""
import logging
import re
import socket
from html.parser import HTMLParser
from typing import NamedTuple, Optional, Union
from urllib.parse import urlparse
import re
import socket
import requests
from mitmproxy import http
from mitmproxy import ctx
# The actual payload is put between a frontWall and a backWall to make it easy
# to locate the payload with regular expressions
@ -92,6 +90,7 @@ def get_cookies(flow: http.HTTPFlow) -> Cookies:
def find_unclaimed_URLs(body, requestUrl):
""" Look for unclaimed URLs in script tags and log them if found"""
def getValue(attrs: list[tuple[str, str]], attrName: str) -> Optional[str]:
for name, value in attrs:
if attrName == name:
@ -115,7 +114,7 @@ def find_unclaimed_URLs(body, requestUrl):
try:
socket.gethostbyname(domain)
except socket.gaierror:
ctx.log.error(f"XSS found in {requestUrl} due to unclaimed URL \"{url}\".")
logging.error(f"XSS found in {requestUrl} due to unclaimed URL \"{url}\".")
def test_end_of_URL_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
@ -171,22 +170,22 @@ def log_XSS_data(xss_info: Optional[XSSData]) -> None:
# If it is None, then there is no info to log
if not xss_info:
return
ctx.log.error("===== XSS Found ====")
ctx.log.error("XSS URL: %s" % xss_info.url)
ctx.log.error("Injection Point: %s" % xss_info.injection_point)
ctx.log.error("Suggested Exploit: %s" % xss_info.exploit)
ctx.log.error("Line: %s" % xss_info.line)
logging.error("===== XSS Found ====")
logging.error("XSS URL: %s" % xss_info.url)
logging.error("Injection Point: %s" % xss_info.injection_point)
logging.error("Suggested Exploit: %s" % xss_info.exploit)
logging.error("Line: %s" % xss_info.line)
def log_SQLi_data(sqli_info: Optional[SQLiData]) -> None:
""" Log information about the given SQLi to mitmproxy """
if not sqli_info:
return
ctx.log.error("===== SQLi Found =====")
ctx.log.error("SQLi URL: %s" % sqli_info.url)
ctx.log.error("Injection Point: %s" % sqli_info.injection_point)
ctx.log.error("Regex used: %s" % sqli_info.regex)
ctx.log.error("Suspected DBMS: %s" % sqli_info.dbms)
logging.error("===== SQLi Found =====")
logging.error("SQLi URL: %s" % sqli_info.url)
logging.error("Injection Point: %s" % sqli_info.injection_point)
logging.error("Regex used: %s" % sqli_info.regex)
logging.error("Suspected DBMS: %s" % sqli_info.dbms)
return
@ -277,6 +276,7 @@ def paths_to_text(html: str, string: str) -> list[str]:
def get_XSS_data(body: Union[str, bytes], request_URL: str, injection_point: str) -> Optional[XSSData]:
""" Return a XSSDict if there is a XSS otherwise return None """
def in_script(text, index, body) -> bool:
""" Whether the Numberth occurrence of the first string in the second
string is inside a script tag """
@ -302,6 +302,7 @@ def get_XSS_data(body: Union[str, bytes], request_URL: str, injection_point: str
def inject_javascript_handler(html: str) -> bool:
""" Whether you can inject a Javascript:alert(0) as a link """
class injectJSHandlerHTMLParser(HTMLParser):
injectJSHandler = False
@ -313,6 +314,7 @@ def get_XSS_data(body: Union[str, bytes], request_URL: str, injection_point: str
parser = injectJSHandlerHTMLParser()
parser.feed(html)
return parser.injectJSHandler
# Only convert the body to bytes if needed
if isinstance(body, str):
body = bytes(body, 'utf-8')

View File

@ -1,17 +1,20 @@
import contextlib
import inspect
import logging
import pprint
import sys
import traceback
import types
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from typing import Any, Optional
from mitmproxy import hooks
import sys
from mitmproxy import exceptions
from mitmproxy import flow
from . import ctx
from mitmproxy import hooks
logger = logging.getLogger(__name__)
def _get_name(itm):
@ -48,7 +51,7 @@ def safecall():
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon_sync")
tb = cut_traceback(tb, "invoke_addon")
ctx.log.error(
logger.error(
"Addon error: %s" % "".join(traceback.format_exception(etype, value, tb))
)
@ -88,7 +91,7 @@ class Loader:
if same_signature:
return
else:
ctx.log.warn("Over-riding existing option %s" % name)
logger.warning("Over-riding existing option %s" % name)
self.master.options.add_option(name, typespec, default, help, choices)
def add_command(self, path: str, func: Callable) -> None:
@ -164,13 +167,19 @@ class AddonManager:
"clientdisconnect": "client_disconnected",
"serverconnect": "server_connect and server_connected",
"serverdisconnect": "server_disconnected",
# mitmproxy 8 -> mitmproxy 9
"add_log": None,
}
for a in traverse([addon]):
for old, new in api_changes.items():
if hasattr(a, old):
ctx.log.warn(
f"The {old} event has been removed, use {new} instead. "
f"For more details, see https://docs.mitmproxy.org/stable/addons-events/."
if new:
msg = f"The {old} event has been removed, use {new} instead. "
else:
msg = f"The {old} event has been removed. "
logger.warning(
f"{msg}"
f"For more details, see https://docs.mitmproxy.org/dev/addons-api-changelog/."
)
name = _get_name(a)
if name in self.lookup:

View File

@ -1,11 +1,15 @@
import asyncio
import logging
import traceback
import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
logger = logging.getLogger(__name__)
class ASGIApp:
"""
@ -132,7 +136,7 @@ async def serve(app, flow: http.HTTPFlow):
if not sent_response:
raise RuntimeError(f"no response sent.")
except Exception:
ctx.log.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
logger.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
flow.response = http.Response.make(500, b"ASGI Error.")
finally:
done.set()

View File

@ -1,4 +1,6 @@
import ipaddress
import logging
from mitmproxy import ctx
@ -33,13 +35,13 @@ class Block:
return
if ctx.options.block_private and address.is_private:
ctx.log.warn(
logging.warning(
f"Client connection from {client.peername[0]} killed by block_private option."
)
client.error = "Connection killed by block_private."
if ctx.options.block_global and address.is_global:
ctx.log.warn(
logging.warning(
f"Client connection from {client.peername[0]} killed by block_global option."
)
client.error = "Connection killed by block_global."

View File

@ -1,3 +1,4 @@
import logging
import shutil
import subprocess
import tempfile
@ -5,6 +6,7 @@ from typing import Optional
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy.log import ALERT
def get_chrome_executable() -> Optional[str]:
@ -68,11 +70,11 @@ class Browser:
running proxy.
"""
if len(self.browser) > 0:
ctx.log.alert("Starting additional browser")
logging.log(ALERT, "Starting additional browser")
cmd = get_browser_cmd()
if not cmd:
ctx.log.alert("Your platform is not supported yet - please submit a patch.")
logging.log(ALERT, "Your platform is not supported yet - please submit a patch.")
return
tdir = tempfile.TemporaryDirectory()

View File

@ -1,9 +1,11 @@
import asyncio
import time
import logging
import traceback
from collections.abc import Sequence
from typing import Optional, cast
import time
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
@ -11,16 +13,19 @@ from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import io
from mitmproxy.hooks import UpdateHook
from mitmproxy.options import Options
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy import commands, events, layers, server
from mitmproxy.connection import ConnectionState, Server
from mitmproxy.hooks import UpdateHook
from mitmproxy.log import ALERT
from mitmproxy.options import Options
from mitmproxy.proxy import commands, events, layers, server
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.layer import CommandGenerator
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy.mode_specs import UpstreamMode
from mitmproxy.utils import asyncio_utils
logger = logging.getLogger(__name__)
class MockServer(layers.http.HttpConnection):
"""
@ -68,7 +73,7 @@ class MockServer(layers.http.HttpConnection):
):
pass
else: # pragma: no cover
ctx.log(f"Unexpected event during replay: {event}")
logger.warning(f"Unexpected event during replay: {event}")
class ReplayHandler(server.ConnectionHandler):
@ -100,8 +105,9 @@ class ReplayHandler(server.ConnectionHandler):
self.server_event(events.Start())
await self.done.wait()
def log(self, message: str, level: str = "info") -> None:
ctx.log(f"[replay] {message}", level)
def log(self, message: str, level: int = logging.INFO) -> None:
assert isinstance(level, int)
logger.log(level=level, msg=f"[replay] {message}")
async def handle_hook(self, hook: commands.StartHook) -> None:
(data,) = hook.args()
@ -154,8 +160,8 @@ class ClientPlayback:
else:
await h.replay()
except Exception:
ctx.log(
f"Client replay has crashed!\n{traceback.format_exc()}", "error"
logger.error(
f"Client replay has crashed!\n{traceback.format_exc()}"
)
self.queue.task_done()
self.inflight = None
@ -228,7 +234,7 @@ class ClientPlayback:
updated.append(f)
ctx.master.addons.trigger(UpdateHook(updated))
ctx.log.alert("Client replay queue cleared.")
logger.log(ALERT, "Client replay queue cleared.")
@command.command("replay.client")
def start_replay(self, flows: Sequence[flow.Flow]) -> None:
@ -239,7 +245,7 @@ class ClientPlayback:
for f in flows:
err = self.check(f)
if err:
ctx.log.warn(err)
logger.warning(err)
continue
http_flow = cast(http.HTTPFlow, f)

View File

@ -1,3 +1,4 @@
import logging
import os
import pathlib
from collections.abc import Sequence
@ -40,11 +41,11 @@ class CommandHistory:
def done(self):
if ctx.options.command_history and len(self.history) >= self.VACUUM_SIZE:
# vacuum history so that it doesn't grow indefinitely.
history_str = "\n".join(self.history[-self.VACUUM_SIZE // 2 :]) + "\n"
history_str = "\n".join(self.history[-self.VACUUM_SIZE // 2:]) + "\n"
try:
self.history_file.write_text(history_str)
except Exception as e:
ctx.log.alert(f"Failed writing to {self.history_file}: {e}")
logging.warning(f"Failed writing to {self.history_file}: {e}")
@command.command("commands.history.add")
def add_command(self, command: str) -> None:
@ -57,7 +58,7 @@ class CommandHistory:
with self.history_file.open("a") as f:
f.write(f"{command}\n")
except Exception as e:
ctx.log.alert(f"Failed writing to {self.history_file}: {e}")
logging.warning(f"Failed writing to {self.history_file}: {e}")
self.set_filter("")
@ -72,7 +73,7 @@ class CommandHistory:
try:
self.history_file.unlink()
except Exception as e:
ctx.log.alert(f"Failed deleting {self.history_file}: {e}")
logging.warning(f"Failed deleting {self.history_file}: {e}")
self.history = []
self.set_filter("")

View File

@ -1,7 +1,9 @@
import logging
import os
from collections.abc import Sequence
from typing import Union
from mitmproxy.log import ALERT
from mitmproxy.utils import emoji
from mitmproxy import ctx, hooks
from mitmproxy import exceptions
@ -12,6 +14,8 @@ from mitmproxy.net.http import status_codes
import mitmproxy.types
logger = logging.getLogger(__name__)
CONF_DIR = "~/.mitmproxy"
LISTEN_PORT = 8080
@ -96,7 +100,7 @@ class Core:
if f.killable:
f.kill()
updated.append(f)
ctx.log.alert("Killed %s flows." % len(updated))
logger.log(ALERT, "Killed %s flows." % len(updated))
ctx.master.addons.trigger(hooks.UpdateHook(updated))
# FIXME: this will become view.revert later
@ -110,7 +114,7 @@ class Core:
if f.modified():
f.revert()
updated.append(f)
ctx.log.alert("Reverted %s flows." % len(updated))
logger.log(ALERT, "Reverted %s flows." % len(updated))
ctx.master.addons.trigger(hooks.UpdateHook(updated))
@command.command("flow.set.options")
@ -176,7 +180,7 @@ class Core:
updated.append(f)
ctx.master.addons.trigger(hooks.UpdateHook(updated))
ctx.log.alert(f"Set {attr} on {len(updated)} flows.")
logger.log(ALERT, f"Set {attr} on {len(updated)} flows.")
@command.command("flow.decode")
def decode(self, flows: Sequence[flow.Flow], part: str) -> None:
@ -191,7 +195,7 @@ class Core:
p.decode()
updated.append(f)
ctx.master.addons.trigger(hooks.UpdateHook(updated))
ctx.log.alert("Decoded %s flows." % len(updated))
logger.log(ALERT, "Decoded %s flows." % len(updated))
@command.command("flow.encode.toggle")
def encode_toggle(self, flows: Sequence[flow.Flow], part: str) -> None:
@ -210,7 +214,7 @@ class Core:
p.decode()
updated.append(f)
ctx.master.addons.trigger(hooks.UpdateHook(updated))
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
logger.log(ALERT, "Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
@command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options"))
@ -233,7 +237,7 @@ class Core:
p.encode(encoding)
updated.append(f)
ctx.master.addons.trigger(hooks.UpdateHook(updated))
ctx.log.alert("Encoded %s flows." % len(updated))
logger.log(ALERT, "Encoded %s flows." % len(updated))
@command.command("flow.encode.options")
def encode_options(self) -> Sequence[str]:

View File

@ -1,5 +1,6 @@
import io
import csv
import logging
import os.path
from collections.abc import Sequence
from typing import Any, Union
@ -7,12 +8,15 @@ from typing import Any, Union
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import ctx
from mitmproxy import certs
import mitmproxy.types
import pyperclip
from mitmproxy.log import ALERT
logger = logging.getLogger(__name__)
def headername(spec: str):
if not (spec.startswith("header[") and spec.endswith("]")):
@ -117,7 +121,7 @@ class Cut:
fp.write(v)
else:
fp.write(v.encode("utf8"))
ctx.log.alert("Saved single cut.")
logger.log(ALERT, "Saved single cut.")
else:
with open(
path, "a" if append else "w", newline="", encoding="utf8"
@ -126,11 +130,12 @@ class Cut:
for f in flows:
vals = [extract_str(c, f) for c in cuts]
writer.writerow(vals)
ctx.log.alert(
logger.log(
ALERT,
"Saved %s cuts over %d flows as CSV." % (len(cuts), len(flows))
)
except OSError as e:
ctx.log.error(str(e))
logger.error(str(e))
@command.command("cut.clip")
def clip(
@ -148,14 +153,14 @@ class Cut:
if len(cuts) == 1 and len(flows) == 1:
v = extract_str(cuts[0], flows[0])
fp.write(v)
ctx.log.alert("Clipped single cut.")
logger.log(ALERT, "Clipped single cut.")
else:
writer = csv.writer(fp)
for f in flows:
vals = [extract_str(c, f) for c in cuts]
writer.writerow(vals)
ctx.log.alert("Clipped %s cuts as CSV." % len(cuts))
logger.log(ALERT, "Clipped %s cuts as CSV." % len(cuts))
try:
pyperclip.copy(fp.getvalue())
except pyperclip.PyperclipException as e:
ctx.log.error(str(e))
logger.error(str(e))

View File

@ -1,4 +1,4 @@
import mitmproxy
import logging
class DisableH2C:
@ -16,7 +16,7 @@ class DisableH2C:
def process_flow(self, f):
if f.request.headers.get("upgrade", "") == "h2c":
mitmproxy.ctx.log.warn(
logging.warning(
"HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported."
)
del f.request.headers["upgrade"]
@ -32,7 +32,7 @@ class DisableH2C:
)
if is_connection_preface:
f.kill()
mitmproxy.ctx.log.warn(
logging.warning(
"Initiating HTTP/2 connections with prior knowledge are currently not supported."
)

View File

@ -1,3 +1,5 @@
import logging
import itertools
import shutil
import sys
@ -114,7 +116,7 @@ class Dumper:
ctx.options.dumper_default_contentview, message, flow
)
if error:
ctx.log.debug(error)
logging.debug(error)
if ctx.options.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)

View File

@ -1,4 +1,6 @@
import asyncio
import logging
import sys
from mitmproxy import log
@ -8,20 +10,30 @@ class ErrorCheck:
"""Monitor startup for error log entries, and terminate immediately if there are some."""
def __init__(self, log_to_stderr: bool = False):
self.has_errored: list[str] = []
self.log_to_stderr = log_to_stderr
def add_log(self, e: log.LogEntry):
if e.level == "error":
self.has_errored.append(e.msg)
self.logger = ErrorCheckHandler()
self.logger.install()
def finish(self):
self.logger.uninstall()
async def shutdown_if_errored(self):
# don't run immediately, wait for all logging tasks to finish.
await asyncio.sleep(0)
if self.has_errored:
if self.logger.has_errored:
if self.log_to_stderr:
plural = "s" if len(self.has_errored) > 1 else ""
msg = "\n".join(self.has_errored)
plural = "s" if len(self.logger.has_errored) > 1 else ""
msg = "\n".join(r.msg for r in self.logger.has_errored)
print(f"Error{plural} on startup: {msg}", file=sys.stderr)
sys.exit(1)
class ErrorCheckHandler(log.MitmLogHandler):
    """Collect ERROR-level log records so ErrorCheck can inspect them at startup."""

    def __init__(self) -> None:
        # Handler level ERROR: only records at ERROR and above reach emit().
        super().__init__(logging.ERROR)
        # Records seen so far; non-empty means startup should abort.
        self.has_errored: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.has_errored.append(record)

View File

@ -1,7 +1,10 @@
import asyncio
import collections
import logging
from collections.abc import Callable
from typing import Optional
from mitmproxy import command
from mitmproxy import command, log
from mitmproxy.log import LogEntry
from mitmproxy.utils import signals
@ -12,14 +15,20 @@ class EventStore:
self.sig_add = signals.SyncSignal(lambda entry: None)
self.sig_refresh = signals.SyncSignal(lambda: None)
self.logger = CallbackLogger(self._add_log)
self.logger.install()
def done(self):
self.logger.uninstall()
def _add_log(self, entry: LogEntry) -> None:
self.data.append(entry)
self.sig_add.send(entry)
@property
def size(self) -> Optional[int]:
return self.data.maxlen
def add_log(self, entry: LogEntry) -> None:
self.data.append(entry)
self.sig_add.send(entry)
@command.command("eventstore.clear")
def clear(self) -> None:
"""
@ -27,3 +36,30 @@ class EventStore:
"""
self.data.clear()
self.sig_refresh.send()
# Map stdlib numeric log levels back to the legacy LogEntry string levels.
# Levels not listed here fall back to "error" at the lookup site
# (CallbackLogger.emit uses .get(..., "error")).
LOGGING_LEVELS_TO_LOGENTRY = {
    logging.ERROR: "error",
    logging.WARNING: "warn",
    logging.INFO: "info",
    log.ALERT: "alert",
    logging.DEBUG: "debug",
}
class CallbackLogger(log.MitmLogHandler):
    """Forward formatted log records to a callback as legacy LogEntry objects.

    Must be constructed while an asyncio event loop is running; records
    emitted from any thread are marshalled onto that loop via
    call_soon_threadsafe(), so the callback always runs on the loop thread.
    """

    def __init__(self, callback: Callable[[LogEntry], None]):
        super().__init__()
        self.callback = callback
        # Captured at construction time; raises if no loop is running.
        self.event_loop = asyncio.get_running_loop()
        self.formatter = log.MitmFormatter(colorize=False)

    def emit(self, record: logging.LogRecord) -> None:
        # Translate the numeric stdlib level into the old string level;
        # unknown/custom levels are treated as "error".
        legacy_level = LOGGING_LEVELS_TO_LOGENTRY.get(record.levelno, "error")
        entry = LogEntry(msg=self.format(record), level=legacy_level)
        self.event_loop.call_soon_threadsafe(self.callback, entry)

View File

@ -1,3 +1,4 @@
import logging
import shlex
from collections.abc import Callable, Sequence
from typing import Any, Union
@ -185,7 +186,7 @@ class Export:
else:
fp.write(v.encode("utf-8"))
except OSError as e:
ctx.log.error(str(e))
logging.error(str(e))
@command.command("export.clip")
def clip(self, format: str, f: flow.Flow) -> None:
@ -195,7 +196,7 @@ class Export:
try:
pyperclip.copy(self.export_str(format, f))
except pyperclip.PyperclipException as e:
ctx.log.error(str(e))
logging.error(str(e))
@command.command("export")
def export_str(self, format: str, f: flow.Flow) -> str:

View File

@ -1,3 +1,4 @@
import logging
import mimetypes
import re
import urllib.parse
@ -133,7 +134,7 @@ class MapLocal:
try:
contents = local_file.read_bytes()
except OSError as e:
ctx.log.warn(f"Could not read file: {e}")
logging.warning(f"Could not read file: {e}")
continue
flow.response = http.Response.make(200, contents, headers)
@ -141,6 +142,6 @@ class MapLocal:
return
if all_candidates:
flow.response = http.Response.make(404)
ctx.log.info(
logging.info(
f"None of the local file candidates exist: {', '.join(str(x) for x in all_candidates)}"
)

View File

@ -1,3 +1,4 @@
import logging
import re
from collections.abc import Sequence
@ -50,7 +51,7 @@ class ModifyBody:
try:
replacement = spec.read_replacement()
except OSError as e:
ctx.log.warn(f"Could not read replacement file: {e}")
logging.warning(f"Could not read replacement file: {e}")
continue
if flow.response:
flow.response.content = re.sub(

View File

@ -1,3 +1,4 @@
import logging
import re
from collections.abc import Sequence
from pathlib import Path
@ -106,7 +107,7 @@ class ModifyHeaders:
try:
replacement = spec.read_replacement()
except OSError as e:
ctx.log.warn(f"Could not read replacement file: {e}")
logging.warning(f"Could not read replacement file: {e}")
continue
else:
if replacement:

View File

@ -6,6 +6,7 @@ from __future__ import annotations
import asyncio
import collections
import ipaddress
import logging
from contextlib import contextmanager
from typing import Iterable, Iterator, Optional
@ -28,6 +29,8 @@ from mitmproxy.proxy.layers.websocket import WebSocketMessageInjected
from mitmproxy.proxy.mode_servers import ProxyConnectionHandler, ServerInstance, ServerManager
from mitmproxy.utils import human, signals
logger = logging.getLogger(__name__)
class Servers:
def __init__(self, manager: ServerManager):
@ -71,11 +74,11 @@ class Servers:
for ret in await asyncio.gather(*stop_tasks, return_exceptions=True):
if ret:
all_ok = False
ctx.log.error(str(ret))
logger.error(str(ret))
for ret in await asyncio.gather(*start_tasks, return_exceptions=True):
if ret:
all_ok = False
ctx.log.error(str(ret))
logger.error(str(ret))
await self.changed.send()
return all_ok
@ -245,7 +248,7 @@ class Proxyserver(ServerManager):
raise exceptions.OptionsError(f"Cannot spawn multiple servers on the same address: {dup_addr}")
if ctx.options.mode and not ctx.master.addons.get("nextlayer"):
ctx.log.warn("Warning: Running proxyserver without nextlayer addon!")
logger.warning("Warning: Running proxyserver without nextlayer addon!")
if any(isinstance(m, mode_specs.TransparentMode) for m in modes):
if platform.original_addr:
platform.init_transparent_mode()
@ -280,7 +283,7 @@ class Proxyserver(ServerManager):
self, flow: Flow, to_client: bool, message: bytes, is_text: bool = True
):
if not isinstance(flow, http.HTTPFlow) or not flow.websocket:
ctx.log.warn("Cannot inject WebSocket messages into non-WebSocket flows.")
logger.warning("Cannot inject WebSocket messages into non-WebSocket flows.")
msg = websocket.WebSocketMessage(
Opcode.TEXT if is_text else Opcode.BINARY, not to_client, message
@ -289,18 +292,18 @@ class Proxyserver(ServerManager):
try:
self.inject_event(event)
except ValueError as e:
ctx.log.warn(str(e))
logger.warning(str(e))
@command.command("inject.tcp")
def inject_tcp(self, flow: Flow, to_client: bool, message: bytes):
if not isinstance(flow, tcp.TCPFlow):
ctx.log.warn("Cannot inject TCP messages into non-TCP flows.")
logger.warning("Cannot inject TCP messages into non-TCP flows.")
event = TcpMessageInjected(flow, tcp.TCPMessage(not to_client, message))
try:
self.inject_event(event)
except ValueError as e:
ctx.log.warn(str(e))
logger.warning(str(e))
def server_connect(self, data: server_hooks.ServerConnectionHookData):
if data.server.sockname is None:

View File

@ -1,4 +1,5 @@
import asyncio
import logging
import os.path
import sys
from typing import BinaryIO, Optional
@ -46,9 +47,9 @@ class ReadFile:
cnt += 1
except (OSError, exceptions.FlowReadException) as e:
if cnt:
ctx.log.warn("Flow file corrupted - loaded %i flows." % cnt)
logging.warning("Flow file corrupted - loaded %i flows." % cnt)
else:
ctx.log.error("Flow file corrupted.")
logging.error("Flow file corrupted.")
raise exceptions.FlowReadException(str(e)) from e
else:
return cnt
@ -59,7 +60,7 @@ class ReadFile:
with open(path, "rb") as f:
return await self.load_flows(f)
except OSError as e:
ctx.log.error(f"Cannot load flows: {e}")
logging.error(f"Cannot load flows: {e}")
raise exceptions.FlowReadException(str(e)) from e
async def doread(self, rfile):

View File

@ -1,3 +1,4 @@
import logging
import os.path
import sys
from collections.abc import Sequence
@ -15,6 +16,7 @@ from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy import io
from mitmproxy.log import ALERT
@lru_cache
@ -136,7 +138,7 @@ class Save:
stream.add(i)
except OSError as e:
raise exceptions.CommandError(e) from e
ctx.log.alert(f"Saved {len(flows)} flows.")
logging.log(ALERT, f"Saved {len(flows)} flows.")
def tcp_start(self, flow: tcp.TCPFlow):
if self.stream:
@ -182,4 +184,4 @@ class Save:
self.save_flow(flow)
def dns_error(self, flow: dns.DNSFlow):
self.save_flow(flow)
self.save_flow(flow)

View File

@ -1,4 +1,5 @@
import asyncio
import logging
import os
import importlib.util
import importlib.machinery
@ -17,6 +18,8 @@ from mitmproxy import ctx
import mitmproxy.types as mtypes
from mitmproxy.utils import asyncio_utils
logger = logging.getLogger(__name__)
def load_script(path: str) -> Optional[types.ModuleType]:
fullname = "__mitmproxy_script__.{}".format(
@ -61,7 +64,7 @@ def script_error_handler(path, exc, msg="", tb=False):
log_msg = (
log_msg + "\n" + "".join(traceback.format_exception(etype, value, tback))
)
ctx.log.error(log_msg)
logger.error(log_msg)
ReloadInterval = 1
@ -103,7 +106,7 @@ class Script:
return [self.ns] if self.ns else []
def loadscript(self):
ctx.log.info("Loading script %s" % self.path)
logger.info("Loading script %s" % self.path)
if self.ns:
ctx.master.addons.remove(self.ns)
self.ns = None
@ -128,7 +131,7 @@ class Script:
try:
mtime = os.stat(self.fullpath).st_mtime
except FileNotFoundError:
ctx.log.info("Removing script %s" % self.path)
logger.info("Removing script %s" % self.path)
scripts = list(ctx.options.scripts)
scripts.remove(self.path)
ctx.options.update(scripts=scripts)
@ -162,7 +165,7 @@ class ScriptLoader:
simulated. Note that the load event is not invoked.
"""
if not os.path.isfile(path):
ctx.log.error("No such script: %s" % path)
logger.error("No such script: %s" % path)
return
mod = load_script(path)
if mod:
@ -184,7 +187,7 @@ class ScriptLoader:
for a in self.addons[:]:
if a.path not in ctx.options.scripts:
ctx.log.info("Un-loading script: %s" % a.path)
logger.info("Un-loading script: %s" % a.path)
ctx.master.addons.remove(a)
self.addons.remove(a)

View File

@ -1,4 +1,6 @@
from mitmproxy import ctx, http
import logging
from mitmproxy import http
class ServerSideEvents:
@ -15,7 +17,7 @@ class ServerSideEvents:
"text/event-stream"
)
if is_sse and not flow.response.stream:
ctx.log.warn(
logging.warning(
"mitmproxy currently does not support server side events. As a workaround, you can enable response "
"streaming for such flows: https://github.com/mitmproxy/mitmproxy/issues/4469"
)

View File

@ -1,4 +1,5 @@
import hashlib
import logging
import urllib
from collections.abc import Hashable, Sequence
from typing import Any, Optional
@ -228,7 +229,7 @@ class ServerPlayback:
f.response = response
f.is_replay = "response"
elif ctx.options.server_replay_kill_extra:
ctx.log.warn(
logging.warning(
"server_playback: killed non-replay request {}".format(
f.request.url
)

View File

@ -1,44 +1,56 @@
import asyncio
import logging
from typing import IO
import sys
from typing import IO, Optional
from mitmproxy import ctx
from mitmproxy import log
from mitmproxy.contrib import click as miniclick
from mitmproxy import ctx, log
from mitmproxy.utils import vt_codes
LOG_COLORS = {"error": "red", "warn": "yellow", "alert": "magenta"}
class TermLog:
def __init__(
self,
out: Optional[IO[str]] = None,
err: Optional[IO[str]] = None,
out: IO[str] | None = None
):
self.out_file: IO[str] = out or sys.stdout
self.out_has_vt_codes = vt_codes.ensure_supported(self.out_file)
self.err_file: IO[str] = err or sys.stderr
self.err_has_vt_codes = vt_codes.ensure_supported(self.err_file)
self.logger = TermLogHandler(out)
self.logger.install()
def load(self, loader):
loader.add_option(
"termlog_verbosity", str, "info", "Log verbosity.", choices=log.LogTierOrder
"termlog_verbosity", str, "info", "Log verbosity.", choices=log.LogLevels
)
self.logger.setLevel(logging.INFO)
def add_log(self, e: log.LogEntry):
if log.log_tier(ctx.options.termlog_verbosity) >= log.log_tier(e.level):
if e.level == "error":
f = self.err_file
has_vt_codes = self.err_has_vt_codes
else:
f = self.out_file
has_vt_codes = self.out_has_vt_codes
def configure(self, updated):
if "termlog_verbosity" in updated:
self.logger.setLevel(ctx.options.termlog_verbosity.upper())
msg = e.msg
if has_vt_codes:
msg = miniclick.style(
e.msg,
fg=LOG_COLORS.get(e.level),
dim=(e.level == "debug"),
)
print(msg, file=f)
def done(self):
t = self._teardown()
try:
# try to delay teardown a bit.
asyncio.create_task(t)
except RuntimeError:
# no event loop, we're in a test.
asyncio.run(t)
async def _teardown(self):
self.logger.uninstall()
class TermLogHandler(log.MitmLogHandler):
    """Write formatted log records to a terminal-like text stream."""

    def __init__(self, out: IO[str] | None = None):
        super().__init__()
        # Default to stdout when no stream is supplied.
        self.file: IO[str] = out or sys.stdout
        # Colorize output only if the stream supports VT escape codes.
        self.has_vt_codes = vt_codes.ensure_supported(self.file)
        self.formatter = log.MitmFormatter(self.has_vt_codes)

    def emit(self, record: logging.LogRecord) -> None:
        print(self.format(record), file=self.file)

View File

@ -1,4 +1,5 @@
import ipaddress
import logging
import os
from pathlib import Path
from typing import Any, Optional, TypedDict
@ -316,7 +317,7 @@ class TlsConfig:
else None,
)
if self.certstore.default_ca.has_expired():
ctx.log.warn(
logging.warning(
"The mitmproxy certificate authority has expired!\n"
"Please delete all CA-related files in your ~/.mitmproxy folder.\n"
"The CA will be regenerated automatically after restarting mitmproxy.\n"

View File

@ -9,6 +9,7 @@ The View:
removed from the store.
"""
import collections
import logging
import re
from collections.abc import Iterator, MutableMapping, Sequence
from typing import Any, Optional
@ -27,6 +28,7 @@ from mitmproxy import http
from mitmproxy import io
from mitmproxy import tcp
from mitmproxy import udp
from mitmproxy.log import ALERT
from mitmproxy.utils import human, signals
@ -417,7 +419,7 @@ class View(collections.abc.Sequence):
if dups:
self.add(dups)
self.focus.flow = dups[0]
ctx.log.alert("Duplicated %s flows" % len(dups))
logging.log(ALERT, "Duplicated %s flows" % len(dups))
@command.command("view.flows.remove")
def remove(self, flows: Sequence[mitmproxy.flow.Flow]) -> None:
@ -437,7 +439,7 @@ class View(collections.abc.Sequence):
del self._store[f.id]
self.sig_store_remove.send(flow=f)
if len(flows) > 1:
ctx.log.alert("Removed %s flows" % len(flows))
logging.log(ALERT, "Removed %s flows" % len(flows))
@command.command("view.flows.resolve")
def resolve(self, flow_spec: str) -> Sequence[mitmproxy.flow.Flow]:
@ -494,9 +496,9 @@ class View(collections.abc.Sequence):
# .newid() method or something.
self.add([i.copy()])
except OSError as e:
ctx.log.error(e.strerror)
logging.error(e.strerror)
except exceptions.FlowReadException as e:
ctx.log.error(str(e))
logging.error(str(e))
def add(self, flows: Sequence[mitmproxy.flow.Flow]) -> None:
"""

View File

@ -3,6 +3,8 @@
"""
import functools
import inspect
import logging
import sys
import textwrap
import types
@ -177,7 +179,7 @@ class CommandManager:
try:
self.add(o.command_name, o)
except exceptions.CommandError as e:
self.master.log.warn(
logging.warning(
f"Could not load command {o.command_name}: {e}"
)

View File

@ -1,11 +1,12 @@
from __future__ import annotations
import logging
import struct
from dataclasses import dataclass, field
from enum import Enum
from typing import Generator, Iterable, Iterator
from mitmproxy import contentviews, ctx, flow, flowfilter, http
from mitmproxy import contentviews, flow, flowfilter, http
from mitmproxy.contentviews import base
from mitmproxy.net.encoding import decode
@ -258,8 +259,7 @@ class ProtoParser:
packed_field: ProtoParser.Field,
) -> list[ProtoParser.Field]:
if not isinstance(packed_field.wire_value, bytes):
ctx.log(type(packed_field.wire_value))
raise ValueError("can not unpack field with data other than bytes")
raise ValueError(f"can not unpack field with data other than bytes: {type(packed_field.wire_value)}")
wire_data: bytes = packed_field.wire_value
tag: int = packed_field.tag
options: ProtoParser.ParserOptions = packed_field.options
@ -526,7 +526,7 @@ class ProtoParser:
self.preferred_decoding = decoding
self.try_unpack = as_packed
except Exception as e:
ctx.log.warn(e)
logging.warning(e)
def _gen_tag_str(self):
tags = self.parent_tags[:]
@ -1087,7 +1087,7 @@ class ViewGrpcProtobuf(base.View):
# hook to log exception tracebacks on iterators
# import traceback
# ctx.log.warn("gRPC contentview: {}".format(traceback.format_exc()))
# logging.warning("gRPC contentview: {}".format(traceback.format_exc()))
raise e
return title, text_iter

View File

@ -1,7 +1,91 @@
from dataclasses import dataclass
import logging
import os
import warnings
from mitmproxy import hooks
from mitmproxy.contrib import click as miniclick
from mitmproxy.utils import human
ALERT = logging.INFO + 1
"""
The ALERT logging level has the same urgency as info, but
signals to interactive tools that the user's attention should be
drawn to the output even if they're not currently looking at the
event log.
"""
logging.addLevelName(ALERT, "ALERT")
# Legacy string level names, ordered from most to least severe.
# Kept for backwards compatibility (e.g. as choices for the
# `termlog_verbosity` option).
LogLevels = [
    "error",
    "warn",
    "info",
    "alert",
    "debug",
]

# Terminal foreground colors used by MitmFormatter per numeric level;
# levels not listed here are printed uncolored.
LOG_COLORS = {logging.ERROR: "red", logging.WARNING: "yellow", ALERT: "magenta"}
class MitmFormatter(logging.Formatter):
    """Render records as `[time] message` or `[time][client] message`,
    optionally colorized with VT escape codes."""

    # Consumed by logging.Formatter.formatTime() when no datefmt is given:
    # yields "HH:MM:SS.mmm" timestamps.
    default_time_format = "%H:%M:%S"
    default_msec_format = '%s.%03d'

    def __init__(self, colorize: bool):
        super().__init__()
        self.colorize = colorize
        time_part = "[%s]"
        client_part = "[%s]"
        if colorize:
            time_part = miniclick.style(time_part, fg="cyan", dim=True)
            client_part = miniclick.style(client_part, fg="yellow", dim=True)
        # Pre-built %-templates: one with a client-address slot, one without.
        self.with_client = f"{time_part}{client_part} %s"
        self.without_client = f"{time_part} %s"

    def format(self, record: logging.LogRecord) -> str:
        timestamp = self.formatTime(record)
        message = record.getMessage()
        if self.colorize:
            # Color by level; DEBUG (and below) is additionally dimmed.
            message = miniclick.style(
                message,
                fg=LOG_COLORS.get(record.levelno),
                dim=(record.levelno <= logging.DEBUG),
            )
        # A "client" attribute may be attached to records by the proxy core.
        client = getattr(record, "client", None)
        if client:
            return self.with_client % (timestamp, human.format_address(client), message)
        return self.without_client % (timestamp, message)
class MitmLogHandler(logging.Handler):
    """Base class for mitmproxy's stdlib logging handlers.

    Each handler remembers which pytest test (if any) created it, so that
    records emitted in a later test are filtered out of handlers that were
    leaked by an earlier one.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Pytest test id we were created in; None outside of tests.
        self._initiated_in_test = os.environ.get("PYTEST_CURRENT_TEST")

    def filter(self, record: logging.LogRecord) -> bool:
        # We can't remove stale handlers here because that would modify .handlers during iteration!
        if not super().filter(record):
            return False
        if not self._initiated_in_test:
            return True
        return self._initiated_in_test == os.environ.get("PYTEST_CURRENT_TEST")

    def install(self) -> None:
        root = logging.getLogger()
        if self._initiated_in_test:
            # Evict handlers left behind by other tests before adding ourselves.
            for handler in list(root.handlers):
                stale = (
                    isinstance(handler, MitmLogHandler)
                    and handler._initiated_in_test != self._initiated_in_test
                )
                if stale:
                    handler.uninstall()
        root.addHandler(self)

    def uninstall(self) -> None:
        logging.getLogger().removeHandler(self)
# everything below is deprecated!
class LogEntry:
def __init__(self, msg, level):
@ -22,6 +106,8 @@ class LogEntry:
class Log:
"""
The central logger, exposed to scripts as mitmproxy.ctx.log.
Deprecated: Please use the standard Python logging module instead.
"""
def __init__(self, master):
@ -31,13 +117,23 @@ class Log:
"""
Log with level debug.
"""
self(txt, "debug")
warnings.warn(
"mitmproxy's ctx.log.debug() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
logging.getLogger().debug(txt)
def info(self, txt):
"""
Log with level info.
"""
self(txt, "info")
warnings.warn(
"mitmproxy's ctx.log.info() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
logging.getLogger().info(txt)
def alert(self, txt):
"""
@ -46,46 +142,47 @@ class Log:
drawn to the output even if they're not currently looking at the
event log.
"""
self(txt, "alert")
warnings.warn(
"mitmproxy's ctx.log.alert() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
logging.getLogger().log(ALERT, txt)
def warn(self, txt):
"""
Log with level warn.
"""
self(txt, "warn")
warnings.warn(
"mitmproxy's ctx.log.warn() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
logging.getLogger().warning(txt)
def error(self, txt):
"""
Log with level error.
"""
self(txt, "error")
warnings.warn(
"mitmproxy's ctx.log.error() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
logging.getLogger().error(txt)
def __call__(self, text, level="info"):
self.master.event_loop.call_soon_threadsafe(
self.master.addons.trigger,
AddLogHook(LogEntry(text, level)),
warnings.warn(
"mitmproxy's ctx.log() is deprecated. Please use the standard Python logging module instead.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class AddLogHook(hooks.Hook):
"""
Called whenever a new log entry is created through the mitmproxy
context. Be careful not to log from this event, which will cause an
infinite loop!
"""
entry: LogEntry
LogTierOrder = [
"error",
"warn",
"info",
"alert",
"debug",
]
logging.getLogger().log(level=logging.getLevelName(level.upper()), msg=text)
def log_tier(level):
    """
    Comparison method for "old" LogEntry log tiers.
    Ideally you should use the standard Python logging module instead.
    """
    # Lower tier == more severe; "alert" shares urgency with "info".
    # Returns None for unknown level names.
    tiers = {"error": 0, "warn": 1, "info": 2, "alert": 2, "debug": 3}
    return tiers.get(level)

View File

@ -1,4 +1,5 @@
import asyncio
import logging
import traceback
from typing import Optional
@ -11,6 +12,8 @@ from mitmproxy import options
from . import ctx as mitmproxy_ctx
from .proxy.mode_specs import ReverseMode
logger = logging.getLogger(__name__)
class Master:
"""
@ -43,12 +46,13 @@ class Master:
try:
self.should_exit.clear()
# Handle scheduled tasks (configure()) first.
await asyncio.sleep(0)
if ec := self.addons.get("errorcheck"):
await ec.shutdown_if_errored()
if ps := self.addons.get("proxyserver"):
await ps.setup_servers()
if ec := self.addons.get("errorcheck"):
await ec.shutdown_if_errored()
ec.finish()
await self.running()
try:
await self.should_exit.wait()
@ -75,7 +79,7 @@ class Master:
try:
exc: Exception = context["exception"]
except KeyError:
self.log.error(
logger.error(
f"Unhandled asyncio error: {context}"
"\nPlease lodge a bug report at:"
+ "\n\thttps://github.com/mitmproxy/mitmproxy/issues"
@ -83,7 +87,7 @@ class Master:
else:
if isinstance(exc, OSError) and exc.errno == 10038:
return # suppress https://bugs.python.org/issue43253
self.log.error(
logger.error(
"\n".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
+ "\nPlease lodge a bug report at:"
+ "\n\thttps://github.com/mitmproxy/mitmproxy/issues"

View File

@ -1,12 +1,13 @@
from __future__ import annotations
import asyncio
import logging
import socket
from typing import Any, Callable, Optional, Union, cast
from mitmproxy import ctx
from mitmproxy.connection import Address
from mitmproxy.utils import human
logger = logging.getLogger(__name__)
MAX_DATAGRAM_SIZE = 65535 - 20
@ -54,7 +55,7 @@ class DrainableDatagramProtocol(asyncio.DatagramProtocol):
def connection_lost(self, exc: Exception | None) -> None:
self._closed.set()
if exc:
ctx.log.warn(f"Connection lost on {self!r}: {exc!r}") # pragma: no cover
logger.warning(f"Connection lost on {self!r}: {exc!r}") # pragma: no cover
def pause_writing(self) -> None:
self._paused = self._paused + 1
@ -71,7 +72,7 @@ class DrainableDatagramProtocol(asyncio.DatagramProtocol):
await self._can_write.wait()
def error_received(self, exc: Exception) -> None:
ctx.log.warn(f"Send/receive on {self!r} failed: {exc!r}") # pragma: no cover
logger.warning(f"Send/receive on {self!r} failed: {exc!r}") # pragma: no cover
async def wait_closed(self) -> None:
await self._closed.wait()
@ -122,14 +123,14 @@ class DatagramReader:
def feed_data(self, data: bytes, remote_addr: Address) -> None:
assert len(data) <= MAX_DATAGRAM_SIZE
if self._eof:
ctx.log.info(
logger.info(
f"Received UDP packet from {human.format_address(remote_addr)} after EOF."
)
else:
try:
self._packets.put_nowait(data)
except asyncio.QueueFull:
ctx.log.debug(
logger.debug(
f"Dropped UDP packet from {human.format_address(remote_addr)}."
)

View File

@ -6,7 +6,9 @@ possibly to the master and addons.
The counterpart to commands are events.
"""
from typing import Literal, Union, TYPE_CHECKING
import logging
import warnings
from typing import Union, TYPE_CHECKING
import mitmproxy.hooks
from mitmproxy.connection import Connection, Server
@ -120,16 +122,31 @@ class StartHook(Command, mitmproxy.hooks.Hook):
class Log(Command):
"""
Log a message.
Layers could technically call `logging.log` directly, but the use of a command allows us to
write more expressive playbook tests. Put differently, by using commands we can assert that
a specific log message is a direct consequence of a particular I/O event.
This could also be implemented with some more playbook magic in the future,
but for now we keep the current approach as the fully sans-io one.
"""
message: str
level: str
level: int
def __init__(
self,
message: str,
level: Literal["error", "warn", "info", "alert", "debug"] = "info",
level: int = logging.INFO,
):
if isinstance(level, str): # pragma: no cover
warnings.warn(
"commands.Log() now expects an integer log level, not a string.",
DeprecationWarning, stacklevel=2
)
level = getattr(logging, level.upper())
self.message = message
self.level = level
def __repr__(self):
return f"Log({self.message!r}, {self.level!r})"
return f"Log({self.message!r}, {logging.getLevelName(self.level).lower()})"

View File

@ -5,6 +5,7 @@ import collections
import textwrap
from abc import abstractmethod
from dataclasses import dataclass
from logging import DEBUG
from typing import Any, ClassVar, Generator, NamedTuple, Optional, TypeVar
from mitmproxy.connection import Connection
@ -97,7 +98,7 @@ class Layer:
message = message[:256] + ""
else:
Layer.__last_debug_message = message
return commands.Log(textwrap.indent(message, self.debug), "debug")
return commands.Log(textwrap.indent(message, self.debug), DEBUG)
@property
def stack_pos(self) -> str:
@ -284,7 +285,7 @@ class NextLayer(Layer):
# Has an addon decided on the next layer yet?
if self.layer:
if self.debug:
yield commands.Log(f"{self.debug}[nextlayer] {self.layer!r}", "debug")
yield commands.Log(f"{self.debug}[nextlayer] {self.layer!r}", DEBUG)
for e in self.events:
yield from self.layer.handle_event(e)
self.events.clear()

View File

@ -86,6 +86,8 @@ class DNSLayer(layer.Layer):
msg = dns.Message.unpack(event.data)
except struct.error as e:
yield commands.Log(f"{event.connection} sent an invalid message: {e}")
yield commands.CloseConnection(event.connection)
self._handle_event = self.state_done
else:
try:
flow = self.flows[msg.id]

View File

@ -1,5 +1,7 @@
import collections
import enum
from logging import DEBUG, WARNING
import time
from dataclasses import dataclass
from functools import cached_property
@ -492,14 +494,14 @@ class HttpStream(layer.Layer):
else:
yield commands.Log(
f"Sent HTTP 101 response, but no protocol is enabled to upgrade to.",
"warn",
WARNING,
)
yield commands.CloseConnection(self.context.client)
self.client_state = self.server_state = self.state_errored
return
if self.debug:
yield commands.Log(
f"{self.debug}[http] upgrading to {self.child_layer}", "debug"
f"{self.debug}[http] upgrading to {self.child_layer}", DEBUG
)
yield from self.child_layer.handle_event(events.Start())
self._handle_event = self.passthrough

View File

@ -1,4 +1,6 @@
import collections
from logging import DEBUG, ERROR
import time
from collections.abc import Sequence
from enum import Enum
@ -171,10 +173,10 @@ class Http2Connection(HttpConnection):
for h2_event in events:
if self.debug:
yield Log(f"{self.debug}[h2] {h2_event}", "debug")
yield Log(f"{self.debug}[h2] {h2_event}", DEBUG)
if (yield from self.handle_h2_event(h2_event)):
if self.debug:
yield Log(f"{self.debug}[h2] done", "debug")
yield Log(f"{self.debug}[h2] done", DEBUG)
return
data_to_send = self.h2_conn.data_to_send()
@ -255,7 +257,7 @@ class Http2Connection(HttpConnection):
elif isinstance(event, h2.events.PushedStreamReceived):
yield Log(
"Received HTTP/2 push promise, even though we signalled no support.",
"error",
ERROR,
)
elif isinstance(event, h2.events.UnknownFrameReceived):
# https://http2.github.io/http2-spec/#rfc.section.4.1
@ -468,7 +470,7 @@ class Http2Client(Http2Connection):
if data is not None:
yield Log(
f"Send HTTP/2 keep-alive PING to {human.format_address(self.conn.peername)}",
"debug",
DEBUG,
)
yield SendData(self.conn, data)
time_until_next_ping = self.context.options.http2_ping_keepalive - (

View File

@ -1,3 +1,5 @@
from logging import DEBUG
import time
from typing import Optional
@ -90,7 +92,7 @@ class HttpUpstreamProxy(tunnel.TunnelLayer):
else:
proxyaddr = human.format_address(self.tunnel_connection.address)
raw_resp = b"\n".join(response_head)
yield commands.Log(f"{proxyaddr}: {raw_resp!r}", level="debug")
yield commands.Log(f"{proxyaddr}: {raw_resp!r}", DEBUG)
return (
False,
f"Upstream proxy {proxyaddr} refused HTTP CONNECT request: {response.status_code} {response.reason}",

View File

@ -1,7 +1,9 @@
import struct
from logging import DEBUG, ERROR, INFO, WARNING
import time
from dataclasses import dataclass
from typing import Iterator, Literal, Optional
from typing import Iterator, Optional
from OpenSSL import SSL
@ -275,7 +277,7 @@ class TLSLayer(tunnel.TunnelLayer):
yield TlsStartServerHook(tls_start)
if not tls_start.ssl_conn:
yield commands.Log(
f"No {self.proto_name} context was provided, failing connection.", "error"
f"No {self.proto_name} context was provided, failing connection.", ERROR
)
yield commands.CloseConnection(self.conn)
return
@ -368,7 +370,7 @@ class TLSLayer(tunnel.TunnelLayer):
self.conn.tls_version = self.tls.get_protocol_version_name()
if self.debug:
yield commands.Log(
f"{self.debug}[tls] tls established: {self.conn}", "debug"
f"{self.debug}[tls] tls established: {self.conn}", DEBUG
)
if self.conn == self.context.client:
yield TlsEstablishedClientHook(
@ -410,7 +412,7 @@ class TLSLayer(tunnel.TunnelLayer):
# which upon mistrusting a certificate still completes the handshake
# and then sends an alert in the next packet. At this point we have unfortunately
# already fired out `tls_established_client` hook.
yield commands.Log(f"TLS Error: {e}", "warn")
yield commands.Log(f"TLS Error: {e}", WARNING)
break
if plaintext:
yield from self.event_to_child(
@ -420,7 +422,7 @@ class TLSLayer(tunnel.TunnelLayer):
self.conn.state &= ~connection.ConnectionState.CAN_READ
if self.debug:
yield commands.Log(
f"{self.debug}[tls] close_notify {self.conn}", level="debug"
f"{self.debug}[tls] close_notify {self.conn}", DEBUG
)
yield from self.event_to_child(events.ConnectionClosed(self.conn))
@ -489,7 +491,7 @@ class ServerTLSLayer(TLSLayer):
yield from super().event_to_child(event)
def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
yield commands.Log(f"Server TLS handshake failed. {err}", level="warn")
yield commands.Log(f"Server TLS handshake failed. {err}", level=WARNING)
yield from super().on_handshake_error(err)
@ -620,7 +622,7 @@ class ClientTLSLayer(TLSLayer):
dest = self.conn.sni
else:
dest = human.format_address(self.context.server.address)
level: Literal["warn", "info"] = "warn"
level: int = WARNING
if err.startswith("Cannot parse ClientHello"):
pass
elif (
@ -645,7 +647,7 @@ class ClientTLSLayer(TLSLayer):
f"The client disconnected during the handshake. If this happens consistently for {dest}, "
f"this may indicate that the client does not trust the proxy's certificate."
)
level = "info"
level = INFO
elif err == "connection closed early":
pass
else:
@ -657,7 +659,7 @@ class ClientTLSLayer(TLSLayer):
def errored(self, event: events.Event) -> layer.CommandGenerator[None]:
if self.debug is not None:
yield commands.Log(f"Swallowing {event} as handshake failed.", "debug")
yield commands.Log(f"Swallowing {event} as handshake failed.", DEBUG)
class MockTLSLayer(TLSLayer):

View File

@ -12,6 +12,8 @@ Example:
from __future__ import annotations
import asyncio
import logging
import errno
import socket
import typing
@ -19,14 +21,17 @@ from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from typing import ClassVar, Generic, TypeVar, cast, get_args
from mitmproxy import ctx, flow, log, platform
from mitmproxy import ctx, flow, platform
from mitmproxy.connection import Address
from mitmproxy.master import Master
from mitmproxy.net import udp
from mitmproxy.proxy import commands, layers, mode_specs, server
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.layer import Layer
from mitmproxy.utils import asyncio_utils, human
from mitmproxy.utils import human
logger = logging.getLogger(__name__)
class ProxyConnectionHandler(server.LiveConnectionHandler):
@ -45,13 +50,6 @@ class ProxyConnectionHandler(server.LiveConnectionHandler):
if isinstance(data, flow.Flow):
await data.wait_for_resume() # pragma: no cover
def log(self, message: str, level: str = "info") -> None:
x = log.LogEntry(self.log_prefix + message, level)
asyncio_utils.create_task(
self.master.addons.handle_lifecycle(log.AddLogHook(x)),
name="ProxyConnectionHandler.log",
)
M = TypeVar('M', bound=mode_specs.ProxyMode)
@ -152,7 +150,7 @@ class AsyncioServerInstance(ServerInstance[M], metaclass=ABCMeta):
else:
self.last_exception = None
addrs = " and ".join({human.format_address(a) for a in self._listen_addrs})
ctx.log.info(f"{self.mode.description} listening at {addrs}.")
logger.info(f"{self.mode.description} listening at {addrs}.")
async def stop(self) -> None:
assert self._server is not None
@ -170,7 +168,7 @@ class AsyncioServerInstance(ServerInstance[M], metaclass=ABCMeta):
else:
self.last_exception = None
addrs = " and ".join({human.format_address(a) for a in listen_addrs})
ctx.log.info(f"Stopped {self.mode.description} at {addrs}.")
logger.info(f"Stopped {self.mode.description} at {addrs}.")
async def listen(self, host: str, port: int) -> asyncio.Server | udp.UdpServer:
if self.mode.transport_protocol == "tcp":
@ -186,7 +184,7 @@ class AsyncioServerInstance(ServerInstance[M], metaclass=ABCMeta):
s.close()
return await asyncio.start_server(self.handle_tcp_connection, host, fixed_port)
except Exception as e:
ctx.log.debug(f"Failed to listen on a single port ({e!r}), falling back to default behavior.")
logger.debug(f"Failed to listen on a single port ({e!r}), falling back to default behavior.")
return await asyncio.start_server(self.handle_tcp_connection, host, port)
elif self.mode.transport_protocol == "udp":
# create_datagram_endpoint only creates one socket, so the workaround above doesn't apply
@ -224,7 +222,7 @@ class AsyncioServerInstance(ServerInstance[M], metaclass=ABCMeta):
assert platform.original_addr
handler.layer.context.server.address = platform.original_addr(socket)
except Exception as e:
ctx.log.error(f"Transparent mode failure: {e!r}")
logger.error(f"Transparent mode failure: {e!r}")
return
with self.manager.register_connection(connection_id, handler):
await handler.handle_client()

View File

@ -9,6 +9,8 @@ The very high level overview is as follows:
import abc
import asyncio
import collections
import logging
import time
import traceback
from collections.abc import Awaitable, Callable, MutableMapping
@ -28,6 +30,9 @@ from mitmproxy.utils import human
from mitmproxy.utils.data import pkg_data
logger = logging.getLogger(__name__)
class TimeoutWatchdog:
last_activity: float
CONNECTION_TIMEOUT = 10 * 60
@ -138,14 +143,14 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
await self.handle_hook(server_hooks.ClientDisconnectedHook(self.client))
if self.transports:
self.log("closing transports...", "debug")
self.log("closing transports...", logging.DEBUG)
for io in self.transports.values():
if io.handler:
io.handler.cancel("client disconnected")
await asyncio.wait(
[x.handler for x in self.transports.values() if x.handler]
)
self.log("transports closed!", "debug")
self.log("transports closed!", logging.DEBUG)
async def open_connection(self, command: commands.OpenConnection) -> None:
if not command.connection.address:
@ -321,8 +326,12 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
async def handle_hook(self, hook: commands.StartHook) -> None:
pass
def log(self, message: str, level: str = "info") -> None:
print(message)
def log(self, message: str, level: int = logging.INFO) -> None:
logger.log(
level,
message,
extra={"client": self.client.peername}
)
def server_event(self, event: events.Event) -> None:
self.timeout_watchdog.register_activity()
@ -368,7 +377,7 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
else:
raise RuntimeError(f"Unexpected command: {command}")
except Exception:
self.log(f"mitmproxy has crashed!\n{traceback.format_exc()}", level="error")
self.log(f"mitmproxy has crashed!\n{traceback.format_exc()}", logging.ERROR)
def close_connection(
self, connection: Connection, half_close: bool = False
@ -376,7 +385,7 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
if half_close:
if not connection.state & ConnectionState.CAN_WRITE:
return
self.log(f"half-closing {connection}", "debug")
self.log(f"half-closing {connection}", logging.DEBUG)
try:
writer = self.transports[connection].writer
assert writer
@ -429,10 +438,6 @@ class SimpleConnectionHandler(LiveConnectionHandler): # pragma: no cover
if hook.name in self.hook_handlers:
self.hook_handlers[hook.name](*hook.args())
def log(self, message: str, level: str = "info"):
if "Hook" not in message:
pass # print(message, file=sys.stderr if level in ("error", "warn") else sys.stdout)
if __name__ == "__main__": # pragma: no cover
# simple standalone implementation for testing.

View File

@ -1,59 +1,13 @@
import asyncio
import sys
import mitmproxy.master
import mitmproxy.options
from mitmproxy import hooks, log
from mitmproxy import hooks
from mitmproxy import command
from mitmproxy import eventsequence
from mitmproxy.addons import script, core
class LogRecorder:
def __init__(self, master):
self.master: RecordingMaster = master
def add_log(self, entry: log.LogEntry):
self.master.logs.append(entry)
class RecordingMaster(mitmproxy.master.Master):
def __init__(self, *args, **kwargs):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
super().__init__(*args, **kwargs, event_loop=loop)
self.addons.add(LogRecorder(self))
self.logs = []
def dump_log(self, outf=sys.stdout):
for i in self.logs:
print(f"{i.level}: {i.msg}", file=outf)
def has_log(self, txt, level=None):
for i in self.logs:
if level and i.level != level:
continue
if txt.lower() in i.msg.lower():
return True
return False
async def await_log(self, txt, level=None, timeout=1):
# start with a sleep(0), which lets all other coroutines advance.
# often this is enough to not sleep at all.
await asyncio.sleep(0)
for i in range(int(timeout / 0.01)):
if self.has_log(txt, level):
return True
else:
await asyncio.sleep(0.01)
raise AssertionError(f"Did not find log entry {txt!r} in {self.logs}.")
def clear(self):
self.logs = []
class context:
"""
A context for testing addons, which sets up the mitmproxy.ctx module so
@ -62,8 +16,13 @@ class context:
"""
def __init__(self, *addons, options=None, loadcore=True):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
options = options or mitmproxy.options.Options()
self.master = RecordingMaster(options)
self.master = mitmproxy.master.Master(options, event_loop=loop)
self.options = self.master.options
if loadcore:

View File

@ -1,8 +1,8 @@
import logging
from collections.abc import Sequence
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import ctx
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
@ -17,7 +17,7 @@ class CommandExecutor:
try:
ret = self.master.commands.execute(cmd)
except exceptions.CommandError as e:
ctx.log.error(str(e))
logging.error(str(e))
else:
if ret is not None:
if type(ret) == Sequence[flow.Flow]:

View File

@ -1,4 +1,5 @@
import csv
import logging
from collections.abc import Sequence
import mitmproxy.types
@ -12,11 +13,15 @@ from mitmproxy import http
from mitmproxy import log
from mitmproxy import tcp
from mitmproxy import udp
from mitmproxy.log import ALERT
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
from mitmproxy.utils import strutils
logger = logging.getLogger(__name__)
console_palettes = [
"lowlight",
"lowdark",
@ -63,7 +68,7 @@ class ConsoleAddon:
str,
"info",
"EventLog verbosity.",
choices=log.LogTierOrder,
choices=log.LogLevels,
)
loader.add_option(
"console_layout",
@ -232,7 +237,7 @@ class ConsoleAddon:
try:
self.master.commands.call_strings(cmd, repl)
except exceptions.CommandError as e:
ctx.log.error(str(e))
logger.error(str(e))
self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback))
@ -257,7 +262,7 @@ class ConsoleAddon:
try:
self.master.commands.call_strings(subcmd, repl)
except exceptions.CommandError as e:
ctx.log.error(str(e))
logger.error(str(e))
self.master.overlay(overlay.Chooser(self.master, prompt, choices, "", callback))
@ -312,7 +317,7 @@ class ConsoleAddon:
if isinstance(flow, (http.HTTPFlow, tcp.TCPFlow, udp.UDPFlow, dns.DNSFlow)):
self.master.switch_view("flowview")
else:
ctx.log.warn(f"No detail view for {type(flow).__name__}.")
logger.warning(f"No detail view for {type(flow).__name__}.")
@command.command("console.exit")
def exit(self) -> None:
@ -510,9 +515,9 @@ class ConsoleAddon:
writer.writerow(
[strutils.always_str(x) or "" for x in row] # type: ignore
)
ctx.log.alert("Saved %s rows as CSV." % (len(rows)))
logger.log(ALERT, "Saved %s rows as CSV." % (len(rows)))
except OSError as e:
ctx.log.error(str(e))
logger.error(str(e))
@command.command("console.grideditor.editor")
def grideditor_editor(self) -> None:
@ -542,7 +547,7 @@ class ConsoleAddon:
"view.settings.setval", ["@focus", f"flowview_mode_{idx}", mode]
)
except exceptions.CommandError as e:
ctx.log.error(str(e))
logger.error(str(e))
@command.command("console.flowview.mode.options")
def flowview_mode_options(self) -> Sequence[str]:

View File

@ -1,3 +1,5 @@
import logging
import math
import sys
from functools import lru_cache
@ -327,7 +329,7 @@ class FlowDetails(tabs.Tabs):
viewmode, message, self.flow
)
if error:
self.master.log.debug(error)
logging.debug(error)
# Give hint that you have to tab for the response.
if description == "No content" and isinstance(message, http.Request):
description = "No request content"

View File

@ -1,3 +1,4 @@
import logging
import os
from collections.abc import Sequence
from typing import Optional
@ -183,7 +184,7 @@ class KeymapConfig:
try:
self.load_path(ctx.master.keymap, p)
except KeyBindingError as e:
ctx.log.error(e)
logging.error(e)
def load_path(self, km, p):
if os.path.exists(p) and os.path.isfile(p):

View File

@ -1,5 +1,6 @@
import argparse
import asyncio
import logging
import os
import signal
import sys
@ -49,6 +50,8 @@ def run(
"""
async def main() -> T:
logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger("tornado").setLevel(logging.WARNING)
debug.register_info_dumpers()
opts = options.Options()
@ -85,7 +88,7 @@ def run(
sys.exit(0)
if extra:
if args.filter_args:
master.log.info(
logging.info(
f"Only processing flows that match \"{' & '.join(args.filter_args)}\""
)
opts.update(**extra(args))

View File

@ -486,7 +486,7 @@ class FlowContentView(RequestHandler):
viewname, message, flow
)
if error:
self.master.log.error(error)
logging.error(error)
if max_lines:
lines = islice(lines, max_lines)

View File

@ -1,3 +1,5 @@
import logging
import errno
import tornado.httpserver
import tornado.ioloop
@ -16,6 +18,8 @@ from mitmproxy.addons.proxyserver import Proxyserver
from mitmproxy.contrib.tornado import patch_tornado
from mitmproxy.tools.web import app, webaddons, static_viewer
logger = logging.getLogger(__name__)
class WebMaster(master.Master):
def __init__(self, options, with_termlog=True):
@ -100,7 +104,7 @@ class WebMaster(master.Master):
message += f"\nTry specifying a different port by using `--set web_port={self.options.web_port + 1}`."
raise OSError(e.errno, message, e.filename) from e
self.log.info(
logger.info(
f"Web server listening at http://{self.options.web_host}:{self.options.web_port}/",
)

View File

@ -1,4 +1,5 @@
import json
import logging
import os.path
import pathlib
import shutil
@ -69,11 +70,8 @@ def save_flows_content(path: pathlib.Path, flows: Iterable[flow.Flow]) -> None:
else:
description, lines = "No content.", []
if time.time() - t > 0.1:
ctx.log(
"Slow content view: {} took {}s".format(
description.strip(), round(time.time() - t, 1)
),
"info",
logging.info(
f"Slow content view: {description.strip()} took {round(time.time() - t, 1)}s",
)
with open(
str(message_path / "content" / "Auto.json"), "w"

View File

@ -1,3 +1,4 @@
import logging
import webbrowser
from collections.abc import Sequence
@ -22,7 +23,7 @@ class WebAddon:
web_url = f"http://{ctx.options.web_host}:{ctx.options.web_port}/"
success = open_browser(web_url)
if not success:
ctx.log.info(
logging.info(
f"No web browser found. Please open a browser and point it to {web_url}",
)

View File

@ -1,5 +1,7 @@
import asyncio
import cProfile
import logging
from mitmproxy import ctx
@ -22,7 +24,7 @@ class Benchmark:
self.resps += 1
async def procs(self):
ctx.log.error("starting benchmark")
logging.error("starting benchmark")
backend = await asyncio.create_subprocess_exec("devd", "-q", "-p", "10001", ".")
traf = await asyncio.create_subprocess_exec(
"wrk",
@ -34,8 +36,8 @@ class Benchmark:
stdout, _ = await traf.communicate()
with open(ctx.options.benchmark_save_path + ".bench", mode="wb") as f:
f.write(stdout)
ctx.log.error(f"Proxy saw {self.reqs} requests, {self.resps} responses")
ctx.log.error(stdout.decode("ascii"))
logging.error(f"Proxy saw {self.reqs} requests, {self.resps} responses")
logging.error(stdout.decode("ascii"))
backend.kill()
ctx.master.shutdown()

View File

@ -1,10 +1,11 @@
import asyncio
import os
import socket
from mitmproxy.utils import data
import pytest
from mitmproxy.utils import data
pytest_plugins = ("test.full_coverage_plugin",)
skip_windows = pytest.mark.skipif(os.name == "nt", reason="Skipping due to Windows")
@ -28,3 +29,22 @@ skip_no_ipv6 = pytest.mark.skipif(no_ipv6, reason="Host has no IPv6 support")
@pytest.fixture()
def tdata():
return data.Data(__name__)
class AsyncLogCaptureFixture:
def __init__(self, caplog: pytest.LogCaptureFixture):
self.caplog = caplog
async def await_log(self, text, timeout=2):
await asyncio.sleep(0)
for i in range(int(timeout / 0.01)):
if text in self.caplog.text:
return True
else:
await asyncio.sleep(0.01)
raise AssertionError(f"Did not find {text!r} in log:\n{self.caplog.text}.")
@pytest.fixture
def caplog_async(caplog):
return AsyncLogCaptureFixture(caplog)

View File

@ -44,7 +44,7 @@ async def noresponseapp(scope, receive, send):
return
async def test_asgi_full():
async def test_asgi_full(caplog):
ps = Proxyserver()
addons = [
asgiapp.WSGIApp(tapp, "testapp", 80),
@ -55,8 +55,6 @@ async def test_asgi_full():
tctx.master.addons.add(next_layer.NextLayer())
tctx.configure(ps, listen_host="127.0.0.1", listen_port=0)
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("HTTP(S) proxy listening", level="info")
proxy_addr = ("127.0.0.1", ps.listen_addrs()[0][1])
reader, writer = await asyncio.open_connection(*proxy_addr)
@ -66,6 +64,8 @@ async def test_asgi_full():
assert header.startswith(b"HTTP/1.1 200 OK")
body = await reader.readuntil(b"testapp")
assert body == b"testapp"
writer.close()
await writer.wait_closed()
reader, writer = await asyncio.open_connection(*proxy_addr)
req = f"GET http://testapp:80/parameters?param1=1&param2=2 HTTP/1.1\r\n\r\n"
@ -74,6 +74,8 @@ async def test_asgi_full():
assert header.startswith(b"HTTP/1.1 200 OK")
body = await reader.readuntil(b"}")
assert body == b'{"param1": "1", "param2": "2"}'
writer.close()
await writer.wait_closed()
reader, writer = await asyncio.open_connection(*proxy_addr)
req = f"POST http://testapp:80/requestbody HTTP/1.1\r\nContent-Length: 6\r\n\r\nHello!"
@ -82,6 +84,8 @@ async def test_asgi_full():
assert header.startswith(b"HTTP/1.1 200 OK")
body = await reader.readuntil(b"}")
assert body == b'{"body": "Hello!"}'
writer.close()
await writer.wait_closed()
reader, writer = await asyncio.open_connection(*proxy_addr)
req = f"GET http://errapp:80/?foo=bar HTTP/1.1\r\n\r\n"
@ -90,6 +94,9 @@ async def test_asgi_full():
assert header.startswith(b"HTTP/1.1 500")
body = await reader.readuntil(b"ASGI Error")
assert body == b"ASGI Error"
writer.close()
await writer.wait_closed()
assert "ValueError" in caplog.text
reader, writer = await asyncio.open_connection(*proxy_addr)
req = f"GET http://noresponseapp:80/ HTTP/1.1\r\n\r\n"
@ -98,3 +105,9 @@ async def test_asgi_full():
assert header.startswith(b"HTTP/1.1 500")
body = await reader.readuntil(b"ASGI Error")
assert body == b"ASGI Error"
writer.close()
await writer.wait_closed()
assert "no response sent" in caplog.text
tctx.configure(ps, server=False)
assert await ps.setup_servers()

View File

@ -4,29 +4,29 @@ from mitmproxy.addons import browser
from mitmproxy.test import taddons
async def test_browser():
with mock.patch("subprocess.Popen") as po, mock.patch("shutil.which") as which:
def test_browser(caplog):
caplog.set_level("INFO")
with mock.patch("subprocess.Popen") as po, mock.patch("shutil.which") as which, taddons.context():
which.return_value = "chrome"
b = browser.Browser()
with taddons.context() as tctx:
b.start()
assert po.called
b.start()
assert po.called
b.start()
await tctx.master.await_log("Starting additional browser")
assert len(b.browser) == 2
b.done()
assert not b.browser
b.start()
assert "Starting additional browser" in caplog.text
assert len(b.browser) == 2
b.done()
assert not b.browser
async def test_no_browser():
async def test_no_browser(caplog):
caplog.set_level("INFO")
with mock.patch("shutil.which") as which:
which.return_value = False
b = browser.Browser()
with taddons.context() as tctx:
b.start()
await tctx.master.await_log("platform is not supported")
b.start()
assert "platform is not supported" in caplog.text
async def test_get_browser_cmd_executable():

View File

@ -99,16 +99,16 @@ async def test_playback_https_upstream():
)
async def test_playback_crash(monkeypatch):
async def test_playback_crash(monkeypatch, caplog_async):
async def raise_err():
raise ValueError("oops")
monkeypatch.setattr(ReplayHandler, "replay", raise_err)
cp = ClientPlayback()
with taddons.context(cp) as tctx:
with taddons.context(cp):
cp.running()
cp.start_replay([tflow.tflow(live=False)])
await tctx.master.await_log("Client replay has crashed!", level="error")
await caplog_async.await_log("Client replay has crashed!")
assert cp.count() == 0
cp.done()
@ -136,16 +136,16 @@ def test_check():
assert "Can only replay HTTP" in cp.check(f)
async def test_start_stop(tdata):
async def test_start_stop(tdata, caplog_async):
cp = ClientPlayback()
with taddons.context(cp) as tctx:
with taddons.context(cp):
cp.start_replay([tflow.tflow(live=False)])
assert cp.count() == 1
ws_flow = tflow.twebsocketflow()
ws_flow.live = False
cp.start_replay([ws_flow])
await tctx.master.await_log("Can't replay WebSocket flows.", level="warn")
await caplog_async.await_log("Can't replay WebSocket flows.")
assert cp.count() == 1
cp.stop_replay()

View File

@ -24,7 +24,7 @@ class TestCommandHistory:
with open(history_file) as f:
assert f.read() == "cmd3\ncmd4\n"
async def test_done_writing_failed(self):
async def test_done_writing_failed(self, caplog):
ch = command_history.CommandHistory()
ch.VACUUM_SIZE = 1
with taddons.context(ch) as tctx:
@ -33,7 +33,7 @@ class TestCommandHistory:
ch.history.append("cmd3")
tctx.options.confdir = "/non/existent/path/foobar1234/"
ch.done()
await tctx.master.await_log(f"Failed writing to {ch.history_file}")
assert "Failed writing to" in caplog.text
def test_add_command(self):
ch = command_history.CommandHistory()
@ -45,12 +45,12 @@ class TestCommandHistory:
ch.add_command("")
assert ch.history == ["cmd1", "cmd2"]
async def test_add_command_failed(self):
async def test_add_command_failed(self, caplog):
ch = command_history.CommandHistory()
with taddons.context(ch) as tctx:
tctx.options.confdir = "/non/existent/path/foobar1234/"
ch.add_command("cmd1")
await tctx.master.await_log(f"Failed writing to {ch.history_file}")
assert "Failed writing to" in caplog.text
def test_get_next_and_prev(self, tmpdir):
ch = command_history.CommandHistory()
@ -152,7 +152,7 @@ class TestCommandHistory:
ch.clear_history()
async def test_clear_failed(self, monkeypatch):
async def test_clear_failed(self, monkeypatch, caplog):
ch = command_history.CommandHistory()
with taddons.context(ch) as tctx:
@ -163,7 +163,7 @@ class TestCommandHistory:
with patch.object(Path, "unlink") as mock_unlink:
mock_unlink.side_effect = IOError()
ch.clear_history()
await tctx.master.await_log(f"Failed deleting {ch.history_file}")
assert "Failed deleting" in caplog.text
def test_filter(self, tmpdir):
ch = command_history.CommandHistory()

View File

@ -72,7 +72,7 @@ def qr(f):
return fp.read()
async def test_cut_clip():
async def test_cut_clip(caplog):
v = view.View()
c = cut.Cut()
with taddons.context() as tctx:
@ -97,7 +97,7 @@ async def test_cut_clip():
)
pc.side_effect = pyperclip.PyperclipException(log_message)
tctx.command(c.clip, "@all", "request.method")
await tctx.master.await_log(log_message, level="error")
assert log_message in caplog.text
def test_cut_save(tmpdir):
@ -130,7 +130,7 @@ def test_cut_save(tmpdir):
(FileNotFoundError, "No such file or directory"),
],
)
async def test_cut_save_open(exception, log_message, tmpdir):
async def test_cut_save_open(exception, log_message, tmpdir, caplog):
f = str(tmpdir.join("path"))
v = view.View()
c = cut.Cut()
@ -141,7 +141,7 @@ async def test_cut_save_open(exception, log_message, tmpdir):
with mock.patch("mitmproxy.addons.cut.open") as m:
m.side_effect = exception(log_message)
tctx.command(c.save, "@all", "request.method", f)
await tctx.master.await_log(log_message, level="error")
assert log_message in caplog.text
def test_cut():

View File

@ -173,7 +173,8 @@ def test_echo_request_line():
sio.truncate(0)
async def test_contentview():
async def test_contentview(caplog):
caplog.set_level("DEBUG")
with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as va:
va.side_effect = ValueError("")
sio = io.StringIO()
@ -181,7 +182,7 @@ async def test_contentview():
with taddons.context(d) as tctx:
tctx.configure(d, flow_detail=4)
d.response(tflow.tflow())
await tctx.master.await_log("content viewer failed")
assert "content viewer failed" in caplog.text
def test_tcp():

View File

@ -1,29 +1,22 @@
import asyncio
import pytest
from mitmproxy import log
from mitmproxy.addons.errorcheck import ErrorCheck
from mitmproxy.tools import main
@pytest.mark.parametrize("do_log", [True, False])
def test_errorcheck(capsys, do_log):
async def run():
# suppress error that task exception was not retrieved.
asyncio.get_running_loop().set_exception_handler(lambda *_: 0)
e = ErrorCheck(do_log)
e.add_log(log.LogEntry("fatal", "error"))
await e.shutdown_if_errored()
await asyncio.sleep(0)
def test_errorcheck(tdata, capsys):
"""Integration test: Make sure that we catch errors on startup an exit."""
with pytest.raises(SystemExit):
asyncio.run(run())
if do_log:
assert capsys.readouterr().err == "Error on startup: fatal\n"
main.mitmproxy(
[
"-s",
tdata.path("mitmproxy/data/addonscripts/load_error.py"),
]
)
assert "Error on startup" in capsys.readouterr().err
async def test_no_error():
e = ErrorCheck()
await e.shutdown_if_errored()
await asyncio.sleep(0)
e.finish()

View File

@ -1,8 +1,10 @@
from mitmproxy import log
import asyncio
import logging
from mitmproxy.addons import eventstore
def test_simple():
async def test_simple():
store = eventstore.EventStore()
assert not store.data
@ -24,7 +26,8 @@ def test_simple():
assert not sig_refresh_called
# test .log()
store.add_log(log.LogEntry("test", "info"))
logging.error("test")
await asyncio.sleep(0)
assert store.data
assert sig_add_called
@ -38,18 +41,22 @@ def test_simple():
assert not sig_add_called
assert sig_refresh_called
store.done()
def test_max_size():
async def test_max_size():
store = eventstore.EventStore(3)
assert store.size == 3
store.add_log(log.LogEntry("foo", "info"))
store.add_log(log.LogEntry("bar", "info"))
store.add_log(log.LogEntry("baz", "info"))
logging.warning("foo")
logging.warning("bar")
logging.warning("baz")
await asyncio.sleep(0)
assert len(store.data) == 3
assert ["foo", "bar", "baz"] == [x.msg for x in store.data]
assert "baz" in store.data[-1].msg
# overflow
store.add_log(log.LogEntry("boo", "info"))
logging.warning("boo")
await asyncio.sleep(0)
assert len(store.data) == 3
assert ["bar", "baz", "boo"] == [x.msg for x in store.data]
assert "boo" in store.data[-1].msg
store.done()

View File

@ -295,17 +295,16 @@ def test_export(tmp_path) -> None:
(FileNotFoundError, "No such file or directory"),
],
)
async def test_export_open(exception, log_message, tmpdir):
async def test_export_open(exception, log_message, tmpdir, caplog):
f = str(tmpdir.join("path"))
e = export.Export()
with taddons.context() as tctx:
with mock.patch("mitmproxy.addons.export.open") as m:
m.side_effect = exception(log_message)
e.file("raw_request", tflow.tflow(resp=True), f)
await tctx.master.await_log(log_message, level="error")
with mock.patch("mitmproxy.addons.export.open") as m:
m.side_effect = exception(log_message)
e.file("raw_request", tflow.tflow(resp=True), f)
assert log_message in caplog.text
async def test_clip(tmpdir):
async def test_clip(tmpdir, caplog):
e = export.Export()
with taddons.context() as tctx:
tctx.configure(e)
@ -335,4 +334,4 @@ async def test_clip(tmpdir):
)
pc.side_effect = pyperclip.PyperclipException(log_message)
e.clip("raw_request", tflow.tflow(resp=True))
await tctx.master.await_log(log_message, level="error")
assert log_message in caplog.text

View File

@ -169,7 +169,8 @@ class TestMapLocal:
ml.request(f)
assert f.response.content == b"foofoobar"
async def test_nonexistent_files(self, tmpdir, monkeypatch):
async def test_nonexistent_files(self, tmpdir, monkeypatch, caplog):
caplog.set_level("INFO")
ml = MapLocal()
with taddons.context(ml) as tctx:
@ -178,7 +179,7 @@ class TestMapLocal:
f.request.url = b"https://example.org/css/nonexistent"
ml.request(f)
assert f.response.status_code == 404
await tctx.master.await_log("None of the local file candidates exist")
assert "None of the local file candidates exist" in caplog.text
tmpfile = tmpdir.join("foo.jpg")
tmpfile.write("foo")
@ -188,7 +189,7 @@ class TestMapLocal:
f = tflow.tflow()
f.request.url = b"https://example.org/images/foo.jpg"
ml.request(f)
await tctx.master.await_log("could not read file")
assert "Could not read" in caplog.text
def test_is_killed(self, tmpdir):
ml = MapLocal()

View File

@ -83,7 +83,7 @@ class TestModifyBodyFile:
mb.request(f)
assert f.request.content == b"bar"
async def test_nonexistent(self, tmpdir):
async def test_nonexistent(self, tmpdir, caplog):
mb = modifybody.ModifyBody()
with taddons.context(mb) as tctx:
with pytest.raises(Exception, match="Invalid file path"):
@ -96,4 +96,4 @@ class TestModifyBodyFile:
f = tflow.tflow()
f.request.content = b"foo"
mb.request(f)
await tctx.master.await_log("could not read")
assert "Could not read" in caplog.text

View File

@ -127,7 +127,7 @@ class TestModifyHeadersFile:
mh.request(f)
assert f.request.headers["one"] == "two"
async def test_nonexistent(self, tmpdir):
async def test_nonexistent(self, tmpdir, caplog):
mh = ModifyHeaders()
with taddons.context(mh) as tctx:
with pytest.raises(
@ -142,4 +142,4 @@ class TestModifyHeadersFile:
f = tflow.tflow()
f.request.content = b"foo"
mh.request(f)
await tctx.master.await_log("could not read")
assert "Could not read" in caplog.text

View File

@ -47,7 +47,9 @@ async def tcp_server(handle_conn) -> Address:
server.close()
async def test_start_stop():
async def test_start_stop(caplog_async):
caplog_async.caplog.set_level("INFO")
async def server_handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
@ -65,7 +67,7 @@ async def test_start_stop():
assert not ps.servers
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("HTTP(S) proxy listening at", level="info")
await caplog_async.await_log("HTTP(S) proxy listening at")
assert ps.servers
proxy_addr = ps.listen_addrs()[0]
@ -80,7 +82,7 @@ async def test_start_stop():
await ps.setup_servers() # assert this can always be called without side effects
tctx.configure(ps, server=False)
await tctx.master.await_log("Stopped HTTP(S) proxy at", level="info")
await caplog_async.await_log("Stopped HTTP(S) proxy at")
if ps.servers.is_updating:
async with ps.servers._lock:
pass # wait until start/stop is finished.
@ -119,7 +121,6 @@ async def test_inject() -> None:
tctx.configure(ps, listen_host="127.0.0.1", listen_port=0)
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("HTTP(S) proxy listening", level="info")
proxy_addr = ps.servers["regular"].listen_addrs[0]
reader, writer = await asyncio.open_connection(*proxy_addr)
@ -138,25 +139,20 @@ async def test_inject() -> None:
assert await reader.read(1) == b"c"
async def test_inject_fail() -> None:
async def test_inject_fail(caplog) -> None:
ps = Proxyserver()
with taddons.context(ps) as tctx:
ps.inject_websocket(tflow.tflow(), True, b"test")
await tctx.master.await_log(
"Cannot inject WebSocket messages into non-WebSocket flows.", level="warn"
)
ps.inject_tcp(tflow.tflow(), True, b"test")
await tctx.master.await_log(
"Cannot inject TCP messages into non-TCP flows.", level="warn"
)
ps.inject_websocket(tflow.tflow(), True, b"test")
assert "Cannot inject WebSocket messages into non-WebSocket flows." in caplog.text
ps.inject_tcp(tflow.tflow(), True, b"test")
assert "Cannot inject TCP messages into non-TCP flows." in caplog.text
ps.inject_websocket(tflow.twebsocketflow(), True, b"test")
await tctx.master.await_log("Flow is not from a live connection.", level="warn")
ps.inject_websocket(tflow.ttcpflow(), True, b"test")
await tctx.master.await_log("Flow is not from a live connection.", level="warn")
ps.inject_websocket(tflow.twebsocketflow(), True, b"test")
assert "Flow is not from a live connection." in caplog.text
ps.inject_websocket(tflow.ttcpflow(), True, b"test")
assert "Cannot inject WebSocket messages into non-WebSocket flows" in caplog.text
async def test_warn_no_nextlayer():
async def test_warn_no_nextlayer(caplog):
"""
Test that we log an error if the proxy server is started without NextLayer addon.
That is a mean trap to fall into when writing end-to-end tests.
@ -166,7 +162,7 @@ async def test_warn_no_nextlayer():
tctx.configure(ps, listen_host="127.0.0.1", listen_port=0, server=False)
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("Warning: Running proxyserver without nextlayer addon!", level="warn")
assert "Warning: Running proxyserver without nextlayer addon!" in caplog.text
async def test_self_connect():
@ -178,12 +174,12 @@ async def test_self_connect():
tctx.configure(ps, listen_host="127.0.0.1", listen_port=0)
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("HTTP(S) proxy listening", level="info")
assert ps.servers
server.address = ("localhost", ps.servers["regular"].listen_addrs[0][1])
ps.server_connect(server_hooks.ServerConnectionHookData(server, client))
assert "Request destination unknown" in server.error
tctx.configure(ps, server=False)
assert await ps.setup_servers()
def test_options():
@ -209,19 +205,21 @@ def test_options():
tctx.configure(ps, mode=["regular"], server=False)
async def test_startup_err(monkeypatch) -> None:
async def test_startup_err(monkeypatch, caplog) -> None:
async def _raise(*_):
raise OSError("cannot bind")
monkeypatch.setattr(asyncio, "start_server", _raise)
ps = Proxyserver()
with taddons.context(ps) as tctx:
with taddons.context(ps):
assert not await ps.setup_servers()
await tctx.master.await_log("cannot bind", level="error")
assert "cannot bind" in caplog.text
async def test_shutdown_err() -> None:
async def test_shutdown_err(caplog_async) -> None:
caplog_async.caplog.set_level("INFO")
async def _raise(*_):
raise OSError("cannot close")
@ -234,7 +232,7 @@ async def test_shutdown_err() -> None:
for server in ps.servers:
setattr(server, "stop", _raise)
tctx.configure(ps, server=False)
await tctx.master.await_log("cannot close", level="error")
await caplog_async.await_log("cannot close")
class DummyResolver:
@ -249,7 +247,8 @@ class DummyResolver:
raise e
async def test_dns() -> None:
async def test_dns(caplog_async) -> None:
caplog_async.caplog.set_level("INFO")
ps = Proxyserver()
with taddons.context(ps, DummyResolver()) as tctx:
tctx.configure(
@ -258,12 +257,10 @@ async def test_dns() -> None:
)
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log("DNS server listening at", level="info")
await caplog_async.await_log("DNS server listening at")
assert ps.servers
dns_addr = ps.servers["dns@127.0.0.1:0"].listen_addrs[0]
r, w = await udp.open_connection(*dns_addr)
w.write(b"\x00")
await tctx.master.await_log("sent an invalid message", level="info")
req = tdnsreq()
w.write(req.packed)
resp = dns.Message.unpack(await r.read(udp.MAX_DATAGRAM_SIZE))
@ -281,8 +278,11 @@ async def test_dns() -> None:
dns_layer = ps.connections[("udp", w.get_extra_info("sockname"), dns_addr)].layer
assert isinstance(dns_layer, layers.DNSLayer)
assert len(dns_layer.flows) == 2
w.write(b"\x00")
await caplog_async.await_log("sent an invalid message")
tctx.configure(ps, server=False)
await tctx.master.await_log("Stopped DNS server at", level="info")
await caplog_async.await_log("Stopped DNS server at")
def test_validation_no_transparent(monkeypatch):
@ -312,7 +312,9 @@ async def udp_server(handle_conn) -> Address:
server.close()
async def test_dtls(monkeypatch) -> None:
async def test_dtls(monkeypatch, caplog_async) -> None:
caplog_async.caplog.set_level("INFO")
def server_handler(
transport: asyncio.DatagramTransport,
data: bytes,
@ -335,7 +337,7 @@ async def test_dtls(monkeypatch) -> None:
tctx.configure(ps, mode=[mode])
assert await ps.setup_servers()
ps.running()
await tctx.master.await_log(f"reverse proxy to dtls://{server_addr[0]}:{server_addr[1]} listening", level="info")
await caplog_async.await_log(f"reverse proxy to dtls://{server_addr[0]}:{server_addr[1]} listening")
assert ps.servers
addr = ps.servers[mode].listen_addrs[0]
r, w = await udp.open_connection(*addr)
@ -344,4 +346,4 @@ async def test_dtls(monkeypatch) -> None:
assert repr(ps) == "Proxyserver(1 active conns)"
assert len(ps.connections) == 1
tctx.configure(ps, server=False)
await tctx.master.await_log("stopped reverse proxy to dtls", level="info")
await caplog_async.await_log("Stopped reverse proxy to dtls")

View File

@ -47,7 +47,7 @@ class TestReadFile:
tctx.configure(rf, readfile_filter="~~")
tctx.configure(rf, readfile_filter="")
async def test_read(self, tmpdir, data, corrupt_data):
async def test_read(self, tmpdir, data, corrupt_data, caplog_async):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
assert not rf.reading()
@ -65,25 +65,24 @@ class TestReadFile:
tf.write(corrupt_data.getvalue())
tctx.configure(rf, rfile=str(tf))
rf.running()
await tctx.master.await_log("corrupted")
await caplog_async.await_log("corrupted")
async def test_corrupt(self, corrupt_data):
async def test_corrupt(self, corrupt_data, caplog_async):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
with taddons.context(rf):
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows(io.BytesIO(b"qibble"))
tctx.master.clear()
caplog_async.caplog.clear()
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows(corrupt_data)
await tctx.master.await_log("file corrupted")
await caplog_async.await_log("file corrupted")
async def test_nonexistent_file(self):
async def test_nonexistent_file(self, caplog):
rf = readfile.ReadFile()
with taddons.context(rf) as tctx:
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows_from_path("nonexistent")
await tctx.master.await_log("nonexistent")
with pytest.raises(exceptions.FlowReadException):
await rf.load_flows_from_path("nonexistent")
assert "nonexistent" in caplog.text
class TestReadFileStdin:

View File

@ -19,18 +19,17 @@ from mitmproxy.tools import main
script.ReloadInterval = 0.1
async def test_load_script(tdata):
with taddons.context() as tctx:
ns = script.load_script(
tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py")
)
assert ns.addons
def test_load_script(tdata, caplog):
ns = script.load_script(
tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py")
)
assert ns.addons
script.load_script("nonexistent")
await tctx.master.await_log("No such file or directory")
script.load_script("nonexistent")
assert "No such file or directory" in caplog.text
script.load_script(tdata.path("mitmproxy/data/addonscripts/recorder/error.py"))
await tctx.master.await_log("invalid syntax")
script.load_script(tdata.path("mitmproxy/data/addonscripts/recorder/error.py"))
assert "invalid syntax" in caplog.text
def test_load_fullname(tdata):
@ -64,14 +63,15 @@ class TestScript:
s = script.Script(f'"{path}"', False)
assert '"' not in s.fullpath
async def test_simple(self, tdata):
async def test_simple(self, tdata, caplog_async):
caplog_async.caplog.set_level("DEBUG")
sc = script.Script(
tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py"),
True,
)
with taddons.context(sc) as tctx:
tctx.configure(sc)
await tctx.master.await_log("recorder configure")
await caplog_async.await_log("recorder configure")
rec = tctx.master.addons.get("recorder")
assert rec.call_log[0][0:2] == ("recorder", "load")
@ -81,42 +81,41 @@ class TestScript:
tctx.master.addons.trigger(HttpRequestHook(f))
assert rec.call_log[0][1] == "request"
sc.done()
async def test_reload(self, tmpdir):
async def test_reload(self, tmp_path, caplog_async):
caplog_async.caplog.set_level("INFO")
with taddons.context() as tctx:
f = tmpdir.join("foo.py")
f.ensure(file=True)
f.write("\n")
f = tmp_path / "foo.py"
f.write_text("\n")
sc = script.Script(str(f), True)
tctx.configure(sc)
await tctx.master.await_log("Loading")
await caplog_async.await_log("Loading")
tctx.master.clear()
for i in range(20):
f.write("\n")
if tctx.master.has_log("Loading"):
break
await asyncio.sleep(0.1)
else:
raise AssertionError("No reload seen")
caplog_async.caplog.clear()
f.write_text("\n\n")
await caplog_async.await_log("Loading")
sc.done()
async def test_exception(self, tdata):
async def test_exception(self, tdata, caplog_async):
caplog_async.caplog.set_level("INFO")
with taddons.context() as tctx:
sc = script.Script(
tdata.path("mitmproxy/data/addonscripts/error.py"),
True,
)
tctx.master.addons.add(sc)
await tctx.master.await_log("error load")
await caplog_async.await_log("error load")
tctx.configure(sc)
f = tflow.tflow(resp=True)
tctx.master.addons.trigger(HttpRequestHook(f))
await tctx.master.await_log("ValueError: Error!")
await tctx.master.await_log("error.py")
await caplog_async.await_log("ValueError: Error!")
await caplog_async.await_log("error.py")
sc.done()
async def test_optionexceptions(self, tdata):
async def test_optionexceptions(self, tdata, caplog_async):
with taddons.context() as tctx:
sc = script.Script(
tdata.path("mitmproxy/data/addonscripts/configure.py"),
@ -124,19 +123,22 @@ class TestScript:
)
tctx.master.addons.add(sc)
tctx.configure(sc)
await tctx.master.await_log("Options Error")
await caplog_async.await_log("Options Error")
sc.done()
async def test_addon(self, tdata):
async def test_addon(self, tdata, caplog_async):
caplog_async.caplog.set_level("INFO")
with taddons.context() as tctx:
sc = script.Script(tdata.path("mitmproxy/data/addonscripts/addon.py"), True)
tctx.master.addons.add(sc)
await tctx.master.await_log("addon running")
await caplog_async.await_log("addon running")
assert sc.ns.event_log == [
"scriptload",
"addonload",
"scriptconfigure",
"addonconfigure",
]
sc.done()
class TestCutTraceback:
@ -158,13 +160,14 @@ class TestCutTraceback:
class TestScriptLoader:
async def test_script_run(self, tdata):
async def test_script_run(self, tdata, caplog_async):
caplog_async.caplog.set_level("DEBUG")
rp = tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py")
sc = script.ScriptLoader()
with taddons.context(sc) as tctx:
with taddons.context(sc):
sc.script_run([tflow.tflow(resp=True)], rp)
await tctx.master.await_log("recorder response")
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
await caplog_async.await_log("recorder response")
debug = [i.msg for i in caplog_async.caplog.records if i.levelname == "DEBUG"]
assert debug == [
"recorder configure",
"recorder running",
@ -174,25 +177,24 @@ class TestScriptLoader:
"recorder response",
]
async def test_script_run_nonexistent(self):
async def test_script_run_nonexistent(self, caplog):
sc = script.ScriptLoader()
with taddons.context(sc) as tctx:
sc.script_run([tflow.tflow(resp=True)], "/")
await tctx.master.await_log("No such script")
sc.script_run([tflow.tflow(resp=True)], "/")
assert "No such script" in caplog.text
async def test_simple(self, tdata):
sc = script.ScriptLoader()
with taddons.context(loadcore=False) as tctx:
tctx.master.addons.add(sc)
sc.running()
assert len(tctx.master.addons) == 2
assert len(tctx.master.addons) == 1
tctx.master.options.update(
scripts=[tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py")]
)
assert len(tctx.master.addons) == 2
assert len(tctx.master.addons) == 1
assert len(sc.addons) == 1
tctx.master.options.update(scripts=[])
assert len(tctx.master.addons) == 2
assert len(tctx.master.addons) == 1
assert len(sc.addons) == 0
def test_dupes(self):
@ -201,7 +203,8 @@ class TestScriptLoader:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sc, scripts=["one", "one"])
async def test_script_deletion(self, tdata):
async def test_script_deletion(self, tdata, caplog_async):
caplog_async.caplog.set_level("INFO")
tdir = tdata.path("mitmproxy/data/addonscripts/")
with open(tdir + "/dummy.py", "w") as f:
f.write("\n")
@ -212,28 +215,29 @@ class TestScriptLoader:
tctx.configure(
sl, scripts=[tdata.path("mitmproxy/data/addonscripts/dummy.py")]
)
await tctx.master.await_log("Loading")
await caplog_async.await_log("Loading")
os.remove(tdata.path("mitmproxy/data/addonscripts/dummy.py"))
await tctx.master.await_log("Removing")
await caplog_async.await_log("Removing")
await asyncio.sleep(0.1)
assert not tctx.options.scripts
assert not sl.addons
async def test_script_error_handler(self):
async def test_script_error_handler(self, caplog):
path = "/sample/path/example.py"
exc = SyntaxError
msg = "Error raised"
tb = True
with taddons.context() as tctx:
with taddons.context():
script.script_error_handler(path, exc, msg, tb)
await tctx.master.await_log("/sample/path/example.py")
await tctx.master.await_log("Error raised")
await tctx.master.await_log("lineno")
await tctx.master.await_log("NoneType")
assert "/sample/path/example.py" in caplog.text
assert "Error raised" in caplog.text
assert "lineno" in caplog.text
assert "NoneType" in caplog.text
async def test_order(self, tdata):
async def test_order(self, tdata, caplog_async):
caplog_async.caplog.set_level("DEBUG")
rec = tdata.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader()
sc.is_running = True
@ -246,8 +250,8 @@ class TestScriptLoader:
"%s/c.py" % rec,
],
)
await tctx.master.await_log("configure")
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
await caplog_async.await_log("configure")
debug = [i.msg for i in caplog_async.caplog.records if i.levelname == "DEBUG"]
assert debug == [
"a load",
"a configure",
@ -260,7 +264,7 @@ class TestScriptLoader:
"c running",
]
tctx.master.clear()
caplog_async.caplog.clear()
tctx.configure(
sc,
scripts=[
@ -270,15 +274,15 @@ class TestScriptLoader:
],
)
await tctx.master.await_log("b configure")
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
await caplog_async.await_log("b configure")
debug = [i.msg for i in caplog_async.caplog.records if i.levelname == "DEBUG"]
assert debug == [
"c configure",
"a configure",
"b configure",
]
tctx.master.clear()
caplog_async.caplog.clear()
tctx.configure(
sc,
scripts=[
@ -286,8 +290,8 @@ class TestScriptLoader:
"%s/a.py" % rec,
],
)
await tctx.master.await_log("e configure")
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
await caplog_async.await_log("e configure")
debug = [i.msg for i in caplog_async.caplog.records if i.levelname == "DEBUG"]
assert debug == [
"c done",
"b done",
@ -312,12 +316,13 @@ def test_order(tdata, capsys):
tdata.path("mitmproxy/data/addonscripts/shutdown.py"),
]
)
time = r"\[[\d:.]+\] "
assert re.match(
r"Loading script.+recorder.py\n"
r"\('recorder', 'load', .+\n"
r"\('recorder', 'configure', .+\n"
r"Loading script.+shutdown.py\n"
r"\('recorder', 'running', .+\n"
r"\('recorder', 'done', .+\n$",
rf"{time}Loading script.+recorder.py\n"
rf"{time}\('recorder', 'load', .+\n"
rf"{time}\('recorder', 'configure', .+\n"
rf"{time}Loading script.+shutdown.py\n"
rf"{time}\('recorder', 'running', .+\n"
rf"{time}\('recorder', 'done', .+\n$",
capsys.readouterr().out,
)

View File

@ -1,14 +1,10 @@
from mitmproxy.addons.server_side_events import ServerSideEvents
from mitmproxy.test import taddons
from mitmproxy.test.tflow import tflow
async def test_simple():
async def test_simple(caplog):
s = ServerSideEvents()
with taddons.context() as tctx:
f = tflow(resp=True)
f.response.headers["content-type"] = "text/event-stream"
s.response(f)
await tctx.master.await_log(
"mitmproxy currently does not support server side events."
)
f = tflow(resp=True)
f.response.headers["content-type"] = "text/event-stream"
s.response(f)
assert "mitmproxy currently does not support server side events" in caplog.text

View File

@ -1,30 +1,57 @@
import asyncio
import io
import logging
import pytest
from mitmproxy import log
from mitmproxy.addons import termlog
from mitmproxy.test import taddons
from mitmproxy.utils import vt_codes
@pytest.fixture(autouse=True)
def ensure_cleanup():
yield
assert not any(
isinstance(x, termlog.TermLogHandler)
for x in logging.root.handlers
)
async def test_delayed_teardown():
t = termlog.TermLog()
t.done()
assert t.logger in logging.root.handlers
await asyncio.sleep(0)
assert t.logger not in logging.root.handlers
def test_output(capsys):
logging.getLogger().setLevel(logging.DEBUG)
t = termlog.TermLog()
with taddons.context(t) as tctx:
tctx.options.termlog_verbosity = "info"
tctx.configure(t)
t.add_log(log.LogEntry("one", "info"))
t.add_log(log.LogEntry("two", "debug"))
t.add_log(log.LogEntry("three", "warn"))
t.add_log(log.LogEntry("four", "error"))
logging.info("one")
logging.debug("two")
logging.warning("three")
logging.error("four")
out, err = capsys.readouterr()
assert out.strip().splitlines() == ["one", "three"]
assert err.strip().splitlines() == ["four"]
assert "one" in out
assert "two" not in out
assert "three" in out
assert "four" in out
t.done()
def test_styling(monkeypatch) -> None:
async def test_styling(monkeypatch) -> None:
monkeypatch.setattr(vt_codes, "ensure_supported", lambda _: True)
f = io.StringIO()
t = termlog.TermLog(out=f)
t.out_has_vt_codes = True
with taddons.context(t) as tctx:
tctx.configure(t)
t.add_log(log.LogEntry("hello world", "info"))
logging.warning("hello")
assert f.getvalue() == "\x1b[22mhello world\x1b[0m\n"
assert "\x1b[33m\x1b[22mhello\x1b[0m" in f.getvalue()
t.done()

View File

@ -360,11 +360,9 @@ class TestTlsConfig:
assert self.do_handshake(tssl_client, tssl_server)
assert tssl_server.obj.getpeercert()
async def test_ca_expired(self, monkeypatch):
async def test_ca_expired(self, monkeypatch, caplog):
monkeypatch.setattr(certs.Cert, "has_expired", lambda self: True)
ta = tlsconfig.TlsConfig()
with taddons.context(ta) as tctx:
with taddons.context(ta):
ta.configure(["confdir"])
await tctx.master.await_log(
"The mitmproxy certificate authority has expired", "warn"
)
assert "The mitmproxy certificate authority has expired" in caplog.text

View File

@ -249,24 +249,22 @@ def test_orders():
assert v.order_options()
async def test_load(tmpdir):
async def test_load(tmpdir, caplog):
path = str(tmpdir.join("path"))
v = view.View()
with taddons.context() as tctx:
tctx.master.addons.add(v)
tdump(path, [tflow.tflow(resp=True), tflow.tflow(resp=True)])
v.load_file(path)
assert len(v) == 2
v.load_file(path)
assert len(v) == 4
try:
v.load_file("nonexistent_file_path")
except OSError:
assert False
with open(path, "wb") as f:
f.write(b"invalidflows")
v.load_file(path)
await tctx.master.await_log("Invalid data format.")
tdump(path, [tflow.tflow(resp=True), tflow.tflow(resp=True)])
v.load_file(path)
assert len(v) == 2
v.load_file(path)
assert len(v) == 4
try:
v.load_file("nonexistent_file_path")
except OSError:
assert False
with open(path, "wb") as f:
f.write(b"invalidflows")
v.load_file(path)
assert "Invalid data format." in caplog.text
def test_resolve():

View File

@ -1,4 +1,4 @@
from mitmproxy import ctx
import logging
event_log = []
@ -9,7 +9,7 @@ class Addon:
return event_log
def load(self, opts):
ctx.log.info("addon running")
logging.info("addon running")
event_log.append("addonload")
def configure(self, updated):

View File

@ -1,8 +1,8 @@
from mitmproxy import ctx
import logging
def load(loader):
ctx.log.info("error load")
logging.info("error load")
def request(flow):

View File

@ -1,4 +1,5 @@
from mitmproxy import ctx
import logging
from mitmproxy import hooks
@ -13,10 +14,9 @@ class Recorder:
def prox(*args, **kwargs):
lg = (self.name, attr, args, kwargs)
if attr != "add_log":
ctx.log.info(str(lg))
self.call_log.append(lg)
ctx.log.debug(f"{self.name} {attr}")
logging.info(str(lg))
self.call_log.append(lg)
logging.debug(f"{self.name} {attr}")
return prox
raise AttributeError

View File

@ -1,4 +1,4 @@
from mitmproxy import ctx
import logging
def modify(chunks):
@ -7,7 +7,7 @@ def modify(chunks):
def running():
ctx.log.info("stream_modify running")
logging.info("stream_modify running")
def responseheaders(flow):

View File

@ -3,7 +3,6 @@ from typing import Optional
import pytest
from mitmproxy.connection import Address
from mitmproxy.net.udp import MAX_DATAGRAM_SIZE, DatagramReader, DatagramWriter, open_connection, start_server
from mitmproxy.test import taddons
async def test_client_server():
@ -57,26 +56,26 @@ async def test_client_server():
await server.wait_closed()
async def test_reader():
with taddons.context() as tctx:
reader = DatagramReader()
addr = ("8.8.8.8", 53)
reader.feed_data(b"First message", addr)
with pytest.raises(AssertionError):
reader.feed_data(bytearray(MAX_DATAGRAM_SIZE + 1), addr)
reader.feed_data(b"Second message", addr)
reader.feed_eof()
reader.feed_data(b"too late", ("1.2.3.4", 5))
await tctx.master.await_log("Received UDP packet from 1.2.3.4:5 after EOF")
assert await reader.read(65535) == b"First message"
with pytest.raises(AssertionError):
await reader.read(MAX_DATAGRAM_SIZE - 1)
assert await reader.read(65535) == b"Second message"
assert not await reader.read(65535)
assert not await reader.read(65535)
full_reader = DatagramReader()
for i in range(0, 42):
full_reader.feed_data(bytes([i]), addr)
full_reader.feed_data(b"too much", ("1.2.3.4", 5))
await tctx.master.await_log("Dropped UDP packet from 1.2.3.4:5")
full_reader.feed_eof()
async def test_reader(caplog_async):
caplog_async.caplog.set_level("DEBUG")
reader = DatagramReader()
addr = ("8.8.8.8", 53)
reader.feed_data(b"First message", addr)
with pytest.raises(AssertionError):
reader.feed_data(bytearray(MAX_DATAGRAM_SIZE + 1), addr)
reader.feed_data(b"Second message", addr)
reader.feed_eof()
reader.feed_data(b"too late", ("1.2.3.4", 5))
await caplog_async.await_log("Received UDP packet from 1.2.3.4:5 after EOF")
assert await reader.read(65535) == b"First message"
with pytest.raises(AssertionError):
await reader.read(MAX_DATAGRAM_SIZE - 1)
assert await reader.read(65535) == b"Second message"
assert not await reader.read(65535)
assert not await reader.read(65535)
full_reader = DatagramReader()
for i in range(0, 42):
full_reader.feed_data(bytes([i]), addr)
full_reader.feed_data(b"too much", ("1.2.3.4", 5))
await caplog_async.await_log("Dropped UDP packet from 1.2.3.4:5")
full_reader.feed_eof()

View File

@ -5,7 +5,6 @@ from hypothesis import settings
from mitmproxy import connection, options
from mitmproxy.addons.proxyserver import Proxyserver
from mitmproxy.addons.termlog import TermLog
from mitmproxy.proxy import context
@ -13,7 +12,6 @@ from mitmproxy.proxy import context
def tctx() -> context.Context:
opts = options.Options()
Proxyserver().load(opts)
TermLog().load(opts)
return context.Context(
connection.Client(("client", 1234), ("127.0.0.1", 8080), 1605699329), opts
)

Some files were not shown because too many files have changed in this diff Show More