mirror of
https://github.com/mitmproxy/mitmproxy.git
synced 2024-11-23 21:30:04 +00:00
82 lines
3.2 KiB
Python
82 lines
3.2 KiB
Python
import logging
|
|
import pathlib
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import mitmproxy.connections
|
|
import mitmproxy.http
|
|
from mitmproxy.addons.export import curl_command
|
|
from mitmproxy.addons.export import raw
|
|
from mitmproxy.exceptions import HttpSyntaxException
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WatchdogAddon:
|
|
"""The Watchdog Add-on can be used in combination with web application scanners in oder to check if the device
|
|
under test responds correctls to the scanner's responses.
|
|
|
|
The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses.
|
|
If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set.
|
|
This information can be used to restart the device under test if necessary.
|
|
"""
|
|
|
|
def __init__(self, event, outdir: pathlib.Path, timeout=None):
|
|
"""Initializes the Watchdog.
|
|
|
|
Args:
|
|
event: multiprocessing.Event that will be set if the watchdog is triggered.
|
|
outdir: path to a directory in which the triggering requests will be saved (curl and raw).
|
|
timeout_conn: float that specifies the timeout for the server connection
|
|
"""
|
|
self.error_event = event
|
|
self.flow_dir = outdir
|
|
if self.flow_dir.exists() and not self.flow_dir.is_dir():
|
|
raise RuntimeError("Watchtdog output path must be a directory.")
|
|
elif not self.flow_dir.exists():
|
|
self.flow_dir.mkdir(parents=True)
|
|
self.last_trigger: None | float = None
|
|
self.timeout: None | float = timeout
|
|
|
|
def serverconnect(self, conn: mitmproxy.connections.ServerConnection):
|
|
if self.timeout is not None:
|
|
conn.settimeout(self.timeout)
|
|
|
|
@classmethod
|
|
def not_in_timeout(cls, last_triggered, timeout):
|
|
"""Checks if current error lies not in timeout after last trigger (potential reset of connection)."""
|
|
return (
|
|
last_triggered is None
|
|
or timeout is None
|
|
or (time.time() - last_triggered > timeout)
|
|
)
|
|
|
|
def error(self, flow):
|
|
"""Checks if the watchdog will be triggered.
|
|
|
|
Only triggers watchdog for timeouts after last reset and if flow.error is set (shows that error is a server
|
|
error). Ignores HttpSyntaxException Errors since this can be triggered on purpose by web application scanner.
|
|
|
|
Args:
|
|
flow: mitmproxy.http.flow
|
|
"""
|
|
if (
|
|
self.not_in_timeout(self.last_trigger, self.timeout)
|
|
and flow.error is not None
|
|
and not isinstance(flow.error, HttpSyntaxException)
|
|
):
|
|
self.last_trigger = time.time()
|
|
logger.error(f"Watchdog triggered! Cause: {flow}")
|
|
self.error_event.set()
|
|
|
|
# save the request which might have caused the problem
|
|
if flow.request:
|
|
with (self.flow_dir / f"{datetime.utcnow().isoformat()}.curl").open(
|
|
"w"
|
|
) as f:
|
|
f.write(curl_command(flow))
|
|
with (self.flow_dir / f"{datetime.utcnow().isoformat()}.raw").open(
|
|
"wb"
|
|
) as f:
|
|
f.write(raw(flow))
|