Merge pull request #1709 from cortesi/taddons4

addons.view: test suite to 100%
This commit is contained in:
Aldo Cortesi 2016-11-04 09:33:48 +13:00 committed by GitHub
commit d51b8933b2
9 changed files with 208 additions and 172 deletions

View File

@ -14,13 +14,15 @@ class SetHeaders:
header: Header name.
value: Header value string
"""
for fpatt, header, value in options.setheaders:
flt = flowfilter.parse(fpatt)
if not flt:
raise exceptions.OptionsError(
"Invalid setheader filter pattern %s" % fpatt
)
self.lst.append((fpatt, header, value, flt))
if "setheaders" in updated:
self.lst = []
for fpatt, header, value in options.setheaders:
flt = flowfilter.parse(fpatt)
if not flt:
raise exceptions.OptionsError(
"Invalid setheader filter pattern %s" % fpatt
)
self.lst.append((fpatt, header, value, flt))
def run(self, f, hdrs):
for _, header, value, flt in self.lst:

View File

@ -51,13 +51,16 @@ class _OrderKey:
return "_order_%s" % id(self)
def __call__(self, f):
k = self._key()
s = self.view.settings[f]
if k in s:
return s[k]
val = self.generate(f)
s[k] = val
return val
if f.id in self.view._store:
k = self._key()
s = self.view.settings[f]
if k in s:
return s[k]
val = self.generate(f)
s[k] = val
return val
else:
return self.generate(f)
class OrderRequestStart(_OrderKey):
@ -159,11 +162,8 @@ class View(collections.Sequence):
# Reflect some methods to the efficient underlying implementation
def bisect(self, f: mitmproxy.flow.Flow) -> int:
v = self._view.bisect(f)
# Bisect returns an item to the RIGHT of the existing entries.
if v == 0:
return v
def _bisect(self, f: mitmproxy.flow.Flow) -> int:
v = self._view.bisect_right(f)
return self._rev(v - 1) + 1
def index(self, f: mitmproxy.flow.Flow, start: int = 0, stop: typing.Optional[int] = None) -> int:
@ -349,7 +349,7 @@ class Focus:
self.flow = self.view[idx]
def _nearest(self, f, v):
return min(v.bisect(f), len(v) - 1)
return min(v._bisect(f), len(v) - 1)
def _sig_remove(self, view, flow):
if len(view) == 0:

View File

@ -1,11 +1,10 @@
from mitmproxy.test import tflow
from .. import mastertest
from mitmproxy.addons import anticache
from mitmproxy.test import taddons
class TestAntiCache(mastertest.MasterTest):
class TestAntiCache:
def test_simple(self):
sa = anticache.AntiCache()
with taddons.context() as tctx:

View File

@ -1,11 +1,10 @@
from mitmproxy.test import tflow
from .. import mastertest
from mitmproxy.addons import anticomp
from mitmproxy.test import taddons
class TestAntiComp(mastertest.MasterTest):
class TestAntiComp:
def test_simple(self):
sa = anticomp.AntiComp()
with taddons.context() as tctx:

View File

@ -1,14 +1,14 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import mastertest, tservers
from .. import tservers
from mitmproxy.addons import replace
from mitmproxy import master
from mitmproxy import options
from mitmproxy import proxy
class TestReplace(mastertest.MasterTest):
class TestReplace:
def test_configure(self):
r = replace.Replace()
updated = set(["replacements"])

View File

@ -1,21 +1,12 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import mastertest
from mitmproxy.test import taddons
from mitmproxy.addons import setheaders
from mitmproxy import options
from mitmproxy import proxy
class TestSetHeaders(mastertest.MasterTest):
def mkmaster(self, **opts):
o = options.Options(**opts)
m = mastertest.RecordingMaster(o, proxy.DummyServer())
sh = setheaders.SetHeaders()
m.addons.add(sh)
return m, sh
class TestSetHeaders:
def test_configure(self):
sh = setheaders.SetHeaders()
o = options.Options(
@ -27,41 +18,46 @@ class TestSetHeaders(mastertest.MasterTest):
)
def test_setheaders(self):
m, sh = self.mkmaster(
setheaders = [
("~q", "one", "two"),
("~s", "one", "three")
]
)
f = tflow.tflow()
f.request.headers["one"] = "xxx"
m.request(f)
assert f.request.headers["one"] == "two"
sh = setheaders.SetHeaders()
with taddons.context() as tctx:
tctx.configure(
sh,
setheaders = [
("~q", "one", "two"),
("~s", "one", "three")
]
)
f = tflow.tflow()
f.request.headers["one"] = "xxx"
sh.request(f)
assert f.request.headers["one"] == "two"
f = tflow.tflow(resp=True)
f.response.headers["one"] = "xxx"
m.response(f)
assert f.response.headers["one"] == "three"
f = tflow.tflow(resp=True)
f.response.headers["one"] = "xxx"
sh.response(f)
assert f.response.headers["one"] == "three"
m, sh = self.mkmaster(
setheaders = [
("~s", "one", "two"),
("~s", "one", "three")
]
)
f = tflow.tflow(resp=True)
f.request.headers["one"] = "xxx"
f.response.headers["one"] = "xxx"
m.response(f)
assert f.response.headers.get_all("one") == ["two", "three"]
tctx.configure(
sh,
setheaders = [
("~s", "one", "two"),
("~s", "one", "three")
]
)
f = tflow.tflow(resp=True)
f.request.headers["one"] = "xxx"
f.response.headers["one"] = "xxx"
sh.response(f)
assert f.response.headers.get_all("one") == ["two", "three"]
m, sh = self.mkmaster(
setheaders = [
("~q", "one", "two"),
("~q", "one", "three")
]
)
f = tflow.tflow()
f.request.headers["one"] = "xxx"
m.request(f)
assert f.request.headers.get_all("one") == ["two", "three"]
tctx.configure(
sh,
setheaders = [
("~q", "one", "two"),
("~q", "one", "three")
]
)
f = tflow.tflow()
f.request.headers["one"] = "xxx"
sh.request(f)
assert f.request.headers.get_all("one") == ["two", "three"]

View File

@ -1,11 +1,9 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
from .. import mastertest
from mitmproxy.addons import stickycookie
from mitmproxy import master
from mitmproxy import options
from mitmproxy import proxy
from mitmproxy.test import tutils as ntutils
@ -14,14 +12,7 @@ def test_domain_match():
assert stickycookie.domain_match("google.com", ".google.com")
class TestStickyCookie(mastertest.MasterTest):
def mk(self):
o = options.Options(stickycookie = ".*")
m = master.Master(o, proxy.DummyServer())
sc = stickycookie.StickyCookie()
m.addons.add(sc)
return m, sc
class TestStickyCookie:
def test_config(self):
sc = stickycookie.StickyCookie()
o = options.Options(stickycookie = "~b")
@ -31,103 +22,113 @@ class TestStickyCookie(mastertest.MasterTest):
)
def test_simple(self):
m, sc = self.mk()
m.addons.add(sc)
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
f = tflow.tflow(resp=True)
f.response.headers["set-cookie"] = "foo=bar"
sc.request(f)
f = tflow.tflow(resp=True)
f.response.headers["set-cookie"] = "foo=bar"
m.request(f)
f.reply.acked = False
sc.response(f)
f.reply.acked = False
m.response(f)
assert sc.jar
assert "cookie" not in f.request.headers
assert sc.jar
assert "cookie" not in f.request.headers
f = f.copy()
f.reply.acked = False
sc.request(f)
assert f.request.headers["cookie"] == "foo=bar"
f = f.copy()
f.reply.acked = False
m.request(f)
assert f.request.headers["cookie"] == "foo=bar"
def _response(self, m, sc, cookie, host):
def _response(self, sc, cookie, host):
f = tflow.tflow(req=ntutils.treq(host=host, port=80), resp=True)
f.response.headers["Set-Cookie"] = cookie
m.response(f)
sc.response(f)
return f
def test_response(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; " \
"Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "
c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; " \
"Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "
self._response(m, sc, c, "host")
assert not sc.jar.keys()
self._response(sc, c, "host")
assert not sc.jar.keys()
self._response(m, sc, c, "www.google.com")
assert sc.jar.keys()
self._response(sc, c, "www.google.com")
assert sc.jar.keys()
sc.jar.clear()
self._response(
m, sc, "SSID=mooo", "www.google.com"
)
assert list(sc.jar.keys())[0] == ('www.google.com', 80, '/')
sc.jar.clear()
self._response(sc, "SSID=mooo", "www.google.com")
assert list(sc.jar.keys())[0] == ('www.google.com', 80, '/')
def test_response_multiple(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
# Test setting of multiple cookies
c1 = "somecookie=test; Path=/"
c2 = "othercookie=helloworld; Path=/"
f = self._response(m, sc, c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
m.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == 2
# Test setting of multiple cookies
c1 = "somecookie=test; Path=/"
c2 = "othercookie=helloworld; Path=/"
f = self._response(sc, c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
sc.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == 2
def test_response_weird(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
# Test setting of weird cookie keys
f = tflow.tflow(req=ntutils.treq(host="www.google.com", port=80), resp=True)
cs = [
"foo/bar=hello",
"foo:bar=world",
"foo@bar=fizz",
]
for c in cs:
f.response.headers["Set-Cookie"] = c
m.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == len(cs)
# Test setting of weird cookie keys
f = tflow.tflow(req=ntutils.treq(host="www.google.com", port=80), resp=True)
cs = [
"foo/bar=hello",
"foo:bar=world",
"foo@bar=fizz",
]
for c in cs:
f.response.headers["Set-Cookie"] = c
sc.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == len(cs)
def test_response_overwrite(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
# Test overwriting of a cookie value
c1 = "somecookie=helloworld; Path=/"
c2 = "somecookie=newvalue; Path=/"
f = self._response(m, sc, c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
m.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == 1
assert list(sc.jar[googlekey]["somecookie"].items())[0][1] == "newvalue"
# Test overwriting of a cookie value
c1 = "somecookie=helloworld; Path=/"
c2 = "somecookie=newvalue; Path=/"
f = self._response(sc, c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
sc.response(f)
googlekey = list(sc.jar.keys())[0]
assert len(sc.jar[googlekey].keys()) == 1
assert list(sc.jar[googlekey]["somecookie"].items())[0][1] == "newvalue"
def test_response_delete(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
# Test that a cookie is be deleted
# by setting the expire time in the past
f = self._response(m, sc, "duffer=zafar; Path=/", "www.google.com")
f.response.headers["Set-Cookie"] = "duffer=; Expires=Thu, 01-Jan-1970 00:00:00 GMT"
m.response(f)
assert not sc.jar.keys()
# Test that a cookie is be deleted
# by setting the expire time in the past
f = self._response(sc, "duffer=zafar; Path=/", "www.google.com")
f.response.headers["Set-Cookie"] = "duffer=; Expires=Thu, 01-Jan-1970 00:00:00 GMT"
sc.response(f)
assert not sc.jar.keys()
def test_request(self):
m, sc = self.mk()
sc = stickycookie.StickyCookie()
with taddons.context() as tctx:
tctx.configure(sc, stickycookie=".*")
f = self._response(m, sc, "SSID=mooo", "www.google.com")
assert "cookie" not in f.request.headers
m.request(f)
assert "cookie" in f.request.headers
f = self._response(sc, "SSID=mooo", "www.google.com")
assert "cookie" not in f.request.headers
sc.request(f)
assert "cookie" in f.request.headers

View File

@ -1,4 +1,3 @@
from .. import mastertest
import io
from mitmproxy.addons import termlog
@ -6,7 +5,7 @@ from mitmproxy import log
from mitmproxy.tools import dump
class TestTermLog(mastertest.MasterTest):
class TestTermLog:
def test_simple(self):
t = termlog.TermLog()
sio = io.StringIO()

View File

@ -7,6 +7,13 @@ from mitmproxy import options
from mitmproxy.test import taddons
def tft(*, method="get", start=0):
f = tflow.tflow()
f.request.method = method
f.request.timestamp_start = start
return f
class Options(options.Options):
def __init__(
self,
@ -62,18 +69,24 @@ def test_order_generators():
def test_simple():
v = view.View()
f = tflow.tflow()
f = tft(start=1)
assert v.store_count() == 0
f.request.timestamp_start = 1
v.request(f)
assert list(v) == [f]
# These all just call udpate
v.error(f)
v.response(f)
v.intercept(f)
v.resume(f)
assert list(v) == [f]
v.request(f)
assert list(v) == [f]
assert len(v._store) == 1
assert v.store_count() == 1
f2 = tflow.tflow()
f2.request.timestamp_start = 3
f2 = tft(start=3)
v.request(f2)
assert list(v) == [f, f2]
v.request(f2)
@ -84,8 +97,7 @@ def test_simple():
assert not v.inbounds(-1)
assert not v.inbounds(100)
f3 = tflow.tflow()
f3.request.timestamp_start = 2
f3 = tft(start=2)
v.request(f3)
assert list(v) == [f, f3, f2]
v.request(f3)
@ -97,13 +109,6 @@ def test_simple():
assert len(v._store) == 0
def tft(*, method="get", start=0):
f = tflow.tflow()
f.request.method = method
f.request.timestamp_start = start
return f
def test_filter():
v = view.View()
f = flowfilter.parse("~m get")
@ -160,8 +165,8 @@ def test_reversed():
tutils.raises(IndexError, v.__getitem__, 5)
tutils.raises(IndexError, v.__getitem__, -5)
assert v.bisect(v[0]) == 1
assert v.bisect(v[2]) == 3
assert v._bisect(v[0]) == 1
assert v._bisect(v[2]) == 3
def test_update():
@ -255,6 +260,33 @@ def test_signals():
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
def test_focus_follow():
v = view.View()
with taddons.context(options=Options()) as tctx:
tctx.configure(v, focus_follow=True, filter="~m get")
v.add(tft(start=5))
assert v.focus.index == 0
v.add(tft(start=4))
assert v.focus.index == 0
assert v.focus.flow.request.timestamp_start == 4
v.add(tft(start=7))
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod = tft(method="put", start=6)
v.add(mod)
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7
mod.request.method = "GET"
v.update(mod)
assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 6
def test_focus():
# Special case - initialising with a view that already contains data
v = view.View()
@ -273,6 +305,10 @@ def test_focus():
assert f.index == 0
assert f.flow is v[0]
# Try to set to something not in view
tutils.raises(ValueError, f.__setattr__, "flow", tft())
tutils.raises(ValueError, f.__setattr__, "index", 99)
v.add(tft(start=0))
assert f.index == 1
assert f.flow is v[1]
@ -281,6 +317,10 @@ def test_focus():
assert f.index == 1
assert f.flow is v[1]
f.index = 0
assert f.index == 0
f.index = 1
v.remove(v[1])
assert f.index == 1
assert f.flow is v[1]