mitmproxy/test/tservers.py

295 lines
7.9 KiB
Python

import threading, Queue
import flask
import libpathod.test, libpathod.pathoc
from libmproxy import proxy, flow, controller
from libmproxy.cmdline import APP_HOST, APP_PORT
import tutils
# Flask application that TestMaster registers under the host name "testapp";
# its routes are declared directly below.
testapp = flask.Flask(__name__)
@testapp.route("/")
def hello():
    """Index route of the test app; always answers with the body ``testapp``."""
    body = "testapp"
    return body
@testapp.route("/error")
def error():
    """Route of the test app that always fails by raising ValueError."""
    failure = ValueError("An exception...")
    raise failure
def errapp(environ, start_response):
    """
        Application callable that fails unconditionally.

        Both arguments are ignored; every call raises ValueError("errapp").
    """
    failure = ValueError("errapp")
    raise failure
class TestMaster(flow.FlowMaster):
    """
        FlowMaster used by the proxy tests.

        It registers the two test apps, keeps log messages in a plain list
        and replies to every request/response message right after the base
        class has handled it.
    """

    def __init__(self, config):
        server = proxy.ProxyServer(config, 0)
        flow.FlowMaster.__init__(self, server, flow.State())
        # Make both test applications reachable under their host names.
        for application, host in ((testapp, "testapp"), (errapp, "errapp")):
            self.apps.add(application, host, 80)
        self.clear_log()
        self.start_app(APP_HOST, APP_PORT, False)

    def handle_request(self, m):
        """
            Let FlowMaster process the request message, then reply to it.
        """
        flow.FlowMaster.handle_request(self, m)
        m.reply()

    def handle_response(self, m):
        """
            Let FlowMaster process the response message, then reply to it.
        """
        flow.FlowMaster.handle_response(self, m)
        m.reply()

    def clear_log(self):
        """
            Replace the collected log with a fresh, empty list.
        """
        self.log = []

    def handle_log(self, l):
        """
            Record the text of a log message and reply to it.
        """
        self.log.append(l.msg)
        l.reply()
class ProxyThread(threading.Thread):
    """
        Thread that runs a master and exposes its port and log.
    """

    def __init__(self, tmaster):
        threading.Thread.__init__(self)
        self.tmaster = tmaster
        address = tmaster.server.address
        self.name = "ProxyThread (%s:%s)" % (address.host, address.port)
        # Reset the module-level exit flag in controller for this new thread.
        controller.should_exit = False

    @property
    def port(self):
        """
            Port of the wrapped master's server address.
        """
        return self.tmaster.server.address.port

    @property
    def log(self):
        """
            The log object held by the wrapped master.
        """
        return self.tmaster.log

    def run(self):
        """
            Thread body: delegate to the master's run().
        """
        self.tmaster.run()

    def shutdown(self):
        """
            Delegate to the master's shutdown().
        """
        self.tmaster.shutdown()
class ProxTestBase(object):
    """
        Shared fixture for the proxy tests.

        setupAll starts two pathod daemons and one proxy (run in a
        ProxyThread); the class attributes below select how they are
        configured.
    """
    # Test Configuration
    ssl = None
    ssloptions = False
    clientcerts = False
    certfile = None
    no_upstream_cert = False
    authenticator = None
    masterclass = TestMaster

    @classmethod
    def setupAll(cls):
        """
            Start both pathod daemons and the proxy thread.
        """
        daemon_args = dict(ssl=cls.ssl, ssloptions=cls.ssloptions)
        cls.server = libpathod.test.Daemon(**daemon_args)
        cls.server2 = libpathod.test.Daemon(**daemon_args)
        extra = cls.get_proxy_config()
        config = proxy.ProxyConfig(
            no_upstream_cert=cls.no_upstream_cert,
            cacert=tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
            authenticator=cls.authenticator,
            **extra
        )
        cls.proxy = ProxyThread(cls.masterclass(config))
        cls.proxy.start()

    @property
    def master(self):
        """
            The master instance running inside the proxy thread.
        """
        return self.proxy.tmaster

    @classmethod
    def teardownAll(cls):
        """
            Shut down the proxy first, then both pathod daemons.
        """
        for service in (cls.proxy, cls.server, cls.server2):
            service.shutdown()

    def setUp(self):
        """
            Clear the master's log and state and both daemon logs.
        """
        master = self.master
        master.clear_log()
        master.state.clear()
        for daemon in (self.server, self.server2):
            daemon.clear_log()

    @property
    def scheme(self):
        """
            "https" when the ssl attribute is truthy, otherwise "http".
        """
        if self.ssl:
            return "https"
        return "http"

    @property
    def proxies(self):
        """
            The proxy as a (scheme, (host, port)) tuple.
        """
        return (self.scheme, ("127.0.0.1", self.proxy.port))

    @classmethod
    def get_proxy_config(cls):
        """
            Extra keyword arguments for proxy.ProxyConfig, derived from
            the clientcerts and certfile class attributes.
        """
        options = {}
        if cls.clientcerts:
            options["clientcerts"] = tutils.test_data.path("data/clientcert")
        if cls.certfile:
            options["certfile"] = tutils.test_data.path("data/testkey.pem")
        return options
class HTTPProxTest(ProxTestBase):
    """
        Test helpers that talk to the proxy using pathoc clients.
    """

    def pathoc_raw(self):
        """
            Returns a Pathoc instance aimed at the proxy, not yet connected.
        """
        proxy_address = ("127.0.0.1", self.proxy.port)
        return libpathod.pathoc.Pathoc(proxy_address)

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        client = libpathod.pathoc.Pathoc(
            ("localhost", self.proxy.port), ssl=self.ssl, sni=sni
        )
        if not self.ssl:
            client.connect()
            return client
        # With SSL, connect() is given the first pathod daemon's address.
        # NOTE(review): presumably this tunnels through the proxy - confirm
        # against Pathoc.connect.
        client.connect(("127.0.0.1", self.server.port))
        return client

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.
        """
        client = self.pathoc(sni=sni)
        escaped = spec.encode("string_escape")
        if self.ssl:
            # Relative path only.
            request = "get:'/p/%s'" % escaped
        else:
            # Absolute URL built from the pathod daemon's urlbase.
            request = "get:'%s/p/%s'" % (self.server.urlbase, escaped)
        return client.request(request)

    def app(self, page):
        """
            Requests a page from the app served at APP_HOST/APP_PORT.
        """
        if not self.ssl:
            client = self.pathoc()
            return client.request("get:'http://%s/%s'" % (APP_HOST, page))
        client = libpathod.pathoc.Pathoc(("127.0.0.1", self.proxy.port), True)
        client.connect((APP_HOST, APP_PORT))
        return client.request("get:'/%s'" % page)
class TResolver:
    """
        Resolver stub that reports a fixed localhost destination.
    """

    def __init__(self, port):
        self.port = port

    def original_addr(self, sock):
        """
            Returns ("127.0.0.1", port) for any socket; sock is not inspected.
        """
        host = "127.0.0.1"
        return host, self.port
class TransparentProxTest(ProxTestBase):
    """
        Fixture that configures the proxy with a "transparent_proxy" entry
        whose resolver points at the first pathod daemon.
    """
    ssl = None
    resolver = TResolver

    @classmethod
    def get_proxy_config(cls):
        """
            Base proxy configuration plus the "transparent_proxy" settings.

            When ssl is set, both pathod daemon ports are listed as
            sslports; otherwise the list is empty.
        """
        # Use super() rather than ProxTestBase.get_proxy_config(): the
        # direct call binds cls to ProxTestBase, so clientcerts/certfile
        # overrides on the concrete test class would be ignored.
        d = super(TransparentProxTest, cls).get_proxy_config()
        ports = [cls.server.port, cls.server2.port] if cls.ssl else []
        d["transparent_proxy"] = dict(
            resolver=cls.resolver(cls.server.port),
            sslports=ports
        )
        return d

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.
        """
        # The request line is the same in both modes; sni is only
        # forwarded to pathoc() when SSL is in use.
        p = self.pathoc(sni=sni) if self.ssl else self.pathoc()
        return p.request("get:'/p/%s'" % spec)

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        p = libpathod.pathoc.Pathoc(
            ("localhost", self.proxy.port), ssl=self.ssl, sni=sni
        )
        p.connect()
        return p
class ReverseProxTest(ProxTestBase):
    """
        Fixture that configures the proxy with a "reverse_proxy" entry
        pointing at the first pathod daemon.
    """
    ssl = None

    @classmethod
    def get_proxy_config(cls):
        """
            Base proxy configuration plus a "reverse_proxy" entry of the
            form (scheme, "127.0.0.1", port of the first pathod daemon).
        """
        # Use super() rather than ProxTestBase.get_proxy_config(): the
        # direct call binds cls to ProxTestBase, so clientcerts/certfile
        # overrides on the concrete test class would be ignored.
        d = super(ReverseProxTest, cls).get_proxy_config()
        d["reverse_proxy"] = (
            "https" if cls.ssl else "http",
            "127.0.0.1",
            cls.server.port
        )
        return d

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        p = libpathod.pathoc.Pathoc(
            ("localhost", self.proxy.port), ssl=self.ssl, sni=sni
        )
        p.connect()
        return p

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.
        """
        # The request line is the same in both modes; sni is only
        # forwarded to pathoc() when SSL is in use.
        p = self.pathoc(sni=sni) if self.ssl else self.pathoc()
        return p.request("get:'/p/%s'" % spec)
class ChainProxTest(ProxTestBase):
    """
        Chain n instances of mitmproxy in a row - because we can.
    """
    n = 2
    # One config factory per additional proxy in the chain.
    chain_config = [lambda: proxy.ProxyConfig(
        cacert=tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
    )] * n

    @classmethod
    def setupAll(cls):
        """
            Start the base fixture, then n further proxies, each one
            forwarding to the proxy started before it.
        """
        super(ChainProxTest, cls).setupAll()
        cls.chain = []
        for index in range(cls.n):
            config = cls.chain_config[index]()
            # The first extra proxy forwards to the base proxy, every
            # later one to the most recently started chain member.
            if index == 0:
                upstream_port = cls.proxy.port
            else:
                upstream_port = cls.chain[-1].port
            config.forward_proxy = ("http", "127.0.0.1", upstream_port)
            hop = ProxyThread(cls.masterclass(config))
            cls.chain.append(hop)
            hop.start()

    @classmethod
    def teardownAll(cls):
        """
            Tear down the base fixture, then shut down the server of
            every chained proxy.
        """
        super(ChainProxTest, cls).teardownAll()
        for hop in cls.chain:
            hop.tmaster.server.shutdown()

    def setUp(self):
        """
            Reset log and state of the base proxy and of every chained proxy.
        """
        super(ChainProxTest, self).setUp()
        for hop in self.chain:
            chained_master = hop.tmaster
            chained_master.clear_log()
            chained_master.state.clear()
class HTTPChainProxyTest(ChainProxTest):
    """
        Chain fixture whose pathoc clients enter through the last proxy
        of the chain.
    """

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        entry_address = ("localhost", self.chain[-1].port)
        client = libpathod.pathoc.Pathoc(entry_address, ssl=self.ssl, sni=sni)
        if not self.ssl:
            client.connect()
            return client
        # With SSL, connect() is given the first pathod daemon's address.
        client.connect(("127.0.0.1", self.server.port))
        return client