Entirely remove the reply system.

The major, breaking change is that it is no longer possible to "take" a reply in
order to block the effect of a later addon hook.

This is patch 4/4 of the reply-ectomy.
This commit is contained in:
Robert Xiao 2022-02-03 06:12:24 -08:00 committed by Maximilian Hils
parent ede269fce4
commit 794c18bec0
30 changed files with 30 additions and 305 deletions

View File

@ -7,7 +7,6 @@ import types
import typing
from dataclasses import dataclass
from mitmproxy import controller
from mitmproxy import hooks
from mitmproxy import exceptions
from mitmproxy import flow
@ -231,26 +230,9 @@ class AddonManager:
Handle a lifecycle event.
"""
message = event.args()[0]
if not hasattr(message, "reply"): # pragma: no cover
raise exceptions.ControlException(
"Message %s has no reply attribute" % message
)
# We can use DummyReply objects multiple times. We only clear them up on
# the next handler so that we can access value and state in the
# meantime.
if isinstance(message.reply, controller.DummyReply):
message.reply.reset()
await self.trigger_event(event)
if message.reply.state == "start":
message.reply.take()
message.reply.commit()
if isinstance(message.reply, controller.DummyReply):
message.reply.mark_reset()
if isinstance(message, flow.Flow):
await self.trigger_event(hooks.UpdateHook([message]))

View File

@ -5,7 +5,6 @@ import urllib.parse
import asgiref.compatibility
import asgiref.wsgi
from mitmproxy import ctx, http
from mitmproxy.controller import DummyReply
class ASGIApp:
@ -26,18 +25,14 @@ class ASGIApp:
return f"asgiapp:{self.host}:{self.port}"
def should_serve(self, flow: http.HTTPFlow) -> bool:
assert flow.reply
return bool(
(flow.request.pretty_host, flow.request.port) == (self.host, self.port)
and flow.reply.state == "start" and not flow.error and not flow.response
and not isinstance(flow.reply, DummyReply) # ignore the HTTP flows of this app loaded from somewhere
and not flow.error and not flow.response
)
def request(self, flow: http.HTTPFlow) -> None:
assert flow.reply
async def request(self, flow: http.HTTPFlow) -> None:
if self.should_serve(flow):
flow.reply.take() # pause hook completion
asyncio.ensure_future(serve(self.asgi_app, flow))
await serve(self.asgi_app, flow)
class WSGIApp(ASGIApp):
@ -92,7 +87,6 @@ async def serve(app, flow: http.HTTPFlow):
"""
Serves app on flow.
"""
assert flow.reply
scope = make_scope(flow)
done = asyncio.Event()
@ -135,5 +129,4 @@ async def serve(app, flow: http.HTTPFlow):
ctx.log.error(f"Error in asgi app:\n{traceback.format_exc(limit=-5)}")
flow.response = http.Response.make(500, b"ASGI Error.")
finally:
flow.reply.commit()
done.set()

View File

@ -61,7 +61,7 @@ class BlockList:
self.items.append(spec)
def request(self, flow: http.HTTPFlow) -> None:
if flow.response or flow.error or (flow.reply and flow.reply.state == "taken"):
if flow.response or flow.error:
return
for spec in self.items:

View File

@ -10,7 +10,6 @@ from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import io
from mitmproxy.addons.proxyserver import AsyncReply
from mitmproxy.hooks import UpdateHook
from mitmproxy.net import server_spec
from mitmproxy.options import Options
@ -95,9 +94,7 @@ class ReplayHandler(server.ConnectionHandler):
async def handle_hook(self, hook: commands.StartHook) -> None:
data, = hook.args()
data.reply = AsyncReply(data)
await ctx.master.addons.handle_lifecycle(hook)
await data.reply.done.wait()
if isinstance(data, flow.Flow):
await data.wait_for_resume()
if isinstance(hook, (layers.http.HttpResponseHook, layers.http.HttpErrorHook)):

View File

@ -40,9 +40,6 @@ class Intercept:
def process_flow(self, f: flow.Flow) -> None:
if self.should_intercept(f):
assert f.reply
if f.reply.state != "start":
return ctx.log.debug("Cannot intercept request that is already taken by another addon.")
f.intercept()
# Handlers

View File

@ -105,7 +105,7 @@ class MapLocal:
self.replacements.append(spec)
def request(self, flow: http.HTTPFlow) -> None:
if flow.response or flow.error or (flow.reply and flow.reply.state == "taken"):
if flow.response or flow.error:
return
url = flow.request.pretty_url

View File

@ -48,7 +48,7 @@ class MapRemote:
self.replacements.append(spec)
def request(self, flow: http.HTTPFlow) -> None:
if flow.response or flow.error or (flow.reply and flow.reply.state == "taken"):
if flow.response or flow.error:
return
for spec in self.replacements:
if spec.matches(flow):

View File

@ -31,12 +31,12 @@ class ModifyBody:
self.replacements.append(spec)
def request(self, flow):
if flow.response or flow.error or flow.reply.state == "taken":
if flow.response or flow.error:
return
self.run(flow)
def response(self, flow):
if flow.error or flow.reply.state == "taken":
if flow.error:
return
self.run(flow)

View File

@ -73,12 +73,12 @@ class ModifyHeaders:
self.replacements.append(spec)
def request(self, flow):
if flow.response or flow.error or flow.reply.state == "taken":
if flow.response or flow.error:
return
self.run(flow, flow.request.headers)
def response(self, flow):
if flow.error or flow.reply.state == "taken":
if flow.error:
return
self.run(flow, flow.response.headers)

View File

@ -34,6 +34,6 @@ class Onboarding(asgiapp.WSGIApp):
self.port = ctx.options.onboarding_port
app.config["CONFDIR"] = ctx.options.confdir
def request(self, f):
async def request(self, f):
if ctx.options.onboarding:
super().request(f)
await super().request(f)

View File

@ -1,9 +1,8 @@
import asyncio
import warnings
from typing import Dict, Optional, Tuple
from mitmproxy import command, controller, ctx, exceptions, flow, http, log, master, options, platform, tcp, websocket
from mitmproxy.flow import Error, Flow
from mitmproxy import command, ctx, exceptions, flow, http, log, master, options, platform, tcp, websocket
from mitmproxy.flow import Flow
from mitmproxy.proxy import commands, events, server_hooks
from mitmproxy.proxy import server
from mitmproxy.proxy.layers.tcp import TcpMessageInjected
@ -12,29 +11,6 @@ from mitmproxy.utils import asyncio_utils, human
from wsproto.frame_protocol import Opcode
class AsyncReply(controller.Reply):
"""
controller.Reply.q.get() is blocking, which we definitely want to avoid in a coroutine.
This stub adds a .done asyncio.Event() that can be used instead.
"""
def __init__(self, *args):
self.done = asyncio.Event()
self.loop = asyncio.get_event_loop()
super().__init__(*args)
def commit(self):
super().commit()
try:
self.loop.call_soon_threadsafe(lambda: self.done.set())
except RuntimeError: # pragma: no cover
pass # event loop may already be closed.
def kill(self, force=False): # pragma: no cover
warnings.warn("reply.kill() is deprecated, set the error attribute instead.", DeprecationWarning, stacklevel=2)
self.obj.error = flow.Error(Error.KILLED_MESSAGE)
class ProxyConnectionHandler(server.StreamConnectionHandler):
master: master.Master
@ -47,16 +23,12 @@ class ProxyConnectionHandler(server.StreamConnectionHandler):
with self.timeout_watchdog.disarm():
# We currently only support single-argument hooks.
data, = hook.args()
data.reply = AsyncReply(data)
await self.master.addons.handle_lifecycle(hook)
await data.reply.done.wait()
data.reply = None
if isinstance(data, flow.Flow):
await data.wait_for_resume()
def log(self, message: str, level: str = "info") -> None:
x = log.LogEntry(self.log_prefix + message, level)
x.reply = controller.DummyReply() # type: ignore
asyncio_utils.create_task(
self.master.addons.handle_lifecycle(log.AddLogHook(x)),
name="ProxyConnectionHandler.log"

View File

@ -1,95 +0,0 @@
import asyncio
import warnings
from typing import Any
from mitmproxy import exceptions, flow
class Reply:
"""
Messages sent through a channel are decorated with a "reply" attribute. This
object is used to respond to the message through the return channel.
"""
def __init__(self, obj):
self.obj: Any = obj
self.done: asyncio.Event = asyncio.Event()
self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
self._state: str = "start" # "start" -> "taken" -> "committed"
@property
def state(self):
"""
The state the reply is currently in. A normal reply object goes
sequentially through the following lifecycle:
1. start: Initial State.
2. taken: The reply object has been taken to be committed.
3. committed: The reply has been sent back to the requesting party.
This attribute is read-only and can only be modified by calling one of
state transition functions.
"""
return self._state
def take(self):
"""
Scripts or other parties make "take" a reply out of a normal flow.
For example, intercepted flows are taken out so that the connection thread does not proceed.
"""
if self.state != "start":
raise exceptions.ControlException(f"Reply is {self.state}, but expected it to be start.")
self._state = "taken"
def commit(self):
"""
Ultimately, messages are committed. This is done either automatically by
the handler if the message is not taken or manually by the entity which
called .take().
"""
if self.state != "taken":
raise exceptions.ControlException(f"Reply is {self.state}, but expected it to be taken.")
self._state = "committed"
try:
self._loop.call_soon_threadsafe(lambda: self.done.set())
except RuntimeError: # pragma: no cover
pass # event loop may already be closed.
def kill(self, force=False): # pragma: no cover
warnings.warn("reply.kill() is deprecated, use flow.kill() or set the error attribute instead.",
DeprecationWarning, stacklevel=2)
self.obj.error = flow.Error(flow.Error.KILLED_MESSAGE)
def __del__(self):
if self.state != "committed":
# This will be ignored by the interpreter, but emit a warning
raise exceptions.ControlException(f"Uncommitted reply: {self.obj}")
def __deepcopy__(self, memo):
# some parts of the console ui may use deepcopy, see https://github.com/mitmproxy/mitmproxy/issues/4916
return memo.setdefault(id(self), DummyReply())
class DummyReply(Reply):
"""
A reply object that is not connected to anything. In contrast to regular
Reply objects, DummyReply objects are reset to "start" at the end of an
handler so that they can be used multiple times. Useful when we need an
object to seem like it has a channel, and during testing.
"""
def __init__(self):
super().__init__(None)
self._should_reset = False
def mark_reset(self):
if self.state != "committed":
raise exceptions.ControlException(f"Uncommitted reply: {self.obj}")
self._should_reset = True
def reset(self):
if self._should_reset:
self._state = "start"
def __del__(self):
pass

View File

@ -1,6 +1,5 @@
from typing import Any, Callable, Dict, Iterator, Type
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import hooks
from mitmproxy import http
@ -32,7 +31,6 @@ def _iterate_http(f: http.HTTPFlow) -> TEventGenerator:
def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
messages = f.messages
f.messages = []
f.reply = controller.DummyReply()
yield layers.tcp.TcpStartHook(f)
while messages:
f.messages.append(messages.pop(0))

View File

@ -3,7 +3,7 @@ import time
import typing # noqa
import uuid
from mitmproxy import controller, connection
from mitmproxy import connection
from mitmproxy import exceptions
from mitmproxy import stateobject
from mitmproxy import version
@ -121,7 +121,6 @@ class Flow(stateobject.StateObject):
self.intercepted: bool = False
self._resume_event: typing.Optional[asyncio.Event] = None
self._backup: typing.Optional[Flow] = None
self.reply: typing.Optional[controller.Reply] = None
self.marked: str = ""
self.is_replay: typing.Optional[str] = None
self.metadata: typing.Dict[str, typing.Any] = dict()
@ -164,8 +163,6 @@ class Flow(stateobject.StateObject):
"""Make a copy of this flow."""
f = super().copy()
f.live = False
if self.reply is not None:
f.reply = controller.DummyReply()
return f
def modified(self):

View File

@ -6,7 +6,6 @@ import traceback
from mitmproxy import addonmanager, hooks
from mitmproxy import command
from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import http
from mitmproxy import log
@ -115,6 +114,5 @@ class Master:
if isinstance(f, http.HTTPFlow):
self._change_reverse_host(f)
f.reply = controller.DummyReply()
for e in eventsequence.iterate(f):
await self.addons.handle_lifecycle(e)

View File

@ -80,16 +80,15 @@ class context:
async def cycle(self, addon, f):
"""
Cycles the flow through the events for the flow. Stops if a reply
is taken (as in flow interception).
Cycles the flow through the events for the flow. Stops if the flow
is intercepted.
"""
f.reply._state = "start"
for evt in eventsequence.iterate(f):
await self.master.addons.invoke_addon(
addon,
evt
)
if f.reply.state == "taken":
if f.intercepted:
return
def configure(self, addon, **kwargs):

View File

@ -2,7 +2,6 @@ import uuid
from typing import Optional, Union
from mitmproxy import connection
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
@ -27,7 +26,6 @@ def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None) -> tcp
f = tcp.TCPFlow(client_conn, server_conn)
f.messages = messages
f.error = err
f.reply = controller.DummyReply()
return f
@ -82,7 +80,6 @@ def twebsocketflow(messages=True, err=None, close_code=None, close_reason='') ->
# NORMAL_CLOSURE
flow.websocket.close_code = 1000
flow.reply = controller.DummyReply()
return flow
@ -119,7 +116,6 @@ def tflow(
f.response = resp or None
f.error = err or None
f.websocket = ws or None
f.reply = controller.DummyReply()
return f
@ -140,7 +136,6 @@ def tdummyflow(client_conn=True, server_conn=True, err=None) -> DummyFlow:
f = DummyFlow(client_conn, server_conn)
f.error = err
f.reply = controller.DummyReply()
return f
@ -166,7 +161,6 @@ def tclient_conn() -> connection.Client:
alpn_offers=[],
cipher_list=[],
))
c.reply = controller.DummyReply() # type: ignore
return c
@ -194,7 +188,6 @@ def tserver_conn() -> connection.Server:
cipher_list=[],
via2=None,
))
c.reply = controller.DummyReply() # type: ignore
return c

View File

@ -684,9 +684,7 @@ def format_flow(
error_message=error_message,
)
elif isinstance(f, HTTPFlow):
intercepted = (
f.intercepted and not (f.reply and f.reply.state == "committed")
)
intercepted = f.intercepted
response_content_length: typing.Optional[int]
if f.response:
if f.response.raw_content is not None:

View File

@ -52,7 +52,6 @@ exclude =
mitmproxy/connections.py
mitmproxy/contentviews/base.py
mitmproxy/contentviews/grpc.py
mitmproxy/controller.py
mitmproxy/ctx.py
mitmproxy/exceptions.py
mitmproxy/flow.py

View File

@ -25,7 +25,7 @@ def test_resume():
assert not sa.resume([f])
f.intercept()
sa.resume([f])
assert not f.reply.state == "taken"
assert not f.intercepted
def test_mark():

View File

@ -2,7 +2,6 @@ import pytest
from mitmproxy.addons import intercept
from mitmproxy import exceptions
from mitmproxy.proxy import layers
from mitmproxy.test import taddons
from mitmproxy.test import tflow
@ -59,19 +58,3 @@ async def test_tcp():
f = tflow.ttcpflow()
await tctx.cycle(r, f)
assert not f.intercepted
@pytest.mark.asyncio
async def test_already_taken():
r = intercept.Intercept()
with taddons.context(r) as tctx:
tctx.configure(r, intercept="~q")
f = tflow.tflow()
await tctx.invoke(r, layers.http.HttpRequestHook(f))
assert f.intercepted
f = tflow.tflow()
f.reply.take()
await tctx.invoke(r, layers.http.HttpRequestHook(f))
assert not f.intercepted

View File

@ -168,7 +168,7 @@ class TestMapLocal:
ml.request(f)
await tctx.master.await_log("could not read file")
def test_has_reply(self, tmpdir):
def test_is_killed(self, tmpdir):
ml = MapLocal()
with taddons.context(ml) as tctx:
tmpfile = tmpdir.join("foo.jpg")
@ -181,6 +181,6 @@ class TestMapLocal:
)
f = tflow.tflow()
f.request.url = b"https://example.org/images/foo.jpg"
f.reply.take()
f.kill()
ml.request(f)
assert not f.response

View File

@ -28,12 +28,12 @@ class TestMapRemote:
mr.request(f)
assert f.request.url == "https://mitmproxy.org/img/test.jpg"
def test_has_reply(self):
def test_is_killed(self):
mr = mapremote.MapRemote()
with taddons.context(mr) as tctx:
tctx.configure(mr, map_remote=[":example.org:mitmproxy.org"])
f = tflow.tflow()
f.request.url = b"https://example.org/images/test.jpg"
f.reply.take()
f.kill()
mr.request(f)
assert f.request.url == "https://example.org/images/test.jpg"

View File

@ -3,6 +3,7 @@ import pytest
from mitmproxy.addons import modifybody
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy.test.tutils import tresp
class TestModifyBody:
@ -41,14 +42,14 @@ class TestModifyBody:
f = tflow.tflow()
f.request.content = b"foo"
if take:
f.reply.take()
f.response = tresp()
mb.request(f)
assert (f.request.content == b"bar") ^ take
f = tflow.tflow(resp=True)
f.response.content = b"foo"
if take:
f.reply.take()
f.kill()
mb.response(f)
assert (f.response.content == b"bar") ^ take

View File

@ -3,6 +3,7 @@ import pytest
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifyHeaders
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy.test.tutils import tresp
def test_parse_modify_spec():
@ -133,13 +134,13 @@ class TestModifyHeaders:
tctx.configure(mh, modify_headers=["/content-length/42"])
f = tflow.tflow()
if take:
f.reply.take()
f.response = tresp()
mh.request(f)
assert (f.request.headers["content-length"] == "42") ^ take
f = tflow.tflow(resp=True)
if take:
f.reply.take()
f.kill()
mh.response(f)
assert (f.response.headers["content-length"] == "42") ^ take

View File

@ -32,7 +32,6 @@ class TestStickyCookie:
f.response.headers["set-cookie"] = "foo=bar"
sc.request(f)
f.reply.acked = False
sc.response(f)
assert sc.jar

View File

@ -3,15 +3,12 @@ import time
import pytest
from mitmproxy import controller
from mitmproxy.test import tflow
from mitmproxy.test import taddons
class Thing:
def __init__(self):
self.reply = controller.DummyReply()
self.live = True

View File

@ -1,80 +0,0 @@
import asyncio
import pytest
import mitmproxy.ctx
from mitmproxy import controller
from mitmproxy.exceptions import ControlException
from mitmproxy.test import taddons
@pytest.mark.asyncio
async def test_master():
class tAddon:
def add_log(self, _):
mitmproxy.ctx.master.should_exit.set()
with taddons.context(tAddon()) as tctx:
assert not tctx.master.should_exit.is_set()
async def test():
mitmproxy.ctx.log("test")
asyncio.ensure_future(test())
await tctx.master.await_log("test")
assert tctx.master.should_exit.is_set()
class TestReply:
@pytest.mark.asyncio
async def test_simple(self):
reply = controller.Reply(42)
assert reply.state == "start"
reply.take()
assert reply.state == "taken"
assert not reply.done.is_set()
reply.commit()
assert reply.state == "committed"
assert await asyncio.wait_for(reply.done.wait(), 1)
def test_double_commit(self):
reply = controller.Reply(47)
reply.take()
reply.commit()
with pytest.raises(ControlException):
reply.commit()
def test_del(self):
reply = controller.Reply(47)
with pytest.raises(ControlException):
reply.__del__()
reply.take()
reply.commit()
class TestDummyReply:
def test_simple(self):
reply = controller.DummyReply()
for _ in range(2):
reply.take()
reply.commit()
reply.mark_reset()
reply.reset()
assert reply.state == "start"
def test_reset(self):
reply = controller.DummyReply()
reply.take()
with pytest.raises(ControlException):
reply.mark_reset()
reply.commit()
reply.mark_reset()
assert reply.state == "committed"
reply.reset()
assert reply.state == "start"
def test_del(self):
reply = controller.DummyReply()
reply.__del__()

View File

@ -2,7 +2,6 @@ from unittest import mock
import pytest
from mitmproxy import controller
from mitmproxy import log
from mitmproxy import options
from mitmproxy.tools import dump
@ -17,7 +16,6 @@ class TestDumpMaster:
def test_has_error(self):
m = self.mkmaster()
ent = log.LogEntry("foo", "error")
ent.reply = controller.DummyReply()
m.addons.trigger(log.AddLogHook(ent))
assert m.errorcheck.has_errored

View File

@ -1,6 +1,5 @@
from unittest import mock
from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import io
from mitmproxy.proxy import server_hooks
@ -14,7 +13,6 @@ class MasterTest:
f = tflow.tflow(req=tutils.treq(content=content))
layer = mock.Mock("mitmproxy.proxy.protocol.base.Layer")
layer.client_conn = f.client_conn
layer.reply = controller.DummyReply()
await master.addons.handle_lifecycle(server_hooks.ClientConnectedHook(layer))
for e in eventsequence.iterate(f):
await master.addons.handle_lifecycle(e)