simplify tests for @concurrent

This commit is contained in:
Maximilian Hils 2016-03-18 20:36:57 +01:00
parent 7e49b8c186
commit fb0b17ee93
3 changed files with 33 additions and 89 deletions

View File

@ -0,0 +1,32 @@
from threading import Event
from mitmproxy.script import Script
from test.mitmproxy import tutils
class Dummy:
def __init__(self, reply):
self.reply = reply
@tutils.skip_appveyor
def test_concurrent():
with Script(tutils.test_data.path("scripts/concurrent_decorator.py"), None) as s:
def reply():
reply.acked.set()
reply.acked = Event()
f1, f2 = Dummy(reply), Dummy(reply)
s.run("request", f1)
f1.reply()
s.run("request", f2)
f2.reply()
assert f1.reply.acked == reply.acked
assert not reply.acked.is_set()
assert reply.acked.wait(10)
def test_concurrent_err():
s = Script(tutils.test_data.path("scripts/concurrent_decorator_err.py"), None)
with tutils.raises("Concurrent decorator not supported for 'start' method"):
s.load()

View File

@ -1,32 +1,6 @@
import time
from mitmproxy.script import concurrent
@concurrent
def clientconnect(context, cc):
context.log("clientconnect")
@concurrent
def serverconnect(context, sc):
context.log("serverconnect")
@concurrent
def request(context, flow):
time.sleep(0.1)
@concurrent
def response(context, flow):
context.log("response")
@concurrent
def error(context, err):
context.log("error")
@concurrent
def clientdisconnect(context, dc):
context.log("clientdisconnect")

View File

@ -1,6 +1,4 @@
import time
import mock
from mitmproxy import script, flow
from mitmproxy import flow
from . import tutils
@ -13,63 +11,3 @@ def test_duplicate_flow():
assert fm.state.flow_count() == 2
assert not fm.state.view[0].request.is_replay
assert fm.state.view[1].request.is_replay
@tutils.skip_appveyor
def test_concurrent():
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py"))
with mock.patch("mitmproxy.controller.DummyReply.__call__") as m:
f1, f2 = tutils.tflow(), tutils.tflow()
t_start = time.time()
fm.handle_request(f1)
f1.reply()
fm.handle_request(f2)
f2.reply()
# Two instantiations
assert m.call_count == 0 # No calls yet.
assert (time.time() - t_start) < 0.1
def test_concurrent2():
s = flow.State()
fm = flow.FlowMaster(None, s)
s = script.Script(
tutils.test_data.path("scripts/concurrent_decorator.py"),
script.ScriptContext(fm))
s.load()
m = mock.Mock()
class Dummy:
def __init__(self):
self.response = self
self.error = self
self.reply = m
t_start = time.time()
for hook in ("clientconnect",
"serverconnect",
"response",
"error",
"clientconnect"):
d = Dummy()
s.run(hook, d)
d.reply()
while (time.time() - t_start) < 20 and m.call_count <= 5:
if m.call_count == 5:
return
time.sleep(0.001)
assert False
def test_concurrent_err():
s = flow.State()
fm = flow.FlowMaster(None, s)
with tutils.raises("Concurrent decorator not supported for 'start' method"):
s = script.Script(tutils.test_data.path("scripts/concurrent_decorator_err.py"), fm)
s.load()