Add block_list Option to set HTTP status for blocked flows/urls (#4516)

* Add block_list Option which returns a set HTTP status for a blocked flow filter:URL

* Add changelog entry

* Add blocklist to default AddOns

* Add Nginx 444 status code to list, so it is available for validation

* Add overview of blocklists

* Add allow-only, and more tests

* Fix mypy tox issue

* Finish test coverage with test for invalid filter

* Add PR feedback

* Fix type/scope error

* Fix stray blank line

* Delete concepts-blocklist.md

* Feature in overview

* Add flag to metadata for blocklisted flows.

* minor improvements

Co-authored-by: Maximilian Hils <git@maximilianhils.com>
This commit is contained in:
HereC 2021-03-29 11:30:21 -04:00 committed by GitHub
parent 2c941b8905
commit de3f089bb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 0 deletions

View File

@ -41,6 +41,7 @@ If you depend on these features, please raise your voice in
* The `clientconnect`, `clientdisconnect`, `serverconnect`, `serverdisconnect`, and `log`
events have been replaced with new events, see addon documentation for details (@mhils)
* Contentviews now implement `render_priority` instead of `should_render`, allowing more specialization (@mhils)
* Addition of block_list option to block requests with a set status code (@ericbeland)
* Automatic JSON view mode when `+json` suffix in content type (@kam800)
* Use pyca/cryptography to generate certificates, not pyOpenSSL (@mhils)
* Remove the legacy protocol stack (@Kriechi)

View File

@ -103,6 +103,10 @@ h1, h2, h3, h4, h5, h6 {
}
}
table code {
white-space: pre;
}
.footnotes p {
display: inline;
}

View File

@ -8,6 +8,7 @@ menu:
# Features
- [Anticache](#anticache)
- [Blocklist](#blocklist)
- [Client-side replay](#client-side-replay)
- [Map Local](#map-local)
- [Map Remote](#map-remote)
@ -28,6 +29,33 @@ server. This is useful when you want to make sure you capture an HTTP exchange
in its totality. It's also often used during client-side replay, when you want
to make sure the server responds with complete data.
## Blocklist
Using the `block_list` option, you can block particular websites or requests.
Mitmproxy returns a fixed HTTP status code instead, or no response at all.
`block_list` patterns look like this:
```
/flow-filter/status-code
```
* **flow-filter** is an optional mitmproxy [filter expression]({{< relref "concepts-filters">}})
that describes which requests should be blocked.
* **status-code** is the [HTTP status code](https://en.wikipedia.org/wiki/List_of_HTTP_status_codes)
served by mitmproxy for blocked requests.
A special status code of 444 instructs mitmproxy to "hang up" and not send any response at all.
The _separator_ is arbitrary, and is defined by the first character.
#### Examples
Pattern | Description
------- | -----------
`:~d google-analytics.com:404` | Block all requests to google-analytics.com, and return a "404 Not Found" instead.
`:~d example.com$:444` | Block all requests to example.com, and do not send an HTTP response.
`:!~d ^example\.com$:403` | Only allow HTTP requests to *example.com*. Note that this is not secure against an active adversary and can be bypassed, for example by switching to non-HTTP protocols.
## Client-side replay
Client-side replay does what it says on the tin: you provide a previously saved

View File

@ -1,6 +1,7 @@
from mitmproxy.addons import anticache
from mitmproxy.addons import anticomp
from mitmproxy.addons import block
from mitmproxy.addons import blocklist
from mitmproxy.addons import browser
from mitmproxy.addons import clientplayback
from mitmproxy.addons import command_history
@ -31,6 +32,7 @@ def default_addons():
core.Core(),
browser.Browser(),
block.Block(),
blocklist.BlockList(),
anticache.AntiCache(),
anticomp.AntiComp(),
clientplayback.ClientPlayback(),

View File

@ -0,0 +1,78 @@
import typing
from mitmproxy import ctx, exceptions, flowfilter, http, version
from mitmproxy.net.http.status_codes import NO_RESPONSE
from mitmproxy.net.http.status_codes import RESPONSES
class BlockSpec(typing.NamedTuple):
matches: flowfilter.TFilter
status_code: int
def parse_spec(option: str) -> BlockSpec:
"""
Parses strings in the following format, enforces number of segments:
/flow-filter/status
"""
sep, rem = option[0], option[1:]
parts = rem.lower().split(sep, 2)
if len(parts) != 2:
raise ValueError("Invalid number of parameters (2 are expected)")
flow_patt, status = parts
try:
status_code = int(status)
except ValueError:
raise ValueError(f"Invalid HTTP status code: {status}")
flow_filter = flowfilter.parse(flow_patt)
if not flow_filter:
raise ValueError(f"Invalid filter pattern: {flow_patt}")
if not RESPONSES.get(status_code):
raise ValueError(f"Invalid HTTP status code: {status}")
return BlockSpec(matches=flow_filter, status_code=status_code)
class BlockList:
def __init__(self):
self.items: typing.List[BlockSpec] = []
def load(self, loader):
loader.add_option(
"block_list", typing.Sequence[str], [],
"""
Block matching requests and return an empty response with the specified HTTP status.
Option syntax is "/flow-filter/status-code", where flow-filter describes
which requests this rule should be applied to and status-code is the HTTP status code to return for
blocked requests. The separator ("/" in the example) can be any character.
Setting a non-standard status code of 444 will close the connection without sending a response.
"""
)
def configure(self, updated):
if "block_list" in updated:
self.items = []
for option in ctx.options.block_list:
try:
spec = parse_spec(option)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse block_list option {option}: {e}") from e
self.items.append(spec)
def request(self, flow: http.HTTPFlow) -> None:
if flow.response or flow.error or (flow.reply and flow.reply.state == "taken"):
return
for spec in self.items:
if spec.matches(flow):
flow.metadata['blocklisted'] = True
if spec.status_code == NO_RESPONSE:
flow.kill()
else:
flow.response = http.Response.make(
spec.status_code,
headers={"Server": version.MITMPROXY}
)

View File

@ -93,8 +93,10 @@ RESPONSES = {
REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable",
EXPECTATION_FAILED: "Expectation Failed",
IM_A_TEAPOT: "I'm a teapot",
NO_RESPONSE: "No Response",
CLIENT_CLOSED_REQUEST: "Client Closed Request",
# 500
INTERNAL_SERVER_ERROR: "Internal Server Error",
NOT_IMPLEMENTED: "Not Implemented",

View File

@ -0,0 +1,65 @@
import pytest
from mitmproxy.addons import blocklist
from mitmproxy.exceptions import OptionsError
from mitmproxy.test import taddons
from mitmproxy.test import tflow
@pytest.mark.parametrize("filter,err", [
("/~u index.html/TOOMANY/300", "Invalid number of parameters"),
(":~d ~d ~d:200", "Invalid filter"),
("/~u index.html/999", "Invalid HTTP status code"),
("/~u index.html/abc", "Invalid HTTP status code"),
])
def test_parse_spec_err(filter, err):
with pytest.raises(ValueError, match=err):
blocklist.parse_spec(filter)
class TestBlockList:
@pytest.mark.parametrize("filter,status_code", [
(":~u example.org:404", 404),
(":~u example.com:404", None),
("/!jpg/418", None),
("/!png/418", 418),
])
def test_block(self, filter, status_code):
bl = blocklist.BlockList()
with taddons.context(bl) as tctx:
tctx.configure(bl, block_list=[filter])
f = tflow.tflow()
f.request.url = b"https://example.org/images/test.jpg"
bl.request(f)
if status_code is not None:
assert f.response.status_code == status_code
assert f.metadata['blocklisted']
else:
assert not f.response
def test_special_kill_status_closes_connection(self):
bl = blocklist.BlockList()
with taddons.context(bl) as tctx:
tctx.configure(bl, block_list=[':.*:444'])
f = tflow.tflow()
bl.request(f)
assert f.error.msg == f.error.KILLED_MESSAGE
assert f.response is None
assert f.metadata['blocklisted'] is True
def test_already_handled(self):
"""Test that we don't interfere if another addon already killed this request."""
bl = blocklist.BlockList()
with taddons.context(bl) as tctx:
tctx.configure(bl, block_list=["/.*/404"])
f = tflow.tflow()
f.kill() # done by another addon.
bl.request(f)
assert not f.response
def test_configure_err(self):
bl = blocklist.BlockList()
with taddons.context(bl) as tctx:
with pytest.raises(OptionsError):
tctx.configure(bl, block_list=["lalelu"])