multiple services/ipv6ranges, static typing, more improvements. (#21)

* Started smart ipv6 implementation with support for other services

* Fixed random_ipv6_address_mask type

* Added status check for icanhazip

* Added a script for converting poetry requirements to a requirements.txt file

* Added support for multiple services to be passed at once

* Added --no-services flag & set google to default

* Removed unused import

* Removed poetry

* Removed unused import

* Removed tinydb requirement

* Removed click requirement

* Remove duplicated ip ranges

* Backwards compatible

* Minor fix to import

* Add unixfox to the top of authors

* Bump version to 1.0.0

* Fixed running file without command

* delete build folder

* Added build folder

* Added clean-one & clean, fixed ipv6 check cleanup

* Updated docker image

* Added instructions in readme

* Added min python version

* Fixed clean missing decorator

* Removed unused import

* Added section already upgrading

* minor grammar fix

* Added legacy DB check, improved error message readability & removed useless warning
This commit is contained in:
Ward 2024-05-23 09:40:18 +12:00 committed by GitHub
parent 504beaf4ad
commit f9484f0412
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 630 additions and 328 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
config.py config.py
__pycache__ __pycache__
smart_ipv6_rotator.egg-info
build

View File

@ -2,11 +2,10 @@ FROM python:3-slim
WORKDIR /app/ WORKDIR /app/
COPY requirements.txt .
RUN --mount=type=cache,target=/root/.cache/pip \
pip install -r requirements.txt
COPY . . COPY . .
RUN --mount=type=cache,target=/root/.cache/pip \
pip install ./
ENTRYPOINT ["python", "/app/smart-ipv6-rotator.py"] ENTRYPOINT ["python", "/app/smart-ipv6-rotator.py"]

View File

@ -1,4 +1,11 @@
# Requirements # Smart IPv6 Rotator
Smart IPv6 Rotator is a command-line tool designed to rotate IPv6 addresses for specific subnets, enabling users to bypass restrictions on IPv6-enabled websites.
## Upgrading
If you are already running this script, please run `sudo python smart-ipv6-rotator.py clean` before upgrading it to avoid any issues.
## Requirements
- IPv6 on your server - IPv6 on your server
- Invidious works in IPv6 - Invidious works in IPv6
- Install these two python packages: - Install these two python packages:
@ -7,8 +14,7 @@
- Your provider need to allow you to assign any arbitrary IPv6 address, your IPv6 space must be fully routed. - Your provider need to allow you to assign any arbitrary IPv6 address, your IPv6 space must be fully routed.
Usually the case but some do not support it like the popular cloud providers: AWS, Google Cloud, Oracle Cloud, Azure and more. Usually the case but some do not support it like the popular cloud providers: AWS, Google Cloud, Oracle Cloud, Azure and more.
# How to setup (very simple tutorial) ## How to setup (very simple tutorial for Google)
Full detailed documentation: https://docs.invidious.io/ipv6-rotator/ Full detailed documentation: https://docs.invidious.io/ipv6-rotator/
1. Git clone the repository somewhere. 1. Git clone the repository somewhere.
@ -23,39 +29,103 @@ Full detailed documentation: https://docs.invidious.io/ipv6-rotator/
``` ```
The `sleep` command is used in case your network takes too much time time to be ready. The `sleep` command is used in case your network takes too much time time to be ready.
# Docker image ## Docker image
https://quay.io/repository/invidious/smart-ipv6-rotator https://quay.io/repository/invidious/smart-ipv6-rotator
# How to clean the configuration done by the script ## How to clean the configuration done by the script
``` ```
sudo python smart-ipv6-rotator.py clean sudo python smart-ipv6-rotator.py clean
``` ```
Only works if the script did not crash. But in case of a crash, in most case the system should auto rollback the changes. Only works if the script did not crash. But in case of a crash, in most case the system should auto rollback the changes.
# Why does this need root privileges? ## Usage
```plaintext
smart-ipv6-rotator.py [-h] {run,clean-one,clean} ...
```
### Options
- `-h, --help`: Display the help message and exit.
### Subcommands
1. `run`: Run the IPv6 rotator process.
2. `clean-one`: Clean your system for a given service / IPv6 ranges.
3. `clean`: Clean all configurations made by this script.
---
### `run` Subcommand
```plaintext
smart-ipv6-rotator.py run [-h] [--services {google}] [--external-ipv6-ranges EXTERNAL_IPV6_RANGES] [--skip-root] [--no-services] --ipv6range IPV6RANGE
```
#### Options
- `-h, --help`: Display the help message and exit.
- `--services {google}`: Define IPV6 ranges of popular services (e.g., --services google, twitter).
- `--external-ipv6-ranges EXTERNAL_IPV6_RANGES`: Manually define external IPV6 ranges to rotate for.
- `--skip-root`: Skip root check.
- `--no-services`: Completely disable the --services flag.
- `--ipv6range IPV6RANGE`: Your IPV6 range (e.g., 2407:7000:9827:4100::/64).
---
### `clean` Subcommand
```plaintext
smart-ipv6-rotator.py clean [-h] [--skip-root]
```
#### Options
- `-h, --help`: Display the help message and exit.
- `--skip-root`: Skip root check.
---
### `clean-one` Subcommand
```plaintext
smart-ipv6-rotator.py clean-one [-h] [--services {google}] [--external-ipv6-ranges EXTERNAL_IPV6_RANGES] [--skip-root] [--no-services]
```
#### Options
- `-h, --help`: Display the help message and exit.
- `--services {google}`: Define IPV6 ranges of popular services (e.g., --services google, twitter).
- `--external-ipv6-ranges EXTERNAL_IPV6_RANGES`: Manually define external IPV6 ranges to rotate for.
- `--skip-root`: Skip root check.
- `--no-services`: Completely disable the --services flag.
---
## Why does this need root privileges?
You can only modify the network configuration of your server using root privileges. You can only modify the network configuration of your server using root privileges.
The attack surface of this script is very limited as it is not running in the background, it's a one shot script. The attack surface of this script is very limited as it is not running in the background, it's a one shot script.
# How does this script work? ## How does this script work?
1. First it check that you have IPv6 connectivity. 1. First it check that you have IPv6 connectivity.
2. It automatically find the default IPv6 gateway and automatically generate a random IPv6 address from the IPv6 subnet that you configured. 2. It automatically find the default IPv6 gateway and automatically generate a random IPv6 address from the IPv6 subnet that you configured.
3. It adds the random IPv6 address to the network interface. 3. It adds the random IPv6 address to the network interface.
4. It configures route for only using that new random IPv6 address for the specific IPv6 subnets (Google ipv6 ranges by default). 4. It configures route for only using that new random IPv6 address for the specific IPv6 subnets (Google ipv6 ranges by default).
This way your current ipv6 network configuration is untouched and any change done by the script is temporary. This way your current ipv6 network configuration is untouched and any change done by the script is temporary.
# TODO (priority) ## TODO (priority)
## High ### High
- [x] Docker image for easier use. - [x] Docker image for easier use.
- [ ] Allow to configure your IPv6 subnets yourself. (Could be used for other projects) - [x] Allow to configure your IPv6 subnets yourself. (Could be used for other projects)
- [x] Better handle in case of errors in configuring IPv6 routes. Rollback the changes automatically - [x] Better handle in case of errors in configuring IPv6 routes. Rollback the changes automatically
- [ ] Allow to specify a specific network interface + ipv6 gateway instead of automatically discovering it. - [ ] Allow to specify a specific network interface + ipv6 gateway instead of automatically discovering it.
## Medium ### Medium
- [ ] Arg for spit out the IPv6 subnet of the current default ipv6 address instead of saying to use gestioip.net tool. - [ ] Arg for spit out the IPv6 subnet of the current default ipv6 address instead of saying to use gestioip.net tool.
- [ ] In most time, adding the new random IPv6 will take precedence over the existing IPv6. This may not be the expected behavior. - [ ] In most time, adding the new random IPv6 will take precedence over the existing IPv6. This may not be the expected behavior.
## Low ### Low
- [ ] Argument for testing if the setup will work without permanently do any modification. - [ ] Argument for testing if the setup will work without permanently do any modification.
- [ ] Allow to remove debug info - [ ] Allow to remove debug info
- [ ] Maybe not depend on icanhazip? Send requests in HTTPS? - [ ] Maybe not depend on icanhazip? Send requests in HTTPS?

16
pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[project]
name = "smart-ipv6-rotator"
version = "1.0.0"
requires-python = ">=3.9"
dependencies = [
"requests>=2.31.0",
"pyroute2>=0.7.12",
]
authors = [
{name = "unixfox"},
{name = "FireMasterK"},
{name = "TheFrenchGhosty"},
{name = "WardPearce"}
]
description = "IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites"
readme = "README.md"

View File

@ -1,2 +0,0 @@
pyroute2
requests

View File

@ -1,310 +1,4 @@
#!/usr/bin/env python from smart_ipv6_rotator import main
import sys
import os
import argparse
from random import seed, getrandbits, choice
from ipaddress import IPv6Network, IPv6Address
from time import sleep
from runpy import run_path
def module_not_found_helper(module_name):
sys.exit(
f"""[Error] Module '{module_name}' is not installed. please install it using your package manager.
Debian/Ubuntu: sudo apt install python3-{module_name}
RHEL/CentOS/Fedora: sudo dnf install python-{module_name}
Other Linux distributions (probably): sudo yourpackagemanager install python-{module_name}"""
)
try:
from pyroute2 import IPDB
from pyroute2 import IPRoute
except ModuleNotFoundError:
module_not_found_helper("pyroute2")
try:
import requests
except ModuleNotFoundError:
module_not_found_helper("requests")
ip = IPDB()
iproute = IPRoute()
class SmartIPv6Rotator(object):
def __init__(self):
parser = argparse.ArgumentParser(
description="IPv6 rotator",
usage="""smart-ipv6-rotator.py <command> [<args>]
The available args are:
clean Clean your system from the previous setup.
run Run the IPv6 rotator process.
""",
)
parser.add_argument("command", help="Subcommand to run")
args = parser.parse_args(sys.argv[1:2])
if not hasattr(self, args.command):
print("Unrecognized command")
parser.print_help()
exit(1)
# use dispatch pattern to invoke method with same name
getattr(self, args.command)()
# https://md5calc.com/google/ip
google_ipv6_ranges = [
"2001:4860:4000::/36",
"2404:6800:4000::/36",
"2607:f8b0:4000::/36",
"2800:3f0:4000::/36",
"2a00:1450:4000::/36",
"2c0f:fb50:4000::/36",
]
location_saved_config_ipv6_configured = "/tmp/smart-ipv6-rotator.py"
icanhazip_ipv6_address = "2606:4700::6812:7261"
def check_ipv6_connectivity(self):
try:
requests.get("http://ipv6.icanhazip.com", timeout=5)
except requests.exceptions.RequestException:
sys.exit(
"[Error] You do not have IPv6 connectivity. This script can not work."
)
print("[INFO] You have IPv6 connectivity. Continuing.")
def clean_previous_setup(self, existing_settings, args):
if os.geteuid() != 0 and not args.skip_root_check:
sys.exit(
"[Error] Please run this script as root! It needs root privileges."
)
if (
os.path.isfile(self.location_saved_config_ipv6_configured)
or len(existing_settings) > 0
):
settings = existing_settings
if len(existing_settings) == 0:
settings = run_path(self.location_saved_config_ipv6_configured)
try:
iproute.route(
"del",
dst=self.icanhazip_ipv6_address,
prefsrc=settings["random_ipv6_address"],
gateway=settings["gateway"],
oif=settings["interface_index"],
)
except:
print(
"[Error] Failed to remove the test IPv6 subnet.\n"
" May be expected if the route were not yet configured and that was a cleanup due to an error."
)
try:
for ipv6_range in self.google_ipv6_ranges:
iproute.route(
"del",
dst=ipv6_range,
prefsrc=settings["random_ipv6_address"],
gateway=settings["gateway"],
oif=settings["interface_index"],
)
except:
print(
"[Error] Failed to remove the configured (Google) IPv6 subnets.\n"
" May be expected if the route were not yet configured and that was a cleanup due to an error."
)
try:
iproute.addr(
"del",
settings["interface_index"],
address=settings["random_ipv6_address"],
mask=settings["random_ipv6_address_mask"],
)
except:
print(
"[Error] Failed to remove the random IPv6 address, very unexpected!"
)
if len(existing_settings) == 0:
os.remove(self.location_saved_config_ipv6_configured)
print(
"[INFO] Finished cleaning up previous setup.\n"
"[INFO] Waiting for the propagation in the Linux kernel."
)
sleep(6)
else:
print("[INFO] No cleanup of previous setup needed.")
def clean(self):
parser = argparse.ArgumentParser(description="Clean the previous setup.")
parser.add_argument(
"--skip-root",
required=False,
dest='skip_root_check',
action=argparse.BooleanOptionalAction,
help="Example: --skip-root for skipping root check",
)
args = parser.parse_args(sys.argv[2:])
self.clean_previous_setup({}, args)
def run(self):
parser = argparse.ArgumentParser(description="Run the IPv6 rotator.")
parser.add_argument(
"-r",
"--ipv6range",
required=True,
help="Example: --ipv6range=2001:1:1::/64",
)
parser.add_argument(
"--skip-root",
required=False,
dest='skip_root_check',
action=argparse.BooleanOptionalAction,
help="Example: --skip-root for skipping root check",
)
args = parser.parse_args(sys.argv[2:])
if os.geteuid() != 0 and not args.skip_root_check:
sys.exit(
"[Error] Please run this script as root! It needs root privileges."
)
self.check_ipv6_connectivity()
self.clean_previous_setup({}, args)
# calculate random IPv6 from the configured subnet
seed()
ipv6_network = IPv6Network(args.ipv6range)
random_ipv6_address = str(
IPv6Address(
ipv6_network.network_address
+ getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen)
)
)
# get default network interface for IPv6
default_interface = iproute.route("get", dst=choice(self.google_ipv6_ranges))[0]
default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0])
default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0])
default_interface_name = ip.interfaces[default_interface_index]["ifname"]
memory_settings = {
"random_ipv6_address": random_ipv6_address,
"random_ipv6_address_mask": ipv6_network.prefixlen,
"gateway": default_interface_gateway,
"interface_index": default_interface_index,
"interface_name": default_interface_name,
"ipv6_subnet": args.ipv6range,
}
print("[DEBUG] Debug info:")
for k, v in memory_settings.items():
print(k, "-->", v)
try:
iproute.addr(
"add",
default_interface_index,
address=random_ipv6_address,
mask=ipv6_network.prefixlen,
)
except Exception as e:
self.clean_previous_setup(memory_settings, args)
sys.exit(
"[Error] Failed to add the new random IPv6 address. The setup did not work!\n"
" That's unexpected! Did you correctly configured the IPv6 subnet to use?\n"
f" Exception:\n{e}"
)
# needed so that the linux kernel takes into account the new ipv6 address
sleep(2)
# test that the new ipv6 route works
try:
iproute.route(
"add",
dst=self.icanhazip_ipv6_address,
prefsrc=random_ipv6_address,
gateway=default_interface_gateway,
oif=default_interface_index,
priority=1,
)
except Exception as e:
self.clean_previous_setup(memory_settings, args)
sys.exit(
"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
f" Exception:\n{e}"
)
# needed so that the linux kernel takes into account the new ipv6 route
sleep(2)
try:
check_new_ipv6_address = requests.get(
f"http://[{self.icanhazip_ipv6_address}]",
headers={"host": "ipv6.icanhazip.com"},
timeout=5,
)
response_new_ipv6_address = check_new_ipv6_address.text.strip()
if response_new_ipv6_address == random_ipv6_address:
print("[INFO] Correctly using the new random IPv6 address, continuing.")
else:
self.clean_previous_setup(memory_settings, args)
sys.exit(
"[ERROR] The new random IPv6 is not used! The setup did not work!\n"
" That is very unexpected, check if your IPv6 routes do not have too much priority."
f" Address used: {response_new_ipv6_address}"
)
except requests.exceptions.RequestException as e:
self.clean_previous_setup(memory_settings, args)
sys.exit(
"[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n"
" Your provider probably does not allow setting any arbitrary IPv6 address.\n"
" Or did you correctly configured the IPv6 subnet to use?\n"
f" Exception:\n{e}"
)
# configure routes for ipv6 ranges of Google
try:
for ipv6_range in self.google_ipv6_ranges:
iproute.route(
"add",
dst=ipv6_range,
prefsrc=random_ipv6_address,
gateway=default_interface_gateway,
oif=default_interface_index,
priority=1,
)
except Exception as e:
self.clean_previous_setup(memory_settings, args)
sys.exit(
f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
f" Exception:\n{e}"
)
print(
"[INFO] Correctly configured the IPv6 routes for Google IPv6 ranges.\n"
"[INFO] Successful setup. Waiting for the propagation in the Linux kernel."
)
sleep(6)
# saving configuration to a file for future cleanup
file = open(self.location_saved_config_ipv6_configured, "w")
file.write(
'random_ipv6_address="%s"\nrandom_ipv6_address_mask=%s\ngateway="%s"\ninterface_index=%s'
% (
random_ipv6_address,
ipv6_network.prefixlen,
default_interface_gateway,
default_interface_index,
)
)
file.close()
if __name__ == "__main__": if __name__ == "__main__":
SmartIPv6Rotator() main()

View File

@ -0,0 +1,282 @@
import argparse
import sys
from dataclasses import asdict
from ipaddress import IPv6Address, IPv6Network
from os import path
from random import choice, getrandbits, seed
from time import sleep
from typing import Any, Callable
import requests
from smart_ipv6_rotator.const import (
ICANHAZIP_IPV6_ADDRESS,
IP,
IPROUTE,
LEGACY_CONFIG_FILE,
)
from smart_ipv6_rotator.helpers import (
PreviousConfig,
SavedRanges,
check_ipv6_connectivity,
clean_ipv6_check,
clean_ranges,
previous_configs,
root_check,
what_ranges,
)
from smart_ipv6_rotator.ranges import RANGES
SHARED_OPTIONS = [
(
"--services",
{
"type": str,
"choices": list(RANGES.keys()),
"default": "google",
"help": "IPV6 ranges of popular services. Example: --services google,twitter",
},
),
(
"--external-ipv6-ranges",
{
"type": str,
"help": "Manually define external IPV6 ranges to rotate for.",
},
),
(
"--skip-root",
{
"action": "store_true",
"help": "Example: --skip-root for skipping root check",
},
),
(
"--no-services",
{
"action": "store_true",
"help": "Completely disables the --services flag.",
},
),
]
def parse_args(func) -> Callable[..., Any]:
def _parse_args(namespace: argparse.Namespace) -> Any:
params = dict(namespace.__dict__)
params.pop("subcommand")
params.pop("func")
return func(**params)
return _parse_args
@parse_args
def run(
ipv6range: str,
skip_root: bool = False,
services: str | None = None,
external_ipv6_ranges: str | None = None,
no_services: bool = False,
) -> None:
"""Run the IPv6 rotator process."""
if path.exists(LEGACY_CONFIG_FILE):
sys.exit(
"[ERROR] Legacy database format detected! Please run `python smart-ipv6-rotator.py clean` using the old version of this script.\nhttps://github.com/iv-org/smart-ipv6-rotator"
)
root_check(skip_root)
check_ipv6_connectivity()
service_ranges = what_ranges(services, external_ipv6_ranges, no_services)
clean_ranges(service_ranges, skip_root)
seed()
ipv6_network = IPv6Network(ipv6range)
random_ipv6_address = str(
IPv6Address(
ipv6_network.network_address
+ getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen)
)
)
default_interface = IPROUTE.route("get", dst=choice(service_ranges))[0] # type: ignore
default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0])
default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0])
default_interface_name = IP.interfaces[default_interface_index]["ifname"]
saved_ranges = SavedRanges(
random_ipv6_address=random_ipv6_address,
random_ipv6_address_mask=ipv6_network.prefixlen,
gateway=default_interface_gateway,
interface_index=default_interface_index,
interface_name=default_interface_name,
ipv6_subnet=ipv6range,
ranges=service_ranges,
)
# Save config now, will be cleaned if errors raised.
PreviousConfig(service_ranges).save(saved_ranges)
print("[DEBUG] Debug info:")
for key, value in asdict(saved_ranges).items():
print(f"{key} --> {value}")
try:
IPROUTE.addr(
"add",
default_interface_index,
address=random_ipv6_address,
mask=ipv6_network.prefixlen,
)
except Exception as error:
clean_ranges(service_ranges, skip_root)
sys.exit(
"[Error] Failed to add the new random IPv6 address. The setup did not work!\n"
" That's unexpected! Did you correctly configure the IPv6 subnet to use?\n"
f" Exception:\n{error}"
)
sleep(2) # Need so that the linux kernel takes into account the new ipv6 route
try:
IPROUTE.route(
"add",
dst=ICANHAZIP_IPV6_ADDRESS,
prefsrc=random_ipv6_address,
gateway=default_interface_gateway,
oif=default_interface_index,
priority=1,
)
except Exception as error:
clean_ranges(service_ranges, skip_root)
sys.exit(
"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
f" Exception:\n{error}"
)
sleep(2)
try:
check_new_ipv6_address = requests.get(
f"http://[{ICANHAZIP_IPV6_ADDRESS}]",
headers={"host": "ipv6.icanhazip.com"},
timeout=5,
)
except requests.exceptions.RequestException as error:
clean_ranges(service_ranges, skip_root)
sys.exit(
"[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n"
" Your provider probably does not allow setting any arbitrary IPv6 address.\n"
" Or did you correctly configure the IPv6 subnet to use?\n"
f" Exception:\n{error}"
)
try:
check_new_ipv6_address.raise_for_status()
except requests.HTTPError:
clean_ranges(service_ranges, skip_root)
sys.exit(
"[ERROR] icanhazip didn't return the expected status, possibly they are down right now."
)
response_new_ipv6_address = check_new_ipv6_address.text.strip()
if response_new_ipv6_address == random_ipv6_address:
print("[INFO] Correctly using the new random IPv6 address, continuing.")
else:
clean_ranges(service_ranges, skip_root)
sys.exit(
"[ERROR] The new random IPv6 is not used! The setup did not work!\n"
" That is very unexpected, check if your IPv6 routes do not have too much priority."
f" Address used: {response_new_ipv6_address}"
)
clean_ipv6_check(saved_ranges)
try:
for ipv6_range in service_ranges:
IPROUTE.route(
"add",
dst=ipv6_range,
prefsrc=random_ipv6_address,
gateway=default_interface_gateway,
oif=default_interface_index,
priority=1,
)
except Exception as error:
clean_ranges(service_ranges, skip_root)
sys.exit(
f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
f" Exception:\n{error}"
)
print(
f"[INFO] Correctly configured the IPv6 routes for IPv6 ranges {service_ranges}.\n"
"[INFO] Successful setup. Waiting for the propagation in the Linux kernel."
)
sleep(6)
@parse_args
def clean_one(
skip_root: bool = False,
services: str | None = None,
external_ipv6_ranges: str | None = None,
no_services: bool = False,
) -> None:
"""Clean your system for a given service / ipv6 ranges."""
clean_ranges(what_ranges(services, external_ipv6_ranges, no_services), skip_root)
@parse_args
def clean(
skip_root: bool = False,
) -> None:
"""Clean all configurations made by this script."""
for config in previous_configs():
clean_ranges(config.ranges, skip_root)
def main() -> None:
"""IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites"""
parser = argparse.ArgumentParser(
description="IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites"
)
subparsers = parser.add_subparsers(title="subcommands", dest="subcommand")
run_parser = subparsers.add_parser("run", help="Run the IPv6 rotator process.")
for flag, config in SHARED_OPTIONS:
run_parser.add_argument(flag, **config)
run_parser.add_argument(
"--ipv6range",
help="Your IPV6 range. Example: 2407:7000:9827:4100::/64",
required=True,
)
run_parser.set_defaults(func=run)
clean_one_parser = subparsers.add_parser(
"clean-one", help="Clean your system for a given service / ipv6 ranges."
)
for flag, config in SHARED_OPTIONS:
clean_one_parser.add_argument(flag, **config)
clean_one_parser.set_defaults(func=clean_one)
clean_parser = subparsers.add_parser(
"clean", help="Clean all configurations made by this script."
)
clean_parser.add_argument("--skip-root", action="store_true")
clean_parser.set_defaults(func=clean)
# Check if a command is being ran, otherwise print help.
args = parser.parse_args()
if hasattr(args, "func"):
args.func(args)
else:
parser.print_help()

View File

@ -0,0 +1,12 @@
from pyroute2 import IPDB, IPRoute
ICANHAZIP_IPV6_ADDRESS = "2606:4700::6812:7261"
JSON_CONFIG_FILE = "/tmp/smart-ipv6-rotator.json"
LEGACY_CONFIG_FILE = "/tmp/smart-ipv6-rotator.py"
IP = IPDB()
IPROUTE = IPRoute()
__all__: list[str] = ["ICANHAZIP_IPV6_ADDRESS", "IP", "IPROUTE"]

View File

@ -0,0 +1,204 @@
import json
import os
import sys
from dataclasses import asdict
from time import sleep
from typing import Iterator
import requests
from smart_ipv6_rotator.const import ICANHAZIP_IPV6_ADDRESS, IPROUTE, JSON_CONFIG_FILE
from smart_ipv6_rotator.models import SavedRanges
from smart_ipv6_rotator.ranges import RANGES
def root_check(skip_root: bool = False) -> None:
if os.geteuid() != 0 and not skip_root:
sys.exit("[Error] Please run this script as root! It needs root privileges.")
def check_ipv6_connectivity() -> None:
try:
requests.get("http://ipv6.icanhazip.com", timeout=5)
except requests.Timeout:
sys.exit("[Error] You do not have IPv6 connectivity. This script can not work.")
except requests.HTTPError:
sys.exit(
"[ERROR] icanhazip didn't return the expected status, possibly they are down right now."
)
print("[INFO] You have IPv6 connectivity. Continuing.")
def what_ranges(
services: str | None = None,
ipv6_ranges: str | None = None,
no_services: bool = False,
) -> list[str]:
"""Works out what service ranges the user wants to use.
Args:
services (str | None, optional): Defaults to None.
ipv6_ranges (str | None, optional): Defaults to None.
no_services (bool, optional): Default to False
Returns:
list[str]: IPV6 ranges
"""
ranges_: list[str] = []
if services and not no_services:
for service in services.split(","):
if service not in RANGES:
sys.exit(f"{service} isn't a valid service.")
ranges_ += list(RANGES[service])
if ipv6_ranges:
ranges_ += ipv6_ranges.split(",")
if not ranges_:
sys.exit("No service or ranges given.")
return list(set(ranges_))
def clean_ipv6_check(config: SavedRanges) -> None:
try:
IPROUTE.route(
"del",
dst=ICANHAZIP_IPV6_ADDRESS,
prefsrc=config.random_ipv6_address,
gateway=config.gateway,
oif=config.interface_index,
)
except:
pass
def clean_ranges(ranges_: list[str], skip_root: bool) -> None:
"""Cleans root.
Args:
ranges_ (list[str]):
skip_root (bool):
"""
root_check(skip_root)
previous_config = PreviousConfig(ranges_)
previous = previous_config.get()
if not previous:
print("[INFO] No cleanup of previous setup needed.")
return
clean_ipv6_check(previous)
try:
for ipv6_range in previous.ranges:
IPROUTE.route(
"del",
dst=ipv6_range,
prefsrc=previous.random_ipv6_address,
gateway=previous.gateway,
oif=previous.interface_index,
)
except:
print(
f"""[Error] Failed to remove the configured IPv6 subnets {','.join(previous.ranges)}
May be expected if the route were not yet configured and that was a cleanup due to an error
"""
)
try:
IPROUTE.addr(
"del",
previous.interface_index,
address=previous.random_ipv6_address,
mask=previous.random_ipv6_address_mask,
)
except:
print("[Error] Failed to remove the random IPv6 address, very unexpected!")
previous_config.remove()
print(
"[INFO] Finished cleaning up previous setup.\n[INFO] Waiting for the propagation in the Linux kernel."
)
sleep(6)
def previous_configs() -> Iterator[SavedRanges]:
configs = PreviousConfig._get_raw()
for config in configs:
yield SavedRanges(**config)
class PreviousConfig:
def __init__(
self,
ranges_: list[str],
) -> None:
self.__ranges = ranges_
@classmethod
def _get_raw(cls) -> list[dict]:
if not os.path.exists(JSON_CONFIG_FILE):
return []
with open(JSON_CONFIG_FILE, "r") as f_:
return json.loads(f_.read())
def __ranges_exist(self, results: dict) -> bool:
return all(value in self.__ranges for value in results["ranges"])
def remove(self) -> None:
"""Remove range from json file."""
results = self._get_raw()
to_remove_index = next(
(
index
for index, ranges in enumerate(results)
if self.__ranges_exist(ranges)
),
None,
)
if to_remove_index is not None:
results.pop(to_remove_index)
with open(JSON_CONFIG_FILE, "w") as f_:
f_.write(json.dumps(results))
def save(self, to_save: SavedRanges) -> None:
"""Save a given service/ipv6 ranges for cleanup later.
Args:
ranges_ (list[str]): IPV6 ranges
"""
self.remove()
results = self._get_raw()
results.append(asdict(to_save))
with open(JSON_CONFIG_FILE, "w") as f_:
f_.write(json.dumps(results))
def get(self) -> SavedRanges | None:
"""Gets saved ranges.
Returns:
SavedRanges | None: Save ranges.
"""
results = self._get_raw()
for result in results:
if self.__ranges_exist(result):
return SavedRanges(**result)

View File

@ -0,0 +1,12 @@
from dataclasses import dataclass
@dataclass
class SavedRanges:
ranges: list[str]
random_ipv6_address: str
gateway: str
interface_index: int
interface_name: str
ipv6_subnet: str
random_ipv6_address_mask: int

View File

@ -0,0 +1,13 @@
RANGES: dict[str, list[str]] = {
"google": [
"2001:4860:4000::/36",
"2404:6800:4000::/36",
"2607:f8b0:4000::/36",
"2800:3f0:4000::/36",
"2a00:1450:4000::/36",
"2c0f:fb50:4000::/36",
]
}
__all__: list[str] = ["RANGES"]