diff --git a/.gitignore b/.gitignore index 3570422..329c2ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ config.py __pycache__ +smart_ipv6_rotator.egg-info +build \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 50a291f..de9abdd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,11 +2,10 @@ FROM python:3-slim WORKDIR /app/ -COPY requirements.txt . - -RUN --mount=type=cache,target=/root/.cache/pip \ - pip install -r requirements.txt - COPY . . +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install ./ + + ENTRYPOINT ["python", "/app/smart-ipv6-rotator.py"] diff --git a/README.md b/README.md index 10053db..e025dce 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ -# Requirements +# Smart IPv6 Rotator + +Smart IPv6 Rotator is a command-line tool designed to rotate IPv6 addresses for specific subnets, enabling users to bypass restrictions on IPv6-enabled websites. + +## Upgrading +If you are already running this script, please run `sudo python smart-ipv6-rotator.py clean` before upgrading it to avoid any issues. + +## Requirements - IPv6 on your server - Invidious works in IPv6 - Install these two python packages: @@ -7,8 +14,7 @@ - Your provider need to allow you to assign any arbitrary IPv6 address, your IPv6 space must be fully routed. Usually the case but some do not support it like the popular cloud providers: AWS, Google Cloud, Oracle Cloud, Azure and more. -# How to setup (very simple tutorial) - +## How to setup (very simple tutorial for Google) Full detailed documentation: https://docs.invidious.io/ipv6-rotator/ 1. Git clone the repository somewhere. @@ -23,39 +29,103 @@ Full detailed documentation: https://docs.invidious.io/ipv6-rotator/ ``` The `sleep` command is used in case your network takes too much time time to be ready. -# Docker image - +## Docker image https://quay.io/repository/invidious/smart-ipv6-rotator -# How to clean the configuration done by the script +## How to clean the configuration done by the script ``` sudo python smart-ipv6-rotator.py clean ``` Only works if the script did not crash. But in case of a crash, in most case the system should auto rollback the changes. -# Why does this need root privileges? +## Usage + +```plaintext +smart-ipv6-rotator.py [-h] {run,clean-one,clean} ... +``` + +### Options + +- `-h, --help`: Display the help message and exit. + +### Subcommands + +1. `run`: Run the IPv6 rotator process. +2. `clean-one`: Clean your system for a given service / IPv6 ranges. +3. `clean`: Clean all configurations made by this script. + +--- + +### `run` Subcommand + +```plaintext +smart-ipv6-rotator.py run [-h] [--services {google}] [--external-ipv6-ranges EXTERNAL_IPV6_RANGES] [--skip-root] [--no-services] --ipv6range IPV6RANGE +``` + +#### Options + +- `-h, --help`: Display the help message and exit. +- `--services {google}`: Define IPV6 ranges of popular services (e.g., --services google, twitter). +- `--external-ipv6-ranges EXTERNAL_IPV6_RANGES`: Manually define external IPV6 ranges to rotate for. +- `--skip-root`: Skip root check. +- `--no-services`: Completely disable the --services flag. +- `--ipv6range IPV6RANGE`: Your IPV6 range (e.g., 2407:7000:9827:4100::/64). + +--- + +### `clean` Subcommand + +```plaintext +smart-ipv6-rotator.py clean [-h] [--skip-root] +``` + +#### Options + +- `-h, --help`: Display the help message and exit. +- `--skip-root`: Skip root check. + +--- + +### `clean-one` Subcommand + +```plaintext +smart-ipv6-rotator.py clean-one [-h] [--services {google}] [--external-ipv6-ranges EXTERNAL_IPV6_RANGES] [--skip-root] [--no-services] +``` + +#### Options + +- `-h, --help`: Display the help message and exit. +- `--services {google}`: Define IPV6 ranges of popular services (e.g., --services google, twitter). +- `--external-ipv6-ranges EXTERNAL_IPV6_RANGES`: Manually define external IPV6 ranges to rotate for. +- `--skip-root`: Skip root check. +- `--no-services`: Completely disable the --services flag. + +--- + + +## Why does this need root privileges? You can only modify the network configuration of your server using root privileges. The attack surface of this script is very limited as it is not running in the background, it's a one shot script. -# How does this script work? +## How does this script work? 1. First it check that you have IPv6 connectivity. 2. It automatically find the default IPv6 gateway and automatically generate a random IPv6 address from the IPv6 subnet that you configured. 3. It adds the random IPv6 address to the network interface. 4. It configures route for only using that new random IPv6 address for the specific IPv6 subnets (Google ipv6 ranges by default). This way your current ipv6 network configuration is untouched and any change done by the script is temporary. -# TODO (priority) -## High +## TODO (priority) +### High - [x] Docker image for easier use. -- [ ] Allow to configure your IPv6 subnets yourself. (Could be used for other projects) +- [x] Allow to configure your IPv6 subnets yourself. (Could be used for other projects) - [x] Better handle in case of errors in configuring IPv6 routes. Rollback the changes automatically - [ ] Allow to specify a specific network interface + ipv6 gateway instead of automatically discovering it. -## Medium +### Medium - [ ] Arg for spit out the IPv6 subnet of the current default ipv6 address instead of saying to use gestioip.net tool. - [ ] In most time, adding the new random IPv6 will take precedence over the existing IPv6. This may not be the expected behavior. -## Low +### Low - [ ] Argument for testing if the setup will work without permanently do any modification. - [ ] Allow to remove debug info - [ ] Maybe not depend on icanhazip? Send requests in HTTPS? diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..61eebd9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "smart-ipv6-rotator" +version = "1.0.0" +requires-python = ">=3.9" +dependencies = [ + "requests>=2.31.0", + "pyroute2>=0.7.12", +] +authors = [ + {name = "unixfox"}, + {name = "FireMasterK"}, + {name = "TheFrenchGhosty"}, + {name = "WardPearce"} +] +description = "IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites" +readme = "README.md" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 4e4cb6c..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -pyroute2 -requests diff --git a/smart-ipv6-rotator.py b/smart-ipv6-rotator.py index 1059196..20e4569 100644 --- a/smart-ipv6-rotator.py +++ b/smart-ipv6-rotator.py @@ -1,310 +1,4 @@ -#!/usr/bin/env python - -import sys -import os -import argparse -from random import seed, getrandbits, choice -from ipaddress import IPv6Network, IPv6Address -from time import sleep -from runpy import run_path - - -def module_not_found_helper(module_name): - sys.exit( - f"""[Error] Module '{module_name}' is not installed. please install it using your package manager. - Debian/Ubuntu: sudo apt install python3-{module_name} - RHEL/CentOS/Fedora: sudo dnf install python-{module_name} - Other Linux distributions (probably): sudo yourpackagemanager install python-{module_name}""" - ) - - -try: - from pyroute2 import IPDB - from pyroute2 import IPRoute -except ModuleNotFoundError: - module_not_found_helper("pyroute2") -try: - import requests -except ModuleNotFoundError: - module_not_found_helper("requests") - -ip = IPDB() -iproute = IPRoute() - - -class SmartIPv6Rotator(object): - def __init__(self): - parser = argparse.ArgumentParser( - description="IPv6 rotator", - usage="""smart-ipv6-rotator.py [] - -The available args are: - clean Clean your system from the previous setup. - run Run the IPv6 rotator process. -""", - ) - parser.add_argument("command", help="Subcommand to run") - args = parser.parse_args(sys.argv[1:2]) - if not hasattr(self, args.command): - print("Unrecognized command") - parser.print_help() - exit(1) - # use dispatch pattern to invoke method with same name - getattr(self, args.command)() - - # https://md5calc.com/google/ip - google_ipv6_ranges = [ - "2001:4860:4000::/36", - "2404:6800:4000::/36", - "2607:f8b0:4000::/36", - "2800:3f0:4000::/36", - "2a00:1450:4000::/36", - "2c0f:fb50:4000::/36", - ] - location_saved_config_ipv6_configured = "/tmp/smart-ipv6-rotator.py" - icanhazip_ipv6_address = "2606:4700::6812:7261" - - def check_ipv6_connectivity(self): - try: - requests.get("http://ipv6.icanhazip.com", timeout=5) - except requests.exceptions.RequestException: - sys.exit( - "[Error] You do not have IPv6 connectivity. This script can not work." - ) - - print("[INFO] You have IPv6 connectivity. Continuing.") - - def clean_previous_setup(self, existing_settings, args): - - if os.geteuid() != 0 and not args.skip_root_check: - sys.exit( - "[Error] Please run this script as root! It needs root privileges." - ) - - if ( - os.path.isfile(self.location_saved_config_ipv6_configured) - or len(existing_settings) > 0 - ): - settings = existing_settings - if len(existing_settings) == 0: - settings = run_path(self.location_saved_config_ipv6_configured) - try: - iproute.route( - "del", - dst=self.icanhazip_ipv6_address, - prefsrc=settings["random_ipv6_address"], - gateway=settings["gateway"], - oif=settings["interface_index"], - ) - except: - print( - "[Error] Failed to remove the test IPv6 subnet.\n" - " May be expected if the route were not yet configured and that was a cleanup due to an error." - ) - - try: - for ipv6_range in self.google_ipv6_ranges: - iproute.route( - "del", - dst=ipv6_range, - prefsrc=settings["random_ipv6_address"], - gateway=settings["gateway"], - oif=settings["interface_index"], - ) - except: - print( - "[Error] Failed to remove the configured (Google) IPv6 subnets.\n" - " May be expected if the route were not yet configured and that was a cleanup due to an error." - ) - - try: - iproute.addr( - "del", - settings["interface_index"], - address=settings["random_ipv6_address"], - mask=settings["random_ipv6_address_mask"], - ) - except: - print( - "[Error] Failed to remove the random IPv6 address, very unexpected!" - ) - - if len(existing_settings) == 0: - os.remove(self.location_saved_config_ipv6_configured) - - print( - "[INFO] Finished cleaning up previous setup.\n" - "[INFO] Waiting for the propagation in the Linux kernel." - ) - sleep(6) - else: - print("[INFO] No cleanup of previous setup needed.") - - def clean(self): - parser = argparse.ArgumentParser(description="Clean the previous setup.") - parser.add_argument( - "--skip-root", - required=False, - dest='skip_root_check', - action=argparse.BooleanOptionalAction, - help="Example: --skip-root for skipping root check", - ) - args = parser.parse_args(sys.argv[2:]) - self.clean_previous_setup({}, args) - - def run(self): - parser = argparse.ArgumentParser(description="Run the IPv6 rotator.") - parser.add_argument( - "-r", - "--ipv6range", - required=True, - help="Example: --ipv6range=2001:1:1::/64", - ) - parser.add_argument( - "--skip-root", - required=False, - dest='skip_root_check', - action=argparse.BooleanOptionalAction, - help="Example: --skip-root for skipping root check", - ) - args = parser.parse_args(sys.argv[2:]) - - if os.geteuid() != 0 and not args.skip_root_check: - sys.exit( - "[Error] Please run this script as root! It needs root privileges." - ) - - self.check_ipv6_connectivity() - self.clean_previous_setup({}, args) - - # calculate random IPv6 from the configured subnet - - seed() - ipv6_network = IPv6Network(args.ipv6range) - random_ipv6_address = str( - IPv6Address( - ipv6_network.network_address - + getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen) - ) - ) - - # get default network interface for IPv6 - - default_interface = iproute.route("get", dst=choice(self.google_ipv6_ranges))[0] - default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0]) - default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0]) - default_interface_name = ip.interfaces[default_interface_index]["ifname"] - - memory_settings = { - "random_ipv6_address": random_ipv6_address, - "random_ipv6_address_mask": ipv6_network.prefixlen, - "gateway": default_interface_gateway, - "interface_index": default_interface_index, - "interface_name": default_interface_name, - "ipv6_subnet": args.ipv6range, - } - - print("[DEBUG] Debug info:") - for k, v in memory_settings.items(): - print(k, "-->", v) - - try: - iproute.addr( - "add", - default_interface_index, - address=random_ipv6_address, - mask=ipv6_network.prefixlen, - ) - except Exception as e: - self.clean_previous_setup(memory_settings, args) - sys.exit( - "[Error] Failed to add the new random IPv6 address. The setup did not work!\n" - " That's unexpected! Did you correctly configured the IPv6 subnet to use?\n" - f" Exception:\n{e}" - ) - # needed so that the linux kernel takes into account the new ipv6 address - sleep(2) - - # test that the new ipv6 route works - try: - iproute.route( - "add", - dst=self.icanhazip_ipv6_address, - prefsrc=random_ipv6_address, - gateway=default_interface_gateway, - oif=default_interface_index, - priority=1, - ) - except Exception as e: - self.clean_previous_setup(memory_settings, args) - sys.exit( - "[Error] Failed to configure the test IPv6 route. The setup did not work!\n" - f" Exception:\n{e}" - ) - # needed so that the linux kernel takes into account the new ipv6 route - sleep(2) - try: - check_new_ipv6_address = requests.get( - f"http://[{self.icanhazip_ipv6_address}]", - headers={"host": "ipv6.icanhazip.com"}, - timeout=5, - ) - response_new_ipv6_address = check_new_ipv6_address.text.strip() - if response_new_ipv6_address == random_ipv6_address: - print("[INFO] Correctly using the new random IPv6 address, continuing.") - else: - self.clean_previous_setup(memory_settings, args) - sys.exit( - "[ERROR] The new random IPv6 is not used! The setup did not work!\n" - " That is very unexpected, check if your IPv6 routes do not have too much priority." - f" Address used: {response_new_ipv6_address}" - ) - except requests.exceptions.RequestException as e: - self.clean_previous_setup(memory_settings, args) - sys.exit( - "[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n" - " Your provider probably does not allow setting any arbitrary IPv6 address.\n" - " Or did you correctly configured the IPv6 subnet to use?\n" - f" Exception:\n{e}" - ) - - # configure routes for ipv6 ranges of Google - try: - for ipv6_range in self.google_ipv6_ranges: - iproute.route( - "add", - dst=ipv6_range, - prefsrc=random_ipv6_address, - gateway=default_interface_gateway, - oif=default_interface_index, - priority=1, - ) - except Exception as e: - self.clean_previous_setup(memory_settings, args) - sys.exit( - f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n" - f" Exception:\n{e}" - ) - - print( - "[INFO] Correctly configured the IPv6 routes for Google IPv6 ranges.\n" - "[INFO] Successful setup. Waiting for the propagation in the Linux kernel." - ) - sleep(6) - - # saving configuration to a file for future cleanup - file = open(self.location_saved_config_ipv6_configured, "w") - file.write( - 'random_ipv6_address="%s"\nrandom_ipv6_address_mask=%s\ngateway="%s"\ninterface_index=%s' - % ( - random_ipv6_address, - ipv6_network.prefixlen, - default_interface_gateway, - default_interface_index, - ) - ) - file.close() - +from smart_ipv6_rotator import main if __name__ == "__main__": - SmartIPv6Rotator() + main() diff --git a/smart_ipv6_rotator/__init__.py b/smart_ipv6_rotator/__init__.py new file mode 100644 index 0000000..87f718c --- /dev/null +++ b/smart_ipv6_rotator/__init__.py @@ -0,0 +1,282 @@ +import argparse +import sys +from dataclasses import asdict +from ipaddress import IPv6Address, IPv6Network +from os import path +from random import choice, getrandbits, seed +from time import sleep +from typing import Any, Callable + +import requests + +from smart_ipv6_rotator.const import ( + ICANHAZIP_IPV6_ADDRESS, + IP, + IPROUTE, + LEGACY_CONFIG_FILE, +) +from smart_ipv6_rotator.helpers import ( + PreviousConfig, + SavedRanges, + check_ipv6_connectivity, + clean_ipv6_check, + clean_ranges, + previous_configs, + root_check, + what_ranges, +) +from smart_ipv6_rotator.ranges import RANGES + +SHARED_OPTIONS = [ + ( + "--services", + { + "type": str, + "choices": list(RANGES.keys()), + "default": "google", + "help": "IPV6 ranges of popular services. Example: --services google,twitter", + }, + ), + ( + "--external-ipv6-ranges", + { + "type": str, + "help": "Manually define external IPV6 ranges to rotate for.", + }, + ), + ( + "--skip-root", + { + "action": "store_true", + "help": "Example: --skip-root for skipping root check", + }, + ), + ( + "--no-services", + { + "action": "store_true", + "help": "Completely disables the --services flag.", + }, + ), +] + + +def parse_args(func) -> Callable[..., Any]: + def _parse_args(namespace: argparse.Namespace) -> Any: + params = dict(namespace.__dict__) + params.pop("subcommand") + params.pop("func") + + return func(**params) + + return _parse_args + + +@parse_args +def run( + ipv6range: str, + skip_root: bool = False, + services: str | None = None, + external_ipv6_ranges: str | None = None, + no_services: bool = False, +) -> None: + """Run the IPv6 rotator process.""" + + if path.exists(LEGACY_CONFIG_FILE): + sys.exit( + "[ERROR] Legacy database format detected! Please run `python smart-ipv6-rotator.py clean` using the old version of this script.\nhttps://github.com/iv-org/smart-ipv6-rotator" + ) + + root_check(skip_root) + check_ipv6_connectivity() + + service_ranges = what_ranges(services, external_ipv6_ranges, no_services) + + clean_ranges(service_ranges, skip_root) + + seed() + ipv6_network = IPv6Network(ipv6range) + random_ipv6_address = str( + IPv6Address( + ipv6_network.network_address + + getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen) + ) + ) + + default_interface = IPROUTE.route("get", dst=choice(service_ranges))[0] # type: ignore + default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0]) + default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0]) + default_interface_name = IP.interfaces[default_interface_index]["ifname"] + + saved_ranges = SavedRanges( + random_ipv6_address=random_ipv6_address, + random_ipv6_address_mask=ipv6_network.prefixlen, + gateway=default_interface_gateway, + interface_index=default_interface_index, + interface_name=default_interface_name, + ipv6_subnet=ipv6range, + ranges=service_ranges, + ) + + # Save config now, will be cleaned if errors raised. + PreviousConfig(service_ranges).save(saved_ranges) + + print("[DEBUG] Debug info:") + for key, value in asdict(saved_ranges).items(): + print(f"{key} --> {value}") + + try: + IPROUTE.addr( + "add", + default_interface_index, + address=random_ipv6_address, + mask=ipv6_network.prefixlen, + ) + except Exception as error: + clean_ranges(service_ranges, skip_root) + sys.exit( + "[Error] Failed to add the new random IPv6 address. The setup did not work!\n" + " That's unexpected! Did you correctly configure the IPv6 subnet to use?\n" + f" Exception:\n{error}" + ) + + sleep(2) # Need so that the linux kernel takes into account the new ipv6 route + + try: + IPROUTE.route( + "add", + dst=ICANHAZIP_IPV6_ADDRESS, + prefsrc=random_ipv6_address, + gateway=default_interface_gateway, + oif=default_interface_index, + priority=1, + ) + except Exception as error: + clean_ranges(service_ranges, skip_root) + sys.exit( + "[Error] Failed to configure the test IPv6 route. The setup did not work!\n" + f" Exception:\n{error}" + ) + + sleep(2) + + try: + check_new_ipv6_address = requests.get( + f"http://[{ICANHAZIP_IPV6_ADDRESS}]", + headers={"host": "ipv6.icanhazip.com"}, + timeout=5, + ) + except requests.exceptions.RequestException as error: + clean_ranges(service_ranges, skip_root) + sys.exit( + "[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n" + " Your provider probably does not allow setting any arbitrary IPv6 address.\n" + " Or did you correctly configure the IPv6 subnet to use?\n" + f" Exception:\n{error}" + ) + + try: + check_new_ipv6_address.raise_for_status() + except requests.HTTPError: + clean_ranges(service_ranges, skip_root) + sys.exit( + "[ERROR] icanhazip didn't return the expected status, possibly they are down right now." + ) + + response_new_ipv6_address = check_new_ipv6_address.text.strip() + if response_new_ipv6_address == random_ipv6_address: + print("[INFO] Correctly using the new random IPv6 address, continuing.") + else: + clean_ranges(service_ranges, skip_root) + sys.exit( + "[ERROR] The new random IPv6 is not used! The setup did not work!\n" + " That is very unexpected, check if your IPv6 routes do not have too much priority." + f" Address used: {response_new_ipv6_address}" + ) + + clean_ipv6_check(saved_ranges) + + try: + for ipv6_range in service_ranges: + IPROUTE.route( + "add", + dst=ipv6_range, + prefsrc=random_ipv6_address, + gateway=default_interface_gateway, + oif=default_interface_index, + priority=1, + ) + except Exception as error: + clean_ranges(service_ranges, skip_root) + sys.exit( + f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n" + f" Exception:\n{error}" + ) + + print( + f"[INFO] Correctly configured the IPv6 routes for IPv6 ranges {service_ranges}.\n" + "[INFO] Successful setup. Waiting for the propagation in the Linux kernel." + ) + sleep(6) + + +@parse_args +def clean_one( + skip_root: bool = False, + services: str | None = None, + external_ipv6_ranges: str | None = None, + no_services: bool = False, +) -> None: + """Clean your system for a given service / ipv6 ranges.""" + + clean_ranges(what_ranges(services, external_ipv6_ranges, no_services), skip_root) + + +@parse_args +def clean( + skip_root: bool = False, +) -> None: + """Clean all configurations made by this script.""" + + for config in previous_configs(): + clean_ranges(config.ranges, skip_root) + + +def main() -> None: + """IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites""" + parser = argparse.ArgumentParser( + description="IPv6 rotator for specific subnets - unblock restrictions on IPv6 enabled websites" + ) + subparsers = parser.add_subparsers(title="subcommands", dest="subcommand") + + run_parser = subparsers.add_parser("run", help="Run the IPv6 rotator process.") + for flag, config in SHARED_OPTIONS: + run_parser.add_argument(flag, **config) + + run_parser.add_argument( + "--ipv6range", + help="Your IPV6 range. Example: 2407:7000:9827:4100::/64", + required=True, + ) + run_parser.set_defaults(func=run) + + clean_one_parser = subparsers.add_parser( + "clean-one", help="Clean your system for a given service / ipv6 ranges." + ) + for flag, config in SHARED_OPTIONS: + clean_one_parser.add_argument(flag, **config) + + clean_one_parser.set_defaults(func=clean_one) + + clean_parser = subparsers.add_parser( + "clean", help="Clean all configurations made by this script." + ) + clean_parser.add_argument("--skip-root", action="store_true") + clean_parser.set_defaults(func=clean) + + # Check if a command is being ran, otherwise print help. + args = parser.parse_args() + if hasattr(args, "func"): + args.func(args) + else: + parser.print_help() diff --git a/smart_ipv6_rotator/const.py b/smart_ipv6_rotator/const.py new file mode 100644 index 0000000..851b967 --- /dev/null +++ b/smart_ipv6_rotator/const.py @@ -0,0 +1,12 @@ +from pyroute2 import IPDB, IPRoute + +ICANHAZIP_IPV6_ADDRESS = "2606:4700::6812:7261" + +JSON_CONFIG_FILE = "/tmp/smart-ipv6-rotator.json" + +LEGACY_CONFIG_FILE = "/tmp/smart-ipv6-rotator.py" + +IP = IPDB() +IPROUTE = IPRoute() + +__all__: list[str] = ["ICANHAZIP_IPV6_ADDRESS", "IP", "IPROUTE"] diff --git a/smart_ipv6_rotator/helpers.py b/smart_ipv6_rotator/helpers.py new file mode 100644 index 0000000..4e19fa1 --- /dev/null +++ b/smart_ipv6_rotator/helpers.py @@ -0,0 +1,204 @@ +import json +import os +import sys +from dataclasses import asdict +from time import sleep +from typing import Iterator + +import requests + +from smart_ipv6_rotator.const import ICANHAZIP_IPV6_ADDRESS, IPROUTE, JSON_CONFIG_FILE +from smart_ipv6_rotator.models import SavedRanges +from smart_ipv6_rotator.ranges import RANGES + + +def root_check(skip_root: bool = False) -> None: + if os.geteuid() != 0 and not skip_root: + sys.exit("[Error] Please run this script as root! It needs root privileges.") + + +def check_ipv6_connectivity() -> None: + try: + requests.get("http://ipv6.icanhazip.com", timeout=5) + except requests.Timeout: + sys.exit("[Error] You do not have IPv6 connectivity. This script can not work.") + except requests.HTTPError: + sys.exit( + "[ERROR] icanhazip didn't return the expected status, possibly they are down right now." + ) + + print("[INFO] You have IPv6 connectivity. Continuing.") + + +def what_ranges( + services: str | None = None, + ipv6_ranges: str | None = None, + no_services: bool = False, +) -> list[str]: + """Works out what service ranges the user wants to use. + + Args: + services (str | None, optional): Defaults to None. + ipv6_ranges (str | None, optional): Defaults to None. + no_services (bool, optional): Default to False + + Returns: + list[str]: IPV6 ranges + """ + + ranges_: list[str] = [] + + if services and not no_services: + for service in services.split(","): + if service not in RANGES: + sys.exit(f"{service} isn't a valid service.") + + ranges_ += list(RANGES[service]) + + if ipv6_ranges: + ranges_ += ipv6_ranges.split(",") + + if not ranges_: + sys.exit("No service or ranges given.") + + return list(set(ranges_)) + + +def clean_ipv6_check(config: SavedRanges) -> None: + try: + IPROUTE.route( + "del", + dst=ICANHAZIP_IPV6_ADDRESS, + prefsrc=config.random_ipv6_address, + gateway=config.gateway, + oif=config.interface_index, + ) + except: + pass + + +def clean_ranges(ranges_: list[str], skip_root: bool) -> None: + """Cleans root. + + Args: + ranges_ (list[str]): + skip_root (bool): + """ + + root_check(skip_root) + + previous_config = PreviousConfig(ranges_) + + previous = previous_config.get() + if not previous: + print("[INFO] No cleanup of previous setup needed.") + return + + clean_ipv6_check(previous) + + try: + for ipv6_range in previous.ranges: + IPROUTE.route( + "del", + dst=ipv6_range, + prefsrc=previous.random_ipv6_address, + gateway=previous.gateway, + oif=previous.interface_index, + ) + except: + print( + f"""[Error] Failed to remove the configured IPv6 subnets {','.join(previous.ranges)} + May be expected if the route were not yet configured and that was a cleanup due to an error + """ + ) + + try: + IPROUTE.addr( + "del", + previous.interface_index, + address=previous.random_ipv6_address, + mask=previous.random_ipv6_address_mask, + ) + except: + print("[Error] Failed to remove the random IPv6 address, very unexpected!") + + previous_config.remove() + + print( + "[INFO] Finished cleaning up previous setup.\n[INFO] Waiting for the propagation in the Linux kernel." + ) + + sleep(6) + + +def previous_configs() -> Iterator[SavedRanges]: + configs = PreviousConfig._get_raw() + + for config in configs: + yield SavedRanges(**config) + + +class PreviousConfig: + def __init__( + self, + ranges_: list[str], + ) -> None: + self.__ranges = ranges_ + + @classmethod + def _get_raw(cls) -> list[dict]: + if not os.path.exists(JSON_CONFIG_FILE): + return [] + + with open(JSON_CONFIG_FILE, "r") as f_: + return json.loads(f_.read()) + + def __ranges_exist(self, results: dict) -> bool: + return all(value in self.__ranges for value in results["ranges"]) + + def remove(self) -> None: + """Remove range from json file.""" + + results = self._get_raw() + to_remove_index = next( + ( + index + for index, ranges in enumerate(results) + if self.__ranges_exist(ranges) + ), + None, + ) + + if to_remove_index is not None: + results.pop(to_remove_index) + + with open(JSON_CONFIG_FILE, "w") as f_: + f_.write(json.dumps(results)) + + def save(self, to_save: SavedRanges) -> None: + """Save a given service/ipv6 ranges for cleanup later. + + Args: + ranges_ (list[str]): IPV6 ranges + """ + + self.remove() + + results = self._get_raw() + results.append(asdict(to_save)) + + with open(JSON_CONFIG_FILE, "w") as f_: + f_.write(json.dumps(results)) + + def get(self) -> SavedRanges | None: + """Gets saved ranges. + + Returns: + SavedRanges | None: Save ranges. + """ + + results = self._get_raw() + + for result in results: + if self.__ranges_exist(result): + return SavedRanges(**result) diff --git a/smart_ipv6_rotator/models.py b/smart_ipv6_rotator/models.py new file mode 100644 index 0000000..974887b --- /dev/null +++ b/smart_ipv6_rotator/models.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass + + +@dataclass +class SavedRanges: + ranges: list[str] + random_ipv6_address: str + gateway: str + interface_index: int + interface_name: str + ipv6_subnet: str + random_ipv6_address_mask: int diff --git a/smart_ipv6_rotator/ranges.py b/smart_ipv6_rotator/ranges.py new file mode 100644 index 0000000..c1afad9 --- /dev/null +++ b/smart_ipv6_rotator/ranges.py @@ -0,0 +1,13 @@ +RANGES: dict[str, list[str]] = { + "google": [ + "2001:4860:4000::/36", + "2404:6800:4000::/36", + "2607:f8b0:4000::/36", + "2800:3f0:4000::/36", + "2a00:1450:4000::/36", + "2c0f:fb50:4000::/36", + ] +} + + +__all__: list[str] = ["RANGES"]