mirror of
https://github.com/dolphin-emu/central.git
synced 2026-01-31 01:15:21 +01:00
156 lines
4.4 KiB
Python
156 lines
4.4 KiB
Python
from .. import utils
|
|
from ..config import cfg
|
|
|
|
import json
|
|
import jwt
|
|
import logging
|
|
import requests
|
|
import time
|
|
|
|
# Module-wide AuthManager instance; stays None until start() assigns it.
_AUTH_MANAGER = None

# Lifetime given to app JWTs (the `exp` claim is issue time plus this value).
_JWT_EXPIRY_SECS = 10 * 60

# Lifetime assumed for installation access tokens when caching them.
_INSTALL_TOKEN_EXPIRY_SECS = 60 * 60
|
|
|
|
# Permission name -> access level the GitHub App is expected to have.
# check_app_configuration() compares this mapping against the "permissions"
# field of the app description and logs every missing or mismatched entry.
_EXPECTED_PERMS = {
    "checks": "write",
    "contents": "read",
    "issues": "write",
    "members": "read",
    "metadata": "read",
    "pull_requests": "write",
    "statuses": "write",
}
|
|
|
|
# Webhook event names the GitHub App is expected to be subscribed to.
# check_app_configuration() logs an error for each one that is absent from
# the "events" field of the app description.
_EXPECTED_WEBHOOK_EVENTS = [
    "check_run",
    "commit_comment",
    "issue_comment",
    "pull_request",
    "pull_request_review",
    "pull_request_review_comment",
    "push",
]
|
|
|
|
|
|
class AuthManager:
    """Holds and refreshes the credentials used to call GitHub as an app.

    Two kinds of credentials are cached:

    * the app JWT, signed with the app private key (see ``app_jwt``);
    * one installation access token per organization (see ``org_token``).

    Both are renewed once 80% of their lifetime has elapsed, so a credential
    handed out by this class is not about to expire.
    """

    # Seconds to wait on GitHub before giving up. Without a timeout,
    # `requests` blocks forever on a stalled connection.
    _HTTP_TIMEOUT_SECS = 30

    # GitHub recommends back-dating the `iat` claim by 60 seconds so that a
    # local clock running slightly ahead does not produce a rejected JWT.
    _JWT_CLOCK_DRIFT_SECS = 60

    def __init__(self, app_id, app_priv_key_path):
        """Load the app private key and fetch the current installations.

        Args:
            app_id: GitHub App identifier; stored as a string and used as the
                JWT ``iss`` claim.
            app_priv_key_path: Path to the PEM-encoded private key file.

        Raises:
            OSError: If the key file cannot be read.
            requests.RequestException: If listing the installations fails.
        """
        self.app_id = str(app_id)
        with open(app_priv_key_path, "rb") as fp:
            self.app_pkey = jwt.jwk_from_pem(fp.read())

        # Last JWT generated and the time after which it must be regenerated.
        self.cached_app_jwt = None
        self.cached_app_jwt_deadline = 0

        # org login -> installation id
        self.org_installs = {}
        # org login -> (token, refresh deadline)
        self.org_tokens = {}

        self._update_installations()

    def _update_installations(self):
        """Rebuild the org login -> installation id mapping from GitHub.

        Only installations whose target type is "organization" are kept. All
        result pages are followed, and the mapping is swapped in only once
        every page has been read successfully.

        Raises:
            requests.RequestException: On network failure, timeout, or a
                non-success HTTP status.
        """
        new_org_installs = {}

        url = "https://api.github.com/app/installations"
        while url:
            # auth_manager is passed explicitly: this runs from __init__,
            # before the module-level manager has been assigned.
            resp = requests.get(
                url,
                auth=AppAuth(auth_manager=self),
                timeout=self._HTTP_TIMEOUT_SECS,
            )
            # Fail with a clear HTTP error instead of crashing on the shape
            # of an error payload further down.
            resp.raise_for_status()
            for install in resp.json():
                if install["target_type"].lower() == "organization":
                    new_org_installs[install["account"]["login"]] = install["id"]
            # The listing is paginated; follow the "next" link if present.
            url = resp.links.get("next", {}).get("url")

        self.org_installs = new_org_installs

    @property
    def app_jwt(self):
        """Return a JWT authenticating as the app, reusing the cached one.

        A new token is signed with RS256 when the cached one has reached 80%
        of ``_JWT_EXPIRY_SECS``.
        """
        t = int(time.time())
        if t < self.cached_app_jwt_deadline:
            return self.cached_app_jwt

        payload = {
            "iat": t - self._JWT_CLOCK_DRIFT_SECS,
            "exp": t + _JWT_EXPIRY_SECS,
            "iss": self.app_id,
        }
        self.cached_app_jwt = jwt.JWT().encode(payload, self.app_pkey, alg="RS256")
        self.cached_app_jwt_deadline = t + int(_JWT_EXPIRY_SECS * 0.8)
        return self.cached_app_jwt

    def org_token(self, org):
        """Return an installation access token for ``org``.

        The installation list is refreshed once if ``org`` is unknown. Tokens
        are cached per org and renewed after 80% of
        ``_INSTALL_TOKEN_EXPIRY_SECS``.

        Args:
            org: Organization login the app is installed on.

        Returns:
            The access token string.

        Raises:
            RuntimeError: If the app is not installed for ``org``.
            requests.RequestException: On network failure, timeout, or a
                non-success HTTP status.
        """
        if org not in self.org_installs:
            # The app may have been installed since the last listing.
            self._update_installations()
            if org not in self.org_installs:
                raise RuntimeError(f"App is not installed for org {org}")
        iid = self.org_installs[org]

        t = time.time()
        cached = self.org_tokens.get(org)
        if cached is not None:
            tok, deadline = cached
            if t < deadline:
                return tok

        resp = requests.post(
            f"https://api.github.com/app/installations/{iid}/access_tokens",
            auth=AppAuth(auth_manager=self),
            data="{}",
            timeout=self._HTTP_TIMEOUT_SECS,
        )
        resp.raise_for_status()
        tok = resp.json()["token"]
        deadline = t + _INSTALL_TOKEN_EXPIRY_SECS * 0.8
        self.org_tokens[org] = (tok, deadline)
        return tok
|
|
|
|
|
|
class AppAuth(requests.auth.AuthBase):
    """Requests auth hook that signs calls as the GitHub app itself.

    This is app-level authentication, not that of a repo/org installation.
    """

    def __init__(self, auth_manager=None):
        """Remember which manager supplies the JWT.

        When ``auth_manager`` is omitted, the module-level manager is used,
        as it is at construction time.
        """
        super().__init__()
        if auth_manager is None:
            auth_manager = _AUTH_MANAGER
        self.auth_manager = auth_manager

    def __call__(self, r):
        """Set the bearer ``authorization`` header on ``r`` and return it."""
        app_jwt = self.auth_manager.app_jwt
        r.headers["authorization"] = f"Bearer {app_jwt}"
        return r
|
|
|
|
|
|
class OrgAuth(requests.auth.AuthBase):
    """Requests auth hook that signs calls as an org install of the app."""

    def __init__(self, org, auth_manager=None):
        """Remember the target org and which manager supplies its token.

        When ``auth_manager`` is omitted, the module-level manager is used,
        as it is at construction time.
        """
        super().__init__()
        self.org = org
        if auth_manager is None:
            auth_manager = _AUTH_MANAGER
        self.auth_manager = auth_manager

    def __call__(self, r):
        """Set the bearer ``authorization`` header on ``r`` and return it."""
        install_token = self.auth_manager.org_token(self.org)
        r.headers["authorization"] = f"Bearer {install_token}"
        return r
|
|
|
|
|
|
def check_app_configuration():
    """Audit the GitHub App settings and push the webhook configuration.

    Fetches the app description, logs an error for every expected permission
    or webhook event that is missing or set differently, then PATCHes the
    webhook config (URL, secret, content type) from the local configuration.

    Raises:
        requests.RequestException: If the app description cannot be fetched
            (network failure, timeout, or non-success HTTP status).
    """
    # Without a timeout, a stalled connection would block this task forever.
    timeout_secs = 30

    resp = requests.get(
        "https://api.github.com/app", auth=AppAuth(), timeout=timeout_secs
    )
    # Surface a clear HTTP error rather than a KeyError on an error payload.
    resp.raise_for_status()
    app = resp.json()

    granted = app["permissions"]
    for perm, val in sorted(_EXPECTED_PERMS.items()):
        if perm not in granted:
            logging.error("Missing GH app permission: %s (should be: %s)", perm, val)
        elif val != granted[perm]:
            logging.error("Wrong GH app permission: %s should be %s", perm, val)

    subscribed = app["events"]
    for evt in _EXPECTED_WEBHOOK_EVENTS:
        if evt not in subscribed:
            logging.error("Missing GH app webhook delivery: %s", evt)

    hook_data = {
        "content_type": "json",
        "url": cfg.web.external_url + "/gh/hook/",
        "secret": cfg.github.hook_hmac_secret,
        "insecure_ssl": "0",
    }
    res = requests.patch(
        "https://api.github.com/app/hook/config",
        auth=AppAuth(),
        data=json.dumps(hook_data),
        timeout=timeout_secs,
    )
    # Report a rejected update instead of dropping it silently. It is logged
    # rather than raised, so an HTTP error here still never propagates.
    if not res.ok:
        logging.error(
            "Failed to update GH app webhook config: HTTP %s", res.status_code
        )
|
|
|
|
|
|
def start():
    """Create the module-level AuthManager and schedule the config check.

    The manager is built from ``cfg.github.app``. check_app_configuration is
    then registered through ``utils.spawn_periodic_task`` with an argument of
    600 (presumably the period in seconds; confirm against that helper).
    """
    global _AUTH_MANAGER

    app_cfg = cfg.github.app
    manager = AuthManager(app_cfg.id, app_cfg.priv_key_path)
    _AUTH_MANAGER = manager

    utils.spawn_periodic_task(600, check_app_configuration)
|