mitmproxy/examples/contrib/test_jsondump.py
2022-11-29 13:28:41 +00:00

73 lines
2.5 KiB
Python

import base64
import json
import requests_mock
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy.test import tutils
example_dir = tutils.test_data.push("../examples")
class TestJSONDump:
def echo_response(self, request, context):
self.request = {"json": request.json(), "headers": request.headers}
return ""
def flow(self, resp_content=b"message"):
times = dict(
timestamp_start=746203272,
timestamp_end=746203272,
)
# Create a dummy flow for testing
return tflow.tflow(
req=tutils.treq(method=b"GET", **times),
resp=tutils.tresp(content=resp_content, **times),
)
def test_simple(self, tmpdir):
with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/jsondump.py"))
path = str(tmpdir.join("jsondump.out"))
tctx.configure(a, dump_destination=path)
tctx.invoke(a, "response", self.flow())
tctx.invoke(a, "done")
with open(path) as inp:
entry = json.loads(inp.readline())
assert entry["response"]["content"] == "message"
def test_contentencode(self, tmpdir):
with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/jsondump.py"))
path = str(tmpdir.join("jsondump.out"))
content = b"foo" + b"\xFF" * 10
tctx.configure(a, dump_destination=path, dump_encodecontent=True)
tctx.invoke(a, "response", self.flow(resp_content=content))
tctx.invoke(a, "done")
with open(path) as inp:
entry = json.loads(inp.readline())
assert entry["response"]["content"] == base64.b64encode(content).decode(
"utf-8"
)
def test_http(self, tmpdir):
with requests_mock.Mocker() as mock:
mock.post("http://my-server", text=self.echo_response)
with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/jsondump.py"))
tctx.configure(
a,
dump_destination="http://my-server",
dump_username="user",
dump_password="pass",
)
tctx.invoke(a, "response", self.flow())
tctx.invoke(a, "done")
assert self.request["json"]["response"]["content"] == "message"
assert self.request["headers"]["Authorization"] == "Basic dXNlcjpwYXNz"