Files
archived-central/central/github/app.py
2023-03-20 06:34:45 +01:00

156 lines
4.4 KiB
Python

from .. import utils
from ..config import cfg
import json
import jwt
import logging
import requests
import time
# Process-wide AuthManager instance. Stays None until start() assigns it;
# AppAuth and OrgAuth fall back to it when no manager is passed explicitly.
_AUTH_MANAGER = None
# Number of seconds between the `iat` and `exp` claims of the app JWT.
_JWT_EXPIRY_SECS = 600
# Assumed lifetime, in seconds, of an installation access token; used by
# AuthManager.org_token to decide when a cached token must be re-requested.
_INSTALL_TOKEN_EXPIRY_SECS = 3600
# Permission name -> access level that check_app_configuration() expects to
# find in the "permissions" object returned by GET /app.
_EXPECTED_PERMS = {
"checks": "write",
"contents": "read",
"issues": "write",
"members": "read",
"metadata": "read",
"pull_requests": "write",
"statuses": "write",
}
# Webhook event names that check_app_configuration() expects to find in the
# "events" list returned by GET /app.
_EXPECTED_WEBHOOK_EVENTS = [
"check_run",
"commit_comment",
"issue_comment",
"pull_request",
"pull_request_review",
"pull_request_review_comment",
"push",
]
class AuthManager:
    """Mint and cache the credentials used to talk to GitHub as an app.

    Two kinds of bearer credentials are handed out:

    * ``app_jwt`` -- a JWT signed with the app's private key, used to
      authenticate as the app itself.
    * ``org_token(org)`` -- an installation access token for the app's
      installation on a given organization.

    Both are cached in memory and re-issued once 80% of their lifetime has
    elapsed, so callers never receive a credential that is about to expire.
    """

    # Upper bound (seconds) on each GitHub API call made by this class.
    # Without a timeout, requests waits indefinitely on a stalled connection.
    _HTTP_TIMEOUT_SECS = 30

    # How far (seconds) the JWT `iat` claim is back-dated. GitHub rejects JWTs
    # whose `iat` is in the future or whose `exp` is more than 10 minutes
    # ahead *according to GitHub's clock*, so a small local clock skew would
    # otherwise make every freshly minted token invalid.
    _CLOCK_DRIFT_SECS = 60

    def __init__(self, app_id, app_priv_key_path):
        """Load the private key and fetch the initial installation list.

        Args:
            app_id: GitHub App identifier; stored as a string and used as the
                JWT ``iss`` claim.
            app_priv_key_path: Path to the PEM-encoded private key file.

        Raises:
            OSError: If the key file cannot be opened.
            requests.RequestException: If the initial installation listing
                fails (network error, timeout or non-2xx response).
        """
        self.app_id = str(app_id)
        with open(app_priv_key_path, "rb") as fp:
            self.app_pkey = jwt.jwk_from_pem(fp.read())
        self.cached_app_jwt = None
        self.cached_app_jwt_deadline = 0
        # org login -> installation id
        self.org_installs = {}
        # org login -> (token, unix time after which the token is re-requested)
        self.org_tokens = {}
        self._update_installations()

    def _update_installations(self):
        """Rebuild the org-login -> installation-id map from the GitHub API.

        The listing endpoint is paginated, so every page is walked by
        following the ``Link: rel="next"`` response header. Only installations
        whose target type is an organization are recorded. The map is swapped
        in at the end so a failure midway leaves the previous map intact.

        Raises:
            requests.RequestException: On network error, timeout or a non-2xx
                response from GitHub.
        """
        new_org_installs = {}
        url = "https://api.github.com/app/installations"
        # Ask for the largest page GitHub serves to minimise round trips.
        params = {"per_page": 100}
        while url is not None:
            resp = requests.get(
                url,
                params=params,
                auth=AppAuth(auth_manager=self),
                timeout=self._HTTP_TIMEOUT_SECS,
            )
            # Fail loudly here instead of choking on an error payload below.
            resp.raise_for_status()
            for install in resp.json():
                if install["target_type"].lower() == "organization":
                    new_org_installs[install["account"]["login"]] = install["id"]
            next_link = resp.links.get("next")
            url = next_link.get("url") if next_link else None
            # The "next" URL already carries its own query string.
            params = None
        self.org_installs = new_org_installs

    @property
    def app_jwt(self):
        """Return a JWT authenticating as the app, reusing a cached one if fresh.

        A new token is signed with RS256 when the cached one has passed 80% of
        its remaining validity. ``iat`` is back-dated by ``_CLOCK_DRIFT_SECS``
        and ``exp`` is derived from ``iat``, so the token stays inside the
        validity window GitHub enforces even with modest clock skew.
        """
        now = int(time.time())
        if now < self.cached_app_jwt_deadline:
            return self.cached_app_jwt
        issued_at = now - self._CLOCK_DRIFT_SECS
        expires_at = issued_at + _JWT_EXPIRY_SECS
        payload = {
            "iat": issued_at,
            "exp": expires_at,
            "iss": self.app_id,
        }
        self.cached_app_jwt = jwt.JWT().encode(payload, self.app_pkey, alg="RS256")
        # Refresh after 80% of the validity that is actually left from now.
        self.cached_app_jwt_deadline = now + int((expires_at - now) * 0.8)
        return self.cached_app_jwt

    def org_token(self, org):
        """Return an installation access token for ``org``.

        The installation list is refreshed once if ``org`` is unknown, to pick
        up installations created after the last refresh. Tokens are cached per
        org and re-requested after 80% of ``_INSTALL_TOKEN_EXPIRY_SECS``.

        Args:
            org: Organization login name.

        Returns:
            The token string to use as a bearer credential.

        Raises:
            RuntimeError: If the app is not installed on ``org``.
            requests.RequestException: On network error, timeout or a non-2xx
                response from GitHub.
        """
        if org not in self.org_installs:
            self._update_installations()
        if org not in self.org_installs:
            raise RuntimeError(f"App is not installed for org {org}")
        iid = self.org_installs[org]

        now = time.time()
        cached = self.org_tokens.get(org)
        if cached is not None:
            tok, deadline = cached
            if now < deadline:
                return tok

        resp = requests.post(
            f"https://api.github.com/app/installations/{iid}/access_tokens",
            auth=AppAuth(auth_manager=self),
            data="{}",
            timeout=self._HTTP_TIMEOUT_SECS,
        )
        # Surface the HTTP failure rather than a KeyError on "token".
        resp.raise_for_status()
        tok = resp.json()["token"]
        deadline = now + _INSTALL_TOKEN_EXPIRY_SECS * 0.8
        self.org_tokens[org] = (tok, deadline)
        return tok
class AppAuth(requests.auth.AuthBase):
    """Requests auth hook that signs calls as the GitHub app itself.

    This is the app-level identity, not one of its org/repo installations.
    When no manager is supplied, the module-level ``_AUTH_MANAGER`` current at
    construction time is used.
    """

    def __init__(self, auth_manager=None):
        super().__init__()
        if auth_manager is None:
            auth_manager = _AUTH_MANAGER
        self.auth_manager = auth_manager

    def __call__(self, r):
        # The JWT is fetched on every request so an expired one is replaced.
        app_jwt = self.auth_manager.app_jwt
        r.headers["authorization"] = f"Bearer {app_jwt}"
        return r
class OrgAuth(requests.auth.AuthBase):
    """Requests auth hook that signs calls as the app's install on one org.

    When no manager is supplied, the module-level ``_AUTH_MANAGER`` current at
    construction time is used.
    """

    def __init__(self, org, auth_manager=None):
        super().__init__()
        if auth_manager is None:
            auth_manager = _AUTH_MANAGER
        self.org = org
        self.auth_manager = auth_manager

    def __call__(self, r):
        # The token is looked up on every request so a stale one is replaced.
        install_token = self.auth_manager.org_token(self.org)
        r.headers["authorization"] = f"Bearer {install_token}"
        return r
def check_app_configuration():
    """Audit the GitHub App's settings and push the webhook configuration.

    Fetches the app's description from ``GET /app`` and logs an error for
    every entry of ``_EXPECTED_PERMS`` that is missing or has a different
    access level, and for every entry of ``_EXPECTED_WEBHOOK_EVENTS`` that is
    absent. It then PATCHes the app's hook config with the URL and HMAC
    secret taken from ``cfg``.

    Findings and HTTP failures are reported through ``logging.error`` rather
    than raised. Transport-level failures (connection errors, timeouts) still
    propagate as ``requests`` exceptions.
    """
    # Bound every call; without a timeout a stalled connection would block
    # this check forever.
    timeout_secs = 30

    app_resp = requests.get(
        "https://api.github.com/app", auth=AppAuth(), timeout=timeout_secs
    )
    if not app_resp.ok:
        # An error payload has no "permissions"/"events" keys; report the
        # real cause instead of failing on a KeyError below.
        logging.error(
            "Could not fetch GH app configuration: HTTP %s", app_resp.status_code
        )
        return
    app = app_resp.json()

    for (perm, val) in sorted(_EXPECTED_PERMS.items()):
        if perm not in app["permissions"]:
            logging.error("Missing GH app permission: %s (should be: %s)", perm, val)
        elif val != app["permissions"][perm]:
            logging.error("Wrong GH app permission: %s should be %s", perm, val)

    for evt in _EXPECTED_WEBHOOK_EVENTS:
        if evt not in app["events"]:
            logging.error("Missing GH app webhook delivery: %s", evt)

    hook_data = {
        "content_type": "json",
        "url": cfg.web.external_url + "/gh/hook/",
        "secret": cfg.github.hook_hmac_secret,
        "insecure_ssl": "0",
    }
    hook_resp = requests.patch(
        "https://api.github.com/app/hook/config",
        auth=AppAuth(),
        data=json.dumps(hook_data),
        timeout=timeout_secs,
    )
    if not hook_resp.ok:
        # Previously the response was discarded, hiding a rejected update.
        logging.error(
            "Failed to update GH app webhook config: HTTP %s", hook_resp.status_code
        )
def start():
    """Create the module-wide AuthManager and schedule the config check.

    Builds the manager from the app id and private-key path in ``cfg`` and
    stores it in ``_AUTH_MANAGER``. It then registers
    ``check_app_configuration`` with ``utils.spawn_periodic_task`` using a
    period argument of 600 (presumably seconds -- confirm against utils).
    """
    global _AUTH_MANAGER
    manager = AuthManager(cfg.github.app.id, cfg.github.app.priv_key_path)
    _AUTH_MANAGER = manager
    utils.spawn_periodic_task(600, check_app_configuration)