mirror of
https://github.com/iv-org/smart-ipv6-rotator.git
synced 2024-11-23 05:59:45 +00:00
remove config file + switch to args
This commit is contained in:
parent
0d674e138a
commit
5d0b9b4032
@ -6,11 +6,10 @@
|
||||
- requests
|
||||
|
||||
# How to setup (very simple tutorial for the moment)
|
||||
1. Git clone the repository somewhere
|
||||
2. Copy the file `config.py.example` to `config.py`.
|
||||
3. Change the `ipv6_subnet` to your IPv6 subnet. If you do not know it, you can use a tool like http://www.gestioip.net/cgi-bin/subnet_calculator.cgi
|
||||
4. Run once the script using `sudo python smart-ipv6-rotator.py run`
|
||||
5. If everything went well then configure a cron for periodically rotate your IPv6 range.
|
||||
1. Git clone the repository somewhere.
|
||||
2. Change the `ipv6_subnet` to your IPv6 subnet. If you do not know it, you can use a tool like http://www.gestioip.net/cgi-bin/subnet_calculator.cgi
|
||||
3. Run once the script using `sudo python smart-ipv6-rotator.py run --ipv6range=YOURIPV6SUBNET/64`
|
||||
4. If everything went well then configure a cron for periodically rotate your IPv6 range.
|
||||
Twice a day (noon and midnight) is enough for YouTube servers. Also at the reboot of the server!
|
||||
|
||||
# How to clean the configuration done by the script
|
||||
|
@ -1 +0,0 @@
|
||||
ipv6_subnet = "2001:db8:100::/64"
|
@ -1,6 +1,8 @@
|
||||
from runpy import run_path
|
||||
#!/usr/bin/env python
|
||||
|
||||
import sys
|
||||
import os
|
||||
import argparse
|
||||
from random import seed, getrandbits, choice
|
||||
from ipaddress import IPv6Network, IPv6Address
|
||||
from time import sleep
|
||||
@ -29,233 +31,262 @@ except ModuleNotFoundError:
|
||||
ip = IPDB()
|
||||
iproute = IPRoute()
|
||||
|
||||
# https://md5calc.com/google/ip
|
||||
google_ipv6_ranges = [
|
||||
"2001:4860:4000::/36",
|
||||
"2404:6800:4000::/36",
|
||||
"2607:f8b0:4000::/36",
|
||||
"2800:3f0:4000::/36",
|
||||
"2a00:1450:4000::/36",
|
||||
"2c0f:fb50:4000::/36",
|
||||
]
|
||||
location_saved_config_ipv6_configured = "/tmp/smart-ipv6-rotator.py"
|
||||
icanhazip_ipv6_address = "2606:4700::6812:7261"
|
||||
|
||||
|
||||
def check_ipv6_connectivity():
|
||||
try:
|
||||
requests.get("http://ipv6.icanhazip.com", timeout=5)
|
||||
except requests.exceptions.RequestException:
|
||||
sys.exit("[Error] You do not have IPv6 connectivity. This script can not work.")
|
||||
|
||||
print("[INFO] You have IPv6 connectivity. Continuing.")
|
||||
|
||||
|
||||
def clean_previous_setup(existing_settings):
|
||||
if (
|
||||
os.path.isfile(location_saved_config_ipv6_configured)
|
||||
or len(existing_settings) > 0
|
||||
):
|
||||
settings = existing_settings
|
||||
if len(existing_settings) == 0:
|
||||
settings = run_path(location_saved_config_ipv6_configured)
|
||||
try:
|
||||
iproute.route(
|
||||
"del",
|
||||
dst=icanhazip_ipv6_address,
|
||||
prefsrc=settings["random_ipv6_address"],
|
||||
gateway=settings["gateway"],
|
||||
oif=settings["interface_index"],
|
||||
)
|
||||
except:
|
||||
print(
|
||||
"[Error] Failed to remove the test IPv6 subnet.\n"
|
||||
" May be expected if the route were not yet configured and that was a cleanup due to an error."
|
||||
class SmartIPv6Rotator(object):
|
||||
def __init__(self):
|
||||
if os.geteuid() != 0:
|
||||
sys.exit(
|
||||
"[Error] Please run this script as root! It needs root privileges."
|
||||
)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="IPv6 rotator",
|
||||
usage="""smart-ipv6-rotator.py <command> [<args>]
|
||||
|
||||
The available args are:
|
||||
clean Clean your system from the previous setup.
|
||||
run Run the IPv6 rotator process.
|
||||
""",
|
||||
)
|
||||
parser.add_argument("command", help="Subcommand to run")
|
||||
# parse_args defaults to [1:] for args, but you need to
|
||||
# exclude the rest of the args too, or validation will fail
|
||||
args = parser.parse_args(sys.argv[1:2])
|
||||
if not hasattr(self, args.command):
|
||||
print("Unrecognized command")
|
||||
parser.print_help()
|
||||
exit(1)
|
||||
# use dispatch pattern to invoke method with same name
|
||||
getattr(self, args.command)()
|
||||
|
||||
# https://md5calc.com/google/ip
|
||||
google_ipv6_ranges = [
|
||||
"2001:4860:4000::/36",
|
||||
"2404:6800:4000::/36",
|
||||
"2607:f8b0:4000::/36",
|
||||
"2800:3f0:4000::/36",
|
||||
"2a00:1450:4000::/36",
|
||||
"2c0f:fb50:4000::/36",
|
||||
]
|
||||
location_saved_config_ipv6_configured = "/tmp/smart-ipv6-rotator.py"
|
||||
icanhazip_ipv6_address = "2606:4700::6812:7261"
|
||||
|
||||
def check_ipv6_connectivity(self):
|
||||
try:
|
||||
for ipv6_range in google_ipv6_ranges:
|
||||
requests.get("http://ipv6.icanhazip.com", timeout=5)
|
||||
except requests.exceptions.RequestException:
|
||||
sys.exit(
|
||||
"[Error] You do not have IPv6 connectivity. This script can not work."
|
||||
)
|
||||
|
||||
print("[INFO] You have IPv6 connectivity. Continuing.")
|
||||
|
||||
def clean_previous_setup(self, existing_settings):
|
||||
if (
|
||||
os.path.isfile(self.location_saved_config_ipv6_configured)
|
||||
or len(existing_settings) > 0
|
||||
):
|
||||
settings = existing_settings
|
||||
if len(existing_settings) == 0:
|
||||
settings = run_path(self.location_saved_config_ipv6_configured)
|
||||
try:
|
||||
iproute.route(
|
||||
"del",
|
||||
dst=ipv6_range,
|
||||
dst=self.icanhazip_ipv6_address,
|
||||
prefsrc=settings["random_ipv6_address"],
|
||||
gateway=settings["gateway"],
|
||||
oif=settings["interface_index"],
|
||||
)
|
||||
except:
|
||||
except:
|
||||
print(
|
||||
"[Error] Failed to remove the test IPv6 subnet.\n"
|
||||
" May be expected if the route were not yet configured and that was a cleanup due to an error."
|
||||
)
|
||||
|
||||
try:
|
||||
for ipv6_range in self.google_ipv6_ranges:
|
||||
iproute.route(
|
||||
"del",
|
||||
dst=ipv6_range,
|
||||
prefsrc=settings["random_ipv6_address"],
|
||||
gateway=settings["gateway"],
|
||||
oif=settings["interface_index"],
|
||||
)
|
||||
except:
|
||||
print(
|
||||
"[Error] Failed to remove the configured (Google) IPv6 subnets.\n"
|
||||
" May be expected if the route were not yet configured and that was a cleanup due to an error."
|
||||
)
|
||||
|
||||
try:
|
||||
iproute.addr(
|
||||
"del",
|
||||
settings["interface_index"],
|
||||
address=settings["random_ipv6_address"],
|
||||
mask=settings["random_ipv6_address_mask"],
|
||||
)
|
||||
except:
|
||||
print(
|
||||
"[Error] Failed to remove the random IPv6 address, very unexpected!"
|
||||
)
|
||||
|
||||
if len(existing_settings) == 0:
|
||||
os.remove(self.location_saved_config_ipv6_configured)
|
||||
|
||||
print(
|
||||
"[Error] Failed to remove the configured (Google) IPv6 subnets.\n"
|
||||
" May be expected if the route were not yet configured and that was a cleanup due to an error."
|
||||
"[INFO] Finished cleaning up previous setup.\n"
|
||||
"[INFO] Waiting for the propagation in the Linux kernel."
|
||||
)
|
||||
sleep(6)
|
||||
else:
|
||||
print("[INFO] No cleanup of previous setup needed.")
|
||||
|
||||
def clean(self):
|
||||
parser = argparse.ArgumentParser(description="Clean the previous setup.")
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
self.clean_previous_setup({})
|
||||
|
||||
def run(self):
|
||||
parser = argparse.ArgumentParser(description="Run the IPv6 rotator.")
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--ipv6range",
|
||||
required=True,
|
||||
help="Example: --ipv6range=2001:861:4501:4a10::/64",
|
||||
)
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
|
||||
self.check_ipv6_connectivity()
|
||||
self.clean_previous_setup({})
|
||||
|
||||
# calculate random IPv6 from the configured subnet
|
||||
|
||||
seed()
|
||||
ipv6_network = IPv6Network(args.ipv6range)
|
||||
random_ipv6_address = str(
|
||||
IPv6Address(
|
||||
ipv6_network.network_address
|
||||
+ getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen)
|
||||
)
|
||||
)
|
||||
|
||||
# get default network interface for IPv6
|
||||
|
||||
default_interface = iproute.route("get", dst=choice(self.google_ipv6_ranges))[0]
|
||||
default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0])
|
||||
default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0])
|
||||
default_interface_name = ip.interfaces[default_interface_index]["ifname"]
|
||||
|
||||
memory_settings = {
|
||||
"random_ipv6_address": random_ipv6_address,
|
||||
"random_ipv6_address_mask": ipv6_network.prefixlen,
|
||||
"gateway": default_interface_gateway,
|
||||
"interface_index": default_interface_index,
|
||||
"interface_name": default_interface_name,
|
||||
"ipv6_subnet": args.ipv6range,
|
||||
}
|
||||
|
||||
print("[DEBUG] Debug info:")
|
||||
for k, v in memory_settings.items():
|
||||
print(k, "-->", v)
|
||||
|
||||
try:
|
||||
iproute.addr(
|
||||
"del",
|
||||
settings["interface_index"],
|
||||
address=settings["random_ipv6_address"],
|
||||
mask=settings["random_ipv6_address_mask"],
|
||||
"add",
|
||||
default_interface_index,
|
||||
address=random_ipv6_address,
|
||||
mask=ipv6_network.prefixlen,
|
||||
)
|
||||
except:
|
||||
print("[Error] Failed to remove the random IPv6 address, very unexpected!")
|
||||
|
||||
if len(existing_settings) == 0:
|
||||
os.remove(location_saved_config_ipv6_configured)
|
||||
|
||||
print(
|
||||
"[INFO] Finished cleaning up previous setup.\n"
|
||||
"[INFO] Waiting for the propagation in the Linux kernel."
|
||||
)
|
||||
sleep(6)
|
||||
else:
|
||||
print("[INFO] No cleanup of previous setup needed.")
|
||||
|
||||
|
||||
if os.geteuid() != 0:
|
||||
sys.exit("[Error] Please run this script as root! It needs root privileges.")
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
print(
|
||||
"Args:\n- run: Run the script\n- clean: Clean the previous setup.\n"
|
||||
f"Example: python {sys.argv[0]} run"
|
||||
)
|
||||
|
||||
elif sys.argv[1] == "clean":
|
||||
clean_previous_setup({})
|
||||
|
||||
elif sys.argv[1] == "run":
|
||||
check_ipv6_connectivity()
|
||||
|
||||
clean_previous_setup({})
|
||||
|
||||
try:
|
||||
settings = run_path("./config.py")
|
||||
except:
|
||||
sys.exit(
|
||||
"[ERROR] Unable to load the config file. Did you copy the config.py.example file?"
|
||||
)
|
||||
|
||||
# calculate random IPv6 from the configured subnet
|
||||
|
||||
seed()
|
||||
ipv6_network = IPv6Network(settings["ipv6_subnet"])
|
||||
random_ipv6_address = str(
|
||||
IPv6Address(
|
||||
ipv6_network.network_address
|
||||
+ getrandbits(ipv6_network.max_prefixlen - ipv6_network.prefixlen)
|
||||
)
|
||||
)
|
||||
|
||||
# get default network interface for IPv6
|
||||
|
||||
default_interface = iproute.route("get", dst=choice(google_ipv6_ranges))[0]
|
||||
default_interface_index = int(default_interface.get_attrs("RTA_OIF")[0])
|
||||
default_interface_gateway = str(default_interface.get_attrs("RTA_GATEWAY")[0])
|
||||
default_interface_name = ip.interfaces[default_interface_index]["ifname"]
|
||||
|
||||
memory_settings = {
|
||||
"random_ipv6_address": random_ipv6_address,
|
||||
"random_ipv6_address_mask": ipv6_network.prefixlen,
|
||||
"gateway": default_interface_gateway,
|
||||
"interface_index": default_interface_index,
|
||||
"interface_name": default_interface_name,
|
||||
"ipv6_subnet": settings["ipv6_subnet"],
|
||||
}
|
||||
|
||||
print("[DEBUG] Debug info:")
|
||||
for k, v in memory_settings.items():
|
||||
print (k, '-->', v)
|
||||
|
||||
try:
|
||||
iproute.addr(
|
||||
"add",
|
||||
default_interface_index,
|
||||
address=random_ipv6_address,
|
||||
mask=ipv6_network.prefixlen,
|
||||
)
|
||||
except Exception as e:
|
||||
clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[Error] Failed to add the new random IPv6 address. The setup did not work!\n"
|
||||
" That's unexpected! Did you correctly configured the IPv6 subnet to use?\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
# needed so that the linux kernel takes into account the new ipv6 address
|
||||
sleep(2)
|
||||
|
||||
# test that the new ipv6 route works
|
||||
try:
|
||||
iproute.route(
|
||||
"add",
|
||||
dst=icanhazip_ipv6_address,
|
||||
prefsrc=random_ipv6_address,
|
||||
gateway=default_interface_gateway,
|
||||
oif=default_interface_index,
|
||||
priority=1,
|
||||
)
|
||||
except Exception as e:
|
||||
clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
# needed so that the linux kernel takes into account the new ipv6 route
|
||||
sleep(2)
|
||||
try:
|
||||
check_new_ipv6_address = requests.get(
|
||||
f"http://[{icanhazip_ipv6_address}]",
|
||||
headers={"host": "ipv6.icanhazip.com"},
|
||||
timeout=5,
|
||||
)
|
||||
response_new_ipv6_address = check_new_ipv6_address.text.strip()
|
||||
if response_new_ipv6_address == random_ipv6_address:
|
||||
print("[INFO] Correctly using the new random IPv6 address, continuing.")
|
||||
else:
|
||||
clean_previous_setup(memory_settings)
|
||||
except Exception as e:
|
||||
self.clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[ERROR] The new random IPv6 is not used! The setup did not work!\n"
|
||||
" That is very unexpected, check if your IPv6 routes do not have too much priority."
|
||||
f"Address used: {response_new_ipv6_address}"
|
||||
"[Error] Failed to add the new random IPv6 address. The setup did not work!\n"
|
||||
" That's unexpected! Did you correctly configured the IPv6 subnet to use?\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n"
|
||||
" Your provider probably does not allow setting any arbitrary IPv6 address.\n"
|
||||
" Or did you correctly configured the IPv6 subnet to use?\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
# needed so that the linux kernel takes into account the new ipv6 address
|
||||
sleep(2)
|
||||
|
||||
# configure routes for ipv6 ranges of Google
|
||||
try:
|
||||
for ipv6_range in google_ipv6_ranges:
|
||||
# test that the new ipv6 route works
|
||||
try:
|
||||
iproute.route(
|
||||
"add",
|
||||
dst=ipv6_range,
|
||||
dst=self.icanhazip_ipv6_address,
|
||||
prefsrc=random_ipv6_address,
|
||||
gateway=default_interface_gateway,
|
||||
oif=default_interface_index,
|
||||
priority=1,
|
||||
)
|
||||
except Exception as e:
|
||||
clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
except Exception as e:
|
||||
self.clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
# needed so that the linux kernel takes into account the new ipv6 route
|
||||
sleep(2)
|
||||
try:
|
||||
check_new_ipv6_address = requests.get(
|
||||
f"http://[{self.icanhazip_ipv6_address}]",
|
||||
headers={"host": "ipv6.icanhazip.com"},
|
||||
timeout=5,
|
||||
)
|
||||
response_new_ipv6_address = check_new_ipv6_address.text.strip()
|
||||
if response_new_ipv6_address == random_ipv6_address:
|
||||
print("[INFO] Correctly using the new random IPv6 address, continuing.")
|
||||
else:
|
||||
self.clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[ERROR] The new random IPv6 is not used! The setup did not work!\n"
|
||||
" That is very unexpected, check if your IPv6 routes do not have too much priority."
|
||||
f"Address used: {response_new_ipv6_address}"
|
||||
)
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
"[ERROR] Failed to send the request for checking the new IPv6 address! The setup did not work!\n"
|
||||
" Your provider probably does not allow setting any arbitrary IPv6 address.\n"
|
||||
" Or did you correctly configured the IPv6 subnet to use?\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
|
||||
print("[INFO] Correctly configured the IPv6 routes for Google IPv6 ranges.\n"
|
||||
"[INFO] Successful setup. Waiting for the propagation in the Linux kernel.")
|
||||
sleep(6)
|
||||
# configure routes for ipv6 ranges of Google
|
||||
try:
|
||||
for ipv6_range in self.google_ipv6_ranges:
|
||||
iproute.route(
|
||||
"add",
|
||||
dst=ipv6_range,
|
||||
prefsrc=random_ipv6_address,
|
||||
gateway=default_interface_gateway,
|
||||
oif=default_interface_index,
|
||||
priority=1,
|
||||
)
|
||||
except Exception as e:
|
||||
self.clean_previous_setup(memory_settings)
|
||||
sys.exit(
|
||||
f"[Error] Failed to configure the test IPv6 route. The setup did not work!\n"
|
||||
f"Exception:\n{e}"
|
||||
)
|
||||
|
||||
# saving configuration to a file for future cleanup
|
||||
file = open(location_saved_config_ipv6_configured, "w")
|
||||
file.write(
|
||||
'random_ipv6_address="%s"\nrandom_ipv6_address_mask=%s\ngateway="%s"\ninterface_index=%s'
|
||||
% (
|
||||
random_ipv6_address,
|
||||
ipv6_network.prefixlen,
|
||||
default_interface_gateway,
|
||||
default_interface_index,
|
||||
print(
|
||||
"[INFO] Correctly configured the IPv6 routes for Google IPv6 ranges.\n"
|
||||
"[INFO] Successful setup. Waiting for the propagation in the Linux kernel."
|
||||
)
|
||||
)
|
||||
file.close()
|
||||
sleep(6)
|
||||
|
||||
# saving configuration to a file for future cleanup
|
||||
file = open(self.location_saved_config_ipv6_configured, "w")
|
||||
file.write(
|
||||
'random_ipv6_address="%s"\nrandom_ipv6_address_mask=%s\ngateway="%s"\ninterface_index=%s'
|
||||
% (
|
||||
random_ipv6_address,
|
||||
ipv6_network.prefixlen,
|
||||
default_interface_gateway,
|
||||
default_interface_index,
|
||||
)
|
||||
)
|
||||
file.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
SmartIPv6Rotator()
|
||||
|
47
testarg.py
Normal file
47
testarg.py
Normal file
@ -0,0 +1,47 @@
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
|
||||
class FakeGit(object):
|
||||
|
||||
def __init__(self):
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Pretends to be git',
|
||||
usage='''git <command> [<args>]
|
||||
|
||||
The most commonly used git commands are:
|
||||
commit Record changes to the repository
|
||||
fetch Download objects and refs from another repository
|
||||
''')
|
||||
parser.add_argument('command', help='Subcommand to run')
|
||||
# parse_args defaults to [1:] for args, but you need to
|
||||
# exclude the rest of the args too, or validation will fail
|
||||
args = parser.parse_args(sys.argv[1:2])
|
||||
if not hasattr(self, args.command):
|
||||
print('Unrecognized command')
|
||||
parser.print_help()
|
||||
exit(1)
|
||||
# use dispatch pattern to invoke method with same name
|
||||
getattr(self, args.command)()
|
||||
|
||||
def commit(self):
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Record changes to the repository')
|
||||
# prefixing the argument with -- means it's optional
|
||||
parser.add_argument('--amend', action='store_true', required=True)
|
||||
# now that we're inside a subcommand, ignore the first
|
||||
# TWO argvs, ie the command (git) and the subcommand (commit)
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
print('Running git commit, amend=%s' % args.amend)
|
||||
|
||||
def fetch(self):
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Download objects and refs from another repository')
|
||||
# NOT prefixing the argument with -- means it's not optional
|
||||
parser.add_argument('repository')
|
||||
args = parser.parse_args(sys.argv[2:])
|
||||
print('Running git fetch, repository=%s' % args.repository)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
FakeGit()
|
Loading…
Reference in New Issue
Block a user