Add specification of replacement patterns on the command line.

This commit is contained in:
Aldo Cortesi 2012-03-17 17:20:34 +13:00
parent c8ae1e85b3
commit 76175672ad
7 changed files with 174 additions and 3 deletions

View File

@ -14,7 +14,61 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import proxy
import optparse
import optparse, re, filt, textwrap
class ParseReplaceException(Exception): pass
class OptionException(Exception): pass
def parse_replace_hook(s):
"""
Returns a (pattern, regex, replacement) tuple.
The general form for a replacement hook is as follows:
/patt/regex/replacement
The first character specifies the separator. Example:
:~q:foo:bar
If only two clauses are specified, the pattern is set to match
universally (i.e. ".*"). Example:
/foo/bar/
Clauses are parsed from left to right. Extra separators are taken to be
part of the final clause. For instance, the replacement clause below is
"foo/bar/":
/one/two/foo/bar/
Checks that pattern and regex are both well-formed. Raises
ParseReplaceException on error.
"""
sep, rem = s[0], s[1:]
parts = rem.split(sep, 2)
if len(parts) == 2:
patt = ".*"
regex, replacement = parts
elif len(parts) == 3:
patt, regex, replacement = parts
else:
raise ParseReplaceException("Malformed replacement specifier - too few clauses: %s"%s)
if not regex:
raise ParseReplaceException("Empty replacement regex: %s"%str(patt))
try:
re.compile(regex)
except re.error, e:
raise ParseReplaceException("Malformed replacement regex: %s"%str(e.message))
if not filt.parse(patt):
raise ParseReplaceException("Malformed replacement filter pattern: %s"%patt)
return patt, regex, replacement
def get_common_options(options):
@ -29,6 +83,24 @@ def get_common_options(options):
elif options.stickyauth_filt:
stickyauth = options.stickyauth_filt
reps = []
for i in options.replace:
try:
p = parse_replace_hook(i)
except ParseReplaceException, e:
raise OptionException(e.message)
reps.append(p)
for i in options.replace_file:
try:
patt, rex, path = parse_replace_hook(i)
except ParseReplaceException, e:
raise OptionException(e.message)
try:
v = open(path, "r").read()
except IOError, e:
raise OptionException("Could not read replace file: %s"%path)
reps.append((patt, rex, v))
return dict(
anticache = options.anticache,
anticomp = options.anticomp,
@ -39,6 +111,7 @@ def get_common_options(options):
refresh_server_playback = not options.norefresh,
rheaders = options.rheaders,
rfile = options.rfile,
replacements = reps,
server_replay = options.server_replay,
script = options.script,
stickycookie = stickycookie,
@ -190,6 +263,28 @@ def common_options(parser):
help="Disable response pop from response flow. "
"This makes it possible to replay same response multiple times."
)
group = optparse.OptionGroup(
parser,
"Replacements",
"""
Replacements are of the form "/pattern/regex/replacement", where
the separator can be any character. Please see the documentation
for more information.
""".strip()
)
group.add_option(
"--replace",
action="append", type="str", dest="replace", default=[],
metavar="PATTERN",
help="Replacement pattern."
)
group.add_option(
"--replace-from-file",
action="append", type="str", dest="replace_file", default=[],
metavar="PATTERN",
help="Replacement pattern, where the replacement clause is a path to a file."
)
parser.add_option_group(group)
proxy.certificate_option_group(parser)

View File

@ -294,6 +294,7 @@ class Options(object):
"refresh_server_playback",
"rfile",
"script",
"replacements",
"rheaders",
"server_replay",
"stickycookie",
@ -331,6 +332,9 @@ class ConsoleMaster(flow.FlowMaster):
self.looptime = 0
self.options = options
for i in options.replacements:
self.replacehooks.add(*i)
self.flow_list_view = None
self.set_palette()

View File

@ -30,6 +30,7 @@ class Options(object):
"no_server",
"nopop",
"refresh_server_playback",
"replacements",
"rfile",
"rheaders",
"server_replay",
@ -94,6 +95,9 @@ class DumpMaster(flow.FlowMaster):
except IOError, v:
raise DumpError(v.strerror)
for i in options.replacements:
self.replacehooks.add(*i)
if options.server_replay:
self.start_server_playback(
self._readflow(options.server_replay),

View File

@ -49,7 +49,10 @@ if __name__ == '__main__':
sys.exit(1)
dumpopts = dump.Options(**cmdline.get_common_options(options))
try:
dumpopts = dump.Options(**cmdline.get_common_options(options))
except cmdline.OptionException, v:
parser.error(v.message)
dumpopts.keepserving = options.keepserving
if args:

View File

@ -54,7 +54,10 @@ if __name__ == '__main__':
print >> sys.stderr, "mitmproxy:", v.args[0]
sys.exit(1)
opts = console.Options(**cmdline.get_common_options(options))
try:
opts = console.Options(**cmdline.get_common_options(options))
except cmdline.OptionException, v:
parser.error(v.message)
opts.intercept = options.intercept
opts.debug = options.debug
m = console.ConsoleMaster(server, opts)

1
test/data/replace Normal file
View File

@ -0,0 +1 @@
replacecontents

View File

@ -4,6 +4,37 @@ from libmproxy import cmdline
class uAll(libpry.AutoTree):
def test_parse_replace_hook(self):
x = cmdline.parse_replace_hook("/foo/bar/voing")
assert x == ("foo", "bar", "voing")
x = cmdline.parse_replace_hook("/foo/bar/vo/ing/")
assert x == ("foo", "bar", "vo/ing/")
x = cmdline.parse_replace_hook("/bar/voing")
assert x == (".*", "bar", "voing")
libpry.raises(
cmdline.ParseReplaceException,
cmdline.parse_replace_hook,
"/foo"
)
libpry.raises(
"replacement regex",
cmdline.parse_replace_hook,
"patt/[/rep"
)
libpry.raises(
"filter pattern",
cmdline.parse_replace_hook,
"/~/foo/rep"
)
libpry.raises(
"empty replacement regex",
cmdline.parse_replace_hook,
"//"
)
def test_common(self):
parser = optparse.OptionParser()
cmdline.common_options(parser)
@ -25,6 +56,36 @@ class uAll(libpry.AutoTree):
assert v["stickycookie"] == "foo"
assert v["stickyauth"] == "foo"
opts.replace = ["/foo/bar/voing"]
v = cmdline.get_common_options(opts)
assert v["replacements"] == [("foo", "bar", "voing")]
opts.replace = ["//"]
libpry.raises(
"empty replacement regex",
cmdline.get_common_options,
opts
)
opts.replace = []
opts.replace_file = [("/foo/bar/nonexistent")]
libpry.raises(
"could not read replace file",
cmdline.get_common_options,
opts
)
opts.replace_file = [("/~/bar/nonexistent")]
libpry.raises(
"filter pattern",
cmdline.get_common_options,
opts
)
opts.replace_file = [("/foo/bar/./data/replace")]
v = cmdline.get_common_options(opts)["replacements"]
assert len(v) == 1
assert v[0][2].strip() == "replacecontents"