feat(vercel): installation and product viewset + integration (#37035)

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Jonathan Mieloo
2025-09-01 17:33:33 +02:00
committed by GitHub
parent 4d0dc720de
commit 13eeb03d9f
14 changed files with 1181 additions and 8 deletions

View File

@@ -279,7 +279,7 @@ class VercelAuthentication(authentication.BaseAuthentication):
def authenticate(self, request: Request) -> tuple[VercelUser, None] | None:
token = self._get_bearer_token(request)
if not token:
return None
raise AuthenticationFailed("Missing Token for Vercel request")
auth_type = self._get_vercel_auth_type(request)

100
ee/api/vercel/test/base.py Normal file
View File

@@ -0,0 +1,100 @@
import base64
from posthog.test.base import APIBaseTest
from django.utils import timezone
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from posthog.models.organization_integration import OrganizationIntegration
from ee.api.authentication import VercelAuthentication
class VercelTestBase(APIBaseTest):
def setUp(self):
super().setUp()
self.installation_id = "inst_123456789"
self.account_id = "acc987654321"
self.user_id = "111222333abc"
self.installation = OrganizationIntegration.objects.create(
organization=self.organization,
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=self.installation_id,
config={
"billing_plan_id": "free",
"scopes": ["read", "write"],
"credentials": {"access_token": "test_token", "token_type": "Bearer"},
},
created_by=self.user,
)
self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
self.public_key = self.private_key.public_key()
public_numbers = self.public_key.public_numbers()
n = self._int_to_base64url(public_numbers.n)
e = self._int_to_base64url(public_numbers.e)
self.test_kid = "test_key_id"
self.mock_jwks = {"keys": [{"kid": self.test_kid, "kty": "RSA", "use": "sig", "alg": "RS256", "n": n, "e": e}]}
def _int_to_base64url(self, value: int) -> str:
byte_length = (value.bit_length() + 7) // 8
value_bytes = value.to_bytes(byte_length, byteorder="big")
return base64.urlsafe_b64encode(value_bytes).decode("ascii").rstrip("=")
def _create_jwt_token(self, payload: dict, headers: dict | None = None) -> str:
if headers is None:
headers = {"kid": self.test_kid}
private_key_pem = self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
private_key_str = private_key_pem.decode("utf-8")
return jwt.encode(payload, private_key_str, algorithm="RS256", headers=headers)
def _create_user_auth_payload(
self,
installation_id: str | None = None,
account_id: str | None = None,
user_id: str | None = None,
user_role: str = "ADMIN",
) -> dict:
return {
"iss": VercelAuthentication.VERCEL_ISSUER,
"sub": f"account:{account_id or self.account_id}:user:{user_id or self.user_id}",
"aud": "test_audience",
"account_id": account_id or self.account_id,
"installation_id": installation_id or self.installation_id,
"user_id": user_id or self.user_id,
"user_role": user_role,
"exp": timezone.now().timestamp() + 3600,
}
def _create_system_auth_payload(self, installation_id: str | None = None, account_id: str | None = None) -> dict:
account = account_id or self.account_id
return {
"iss": VercelAuthentication.VERCEL_ISSUER,
"sub": f"account:{account[3:] if account.startswith('acc') else account}",
"aud": "test_audience",
"account_id": account,
"installation_id": installation_id or self.installation_id,
"exp": timezone.now().timestamp() + 3600,
}
def _get_auth_headers(self, auth_type: str = "user") -> dict:
if auth_type == "user":
token = self._create_jwt_token(self._create_user_auth_payload())
elif auth_type == "system":
token = self._create_jwt_token(self._create_system_auth_payload())
else:
raise ValueError(f"Unsupported auth type: {auth_type}")
return {"HTTP_AUTHORIZATION": f"Bearer {token}", "HTTP_X_VERCEL_AUTH": auth_type}

View File

@@ -131,15 +131,14 @@ class TestVercelAuthentication(SimpleTestCase):
def test_missing_authorization_header(self, mock_get_jwks):
request = self.factory.get("/", HTTP_X_VERCEL_AUTH="user")
result = self.auth.authenticate(request)
assert result is None
with self.assertRaises(AuthenticationFailed):
self.auth.authenticate(request)
def test_missing_vercel_auth_header(self, mock_get_jwks):
token = self._token()
request = self.factory.get("/", HTTP_AUTHORIZATION=f"Bearer {token}")
with self.assertRaises(AuthenticationFailed) as cm:
with self.assertRaises(AuthenticationFailed):
self.auth.authenticate(request)
assert "Missing or invalid X-Vercel-Auth header" in str(cm.exception)
def test_invalid_vercel_auth_header(self, mock_get_jwks):
token = self._token()

View File

@@ -0,0 +1,96 @@
import json
from unittest.mock import MagicMock, _patch, patch
from rest_framework import status
from ee.api.vercel.test.base import VercelTestBase
class TestVercelInstallationAPI(VercelTestBase):
client_id_patcher: _patch
jwks_patcher: _patch
mock_get_jwks: MagicMock
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.client_id_patcher = patch("ee.settings.VERCEL_CLIENT_INTEGRATION_ID", "test_audience")
cls.jwks_patcher = patch("ee.api.authentication.get_vercel_jwks")
cls.client_id_patcher.start()
cls.mock_get_jwks = cls.jwks_patcher.start()
@classmethod
def tearDownClass(cls):
cls.client_id_patcher.stop()
cls.jwks_patcher.stop()
super().tearDownClass()
def setUp(self):
super().setUp()
self.url = f"/api/vercel/v1/installations/{self.installation_id}/"
self.upsert_payload = {
"scopes": ["read", "write"],
"acceptedPolicies": {"toc": "2024-02-28T10:00:00Z"},
"credentials": {"access_token": "token", "token_type": "Bearer"},
"account": {"name": "Account", "url": "https://example.com", "contact": {"email": "test@example.com"}},
}
self.update_payload = {"billingPlanId": "pro200"}
self.mock_get_jwks.return_value = self.mock_jwks
def _request(self, method, url=None, data=None, auth_type="user", **kwargs):
headers = self._get_auth_headers(auth_type)
url = url or self.url
if data:
kwargs.update(content_type="application/json", data=json.dumps(data))
return getattr(self.client, method)(url, **headers, **kwargs)
@patch("ee.vercel.integration.VercelIntegration.get_installation_billing_plan")
def test_retrieve_calls_get_installation_billing_plan(self, mock_get):
mock_get.return_value = {"billingplan": {"id": "free"}}
response = self._request("get", auth_type="system")
assert response.status_code == status.HTTP_200_OK
mock_get.assert_called_once_with(self.installation_id)
@patch("ee.vercel.integration.VercelIntegration.get_vercel_plans")
def test_plans_calls_get_vercel_plans(self, mock_plans):
mock_plans.return_value = [{"id": "free"}, {"id": "paid"}]
response = self._request("get", url=f"{self.url}plans/", auth_type="system")
assert response.status_code == status.HTTP_200_OK
mock_plans.assert_called_once()
@patch("ee.vercel.integration.VercelIntegration.update_installation")
def test_update_calls_upsert_installation(self, mock_update):
response = self._request("patch", data=self.update_payload)
assert response.status_code == status.HTTP_204_NO_CONTENT
mock_update.assert_called_once_with(self.installation_id, "pro200")
@patch("ee.vercel.integration.VercelIntegration.update_installation")
def test_partial_update_calls_update_installation(self, mock_update):
response = self._request("patch", data=self.update_payload)
assert response.status_code == status.HTTP_204_NO_CONTENT
mock_update.assert_called_once_with(self.installation_id, "pro200")
@patch("ee.vercel.integration.VercelIntegration.upsert_installation")
def test_create_calls_upsert_installation(self, mock_upsert):
response = self._request("put", data=self.upsert_payload)
assert response.status_code == status.HTTP_204_NO_CONTENT
mock_upsert.assert_called_once_with(self.installation_id, self.upsert_payload)
@patch("ee.vercel.integration.VercelIntegration.delete_installation")
def test_destroy_calls_delete_installation(self, mock_delete):
mock_delete.return_value = {"finalized": True}
response = self._request("delete")
assert response.status_code == status.HTTP_200_OK
mock_delete.assert_called_once_with(self.installation_id)
def test_invalid_installation_id_format(self):
url = "/api/vercel/v1/installations/invalid-id/"
response = self._request("get", url=url, auth_type="system")
assert response.status_code == status.HTTP_404_NOT_FOUND

View File

@@ -0,0 +1,227 @@
import json
from typing import Any
import pytest
from unittest.mock import MagicMock, patch
from parameterized import parameterized
from rest_framework import status
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
from posthog.models import Organization, Team
from posthog.models.organization_integration import OrganizationIntegration
from ee.api.vercel.test.base import VercelTestBase
from ee.api.vercel.vercel_permission import VercelPermission
class TestVercelPermission(VercelTestBase):
def setUp(self):
super().setUp()
self.permission = VercelPermission()
self.mock_view = MagicMock()
self.mock_request = MagicMock()
self.claims_patcher = patch("ee.api.vercel.vercel_permission.get_vercel_claims")
self.mock_get_claims = self.claims_patcher.start()
def tearDown(self):
self.claims_patcher.stop()
super().tearDown()
def _setup_request(self, auth_type="user", action="update", supported_types=None, headers=None, **claims):
if headers is not None:
self.mock_request.headers = headers
else:
self.mock_request.headers = {"X-Vercel-Auth": auth_type}
self.mock_view.action = action
if supported_types:
self.mock_view.vercel_supported_auth_types = {action: supported_types}
self.mock_get_claims.return_value = claims
def _assert_permission_denied(self, func, expected_msg):
with pytest.raises(PermissionDenied) as exc_info:
func()
assert expected_msg in str(exc_info.value.detail)
def _assert_auth_failed(self, func, expected_msg):
with pytest.raises(AuthenticationFailed) as exc_info:
func()
assert expected_msg in str(exc_info.value.detail)
def test_has_permission_validates_auth_type(self):
self._setup_request(action="update", supported_types=["user"], user_role="ADMIN")
assert self.permission.has_permission(self.mock_request, self.mock_view) is True
def test_has_permission_missing_auth_header(self):
self._setup_request(headers={})
self._assert_auth_failed(
lambda: self.permission.has_permission(self.mock_request, self.mock_view), "Missing X-Vercel-Auth header"
)
def test_has_permission_invalid_auth_type(self):
self._setup_request(auth_type="system", supported_types=["user"])
self._assert_permission_denied(
lambda: self.permission.has_permission(self.mock_request, self.mock_view),
"Auth type 'system' not allowed for this endpoint",
)
@parameterized.expand(
[
({"installation_id": "inst_123"}, {"installation_id": "inst_123"}, True, None),
({"installation_id": "inst_123"}, {"installation_id": "inst_456"}, False, "Installation ID mismatch"),
({}, {"installation_id": "inst_123"}, False, "Missing installation_id"),
({"parent_lookup_installation_id": "inst_123"}, {"installation_id": "inst_123"}, True, None),
]
)
def test_installation_id_validation(self, view_kwargs, claims, should_pass, error_msg):
self.mock_view.kwargs = view_kwargs
self.mock_get_claims.return_value = claims
if should_pass:
assert self.permission.has_object_permission(self.mock_request, self.mock_view, None) is True
else:
with pytest.raises(PermissionDenied) as exc_info:
self.permission.has_object_permission(self.mock_request, self.mock_view, None)
assert str(exc_info.value.detail) == error_msg
def test_has_object_permission_no_jwt_auth(self):
self.mock_view.kwargs = {"installation_id": "inst_123"}
self.mock_get_claims.side_effect = AuthenticationFailed("Not authenticated with Vercel")
self._assert_auth_failed(
lambda: self.permission.has_object_permission(self.mock_request, self.mock_view, None),
"Not authenticated with Vercel",
)
def test_get_supported_auth_types_default(self):
self.mock_view.action = "list"
if hasattr(self.mock_view, "vercel_supported_auth_types"):
delattr(self.mock_view, "vercel_supported_auth_types")
auth_types = self.permission._get_supported_auth_types(self.mock_view)
assert auth_types == ["user", "system"]
def test_get_supported_auth_types_custom(self):
self.mock_view.action = "destroy"
self.mock_view.vercel_supported_auth_types = {"destroy": ["user", "system"], "update": ["user"]}
auth_types = self.permission._get_supported_auth_types(self.mock_view)
assert auth_types == ["user", "system"]
def test_auth_type_case_insensitive(self):
self._setup_request(auth_type="USER", supported_types=["user"], user_role="ADMIN")
assert self.permission.has_permission(self.mock_request, self.mock_view) is True
@parameterized.expand(
[
("update", "ADMIN", True, None),
("update", "USER", False, "requires ADMIN role"),
("destroy", "ADMIN", True, None),
("destroy", "USER", False, "requires ADMIN role"),
("retrieve", "USER", True, None),
("retrieve", "ADMIN", True, None),
]
)
def test_role_permissions(self, action, user_role, should_pass, error_msg):
self._setup_request(action=action, supported_types=["user"], user_role=user_role)
if should_pass:
assert self.permission.has_permission(self.mock_request, self.mock_view) is True
else:
with pytest.raises(PermissionDenied) as exc_info:
self.permission.has_permission(self.mock_request, self.mock_view)
assert error_msg in str(exc_info.value.detail)
def test_system_auth_bypasses_role_check(self):
self._setup_request(auth_type="system", supported_types=["system"])
assert self.permission.has_permission(self.mock_request, self.mock_view) is True
def test_missing_role_denied_for_admin_action(self):
self._setup_request(action="update", supported_types=["user"])
self._assert_permission_denied(
lambda: self.permission.has_permission(self.mock_request, self.mock_view), "requires ADMIN role"
)
class TestVercelPermissionIntegration(VercelTestBase):
client_id_patcher: Any
jwks_patcher: Any
mock_get_jwks: Any
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.client_id_patcher = patch("ee.settings.VERCEL_CLIENT_INTEGRATION_ID", "test_audience")
cls.jwks_patcher = patch("ee.api.authentication.get_vercel_jwks")
cls.client_id_patcher.start()
cls.mock_get_jwks = cls.jwks_patcher.start()
@classmethod
def tearDownClass(cls):
cls.client_id_patcher.stop()
cls.jwks_patcher.stop()
super().tearDownClass()
def setUp(self):
super().setUp()
self.mock_get_jwks.return_value = self.mock_jwks
self.url = f"/api/vercel/v1/installations/{self.installation_id}/"
def _make_request(
self, method, jwt_installation_id=None, user_role="ADMIN", auth_type="user", data=None, url_installation_id=None
):
jwt_installation_id = jwt_installation_id or self.installation_id
url_installation_id = url_installation_id or self.installation_id
payload = (
self._create_user_auth_payload(installation_id=jwt_installation_id, user_role=user_role)
if auth_type == "user"
else self._create_system_auth_payload(installation_id=jwt_installation_id)
)
headers = {
"HTTP_AUTHORIZATION": f"Bearer {self._create_jwt_token(payload)}",
"HTTP_X_VERCEL_AUTH": auth_type,
}
url = f"/api/vercel/v1/installations/{url_installation_id}/"
kwargs = dict(**headers)
if data:
kwargs.update(content_type="application/json", data=json.dumps(data))
return getattr(self.client, method)(url, **kwargs)
@parameterized.expand(
[
("patch", "inst_different", None, "user", {"billingPlanId": "pro200"}, status.HTTP_204_NO_CONTENT),
("delete", "inst_different", None, "user", None, status.HTTP_200_OK),
("patch", None, None, "system", {"billingPlanId": "pro200"}, status.HTTP_403_FORBIDDEN),
("get", None, None, "user", None, status.HTTP_403_FORBIDDEN),
]
)
def test_auth_validation(self, method, jwt_id, url_id, auth_type, data, expected_status):
response = self._make_request(
method, jwt_installation_id=jwt_id, url_installation_id=url_id, auth_type=auth_type, data=data
)
assert response.status_code == expected_status
@parameterized.expand(
[
("patch", "ADMIN", {"billingPlanId": "pro200"}, status.HTTP_204_NO_CONTENT),
("patch", "USER", {"billingPlanId": "pro200"}, status.HTTP_403_FORBIDDEN),
("delete", "USER", None, status.HTTP_403_FORBIDDEN),
]
)
def test_role_based_access(self, method, user_role, data, expected_status):
response = self._make_request(method, user_role=user_role, data=data)
assert response.status_code == expected_status
def test_cross_organization_access_denied(self):
other_org = Organization.objects.create(name="Other Org")
Team.objects.create(organization=other_org, name="Other Team")
other_installation_id = "inst_987654321"
OrganizationIntegration.objects.create(
organization=other_org,
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=other_installation_id,
config={"billing_plan_id": "free"},
created_by=self.user,
)
response = self._make_request(
"patch",
jwt_installation_id=other_installation_id,
url_installation_id=other_installation_id,
data={"billingPlanId": "pro200"},
)
assert response.status_code == status.HTTP_204_NO_CONTENT

View File

@@ -0,0 +1,53 @@
from unittest.mock import MagicMock, _patch, patch
from rest_framework import status
from ee.api.vercel.test.base import VercelTestBase
class TestVercelProductAPI(VercelTestBase):
client_id_patcher: _patch
jwks_patcher: _patch
mock_get_jwks: MagicMock
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.client_id_patcher = patch("ee.settings.VERCEL_CLIENT_INTEGRATION_ID", "test_audience")
cls.jwks_patcher = patch("ee.api.authentication.get_vercel_jwks")
cls.client_id_patcher.start()
cls.mock_get_jwks = cls.jwks_patcher.start()
@classmethod
def tearDownClass(cls):
cls.client_id_patcher.stop()
cls.jwks_patcher.stop()
super().tearDownClass()
def setUp(self):
super().setUp()
self.mock_get_jwks.return_value = self.mock_jwks
def test_get_posthog_plans_with_user_auth(self):
headers = self._get_auth_headers("user")
response = self.client.get("/api/vercel/v1/products/posthog/plans/", **headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "plans" in data
assert len(data["plans"]) == 2
def test_get_posthog_plans_with_system_auth(self):
headers = self._get_auth_headers("system")
response = self.client.get("/api/vercel/v1/products/posthog/plans/", **headers)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert "plans" in data
assert len(data["plans"]) == 2
def test_get_invalid_product_plans(self):
headers = self._get_auth_headers("user")
response = self.client.get("/api/vercel/v1/products/invalid/plans/", **headers)
assert response.status_code == status.HTTP_404_NOT_FOUND

View File

@@ -0,0 +1,32 @@
from typing import Any
from rest_framework.exceptions import APIException
from rest_framework.response import Response
from rest_framework.views import exception_handler
class VercelErrorResponseMixin:
"""Mixin to format DRF exceptions into Vercel's error schema"""
def handle_exception(self, exc):
context: dict[str, Any] = getattr(self, "get_exception_handler_context", lambda: {})()
response = exception_handler(exc, context)
if response is not None:
response.data = self._format_vercel_error(exc, response)
return response
def _format_vercel_error(self, exc: Exception, response: Response) -> dict[str, Any]:
if isinstance(exc, APIException):
message = str(exc.detail)
else:
message = str(exc)
return {
"error": {
"code": "request_failed",
"message": message,
"user": {"message": message, "url": None},
}
}

View File

@@ -0,0 +1,131 @@
import re
from typing import Any
from rest_framework import decorators, exceptions, serializers, viewsets
from rest_framework.request import Request
from rest_framework.response import Response
from posthog.models.organization_integration import OrganizationIntegration
from ee.api.authentication import VercelAuthentication
from ee.api.vercel.vercel_error_mixin import VercelErrorResponseMixin
from ee.api.vercel.vercel_permission import VercelPermission
from ee.vercel.integration import VercelIntegration
class VercelCredentialsSerializer(serializers.Serializer):
access_token = serializers.CharField(help_text="Access token authorizes marketplace and integration APIs.")
token_type = serializers.CharField(help_text="The type of token (default: Bearer).")
class VercelContactSerializer(serializers.Serializer):
email = serializers.EmailField(help_text="Contact email address for the account.")
name = serializers.CharField(required=False, allow_blank=True, help_text="Contact name for the account (optional).")
class VercelAccountSerializer(serializers.Serializer):
name = serializers.CharField(required=False, allow_blank=True, help_text="Account name (optional).")
url = serializers.URLField(help_text="URL of the account.")
contact = VercelContactSerializer(help_text="Contact information for the account.")
class UpsertInstallationPayloadSerializer(serializers.Serializer):
scopes = serializers.ListField(
child=serializers.CharField(), min_length=1, help_text="Array of scopes, must have at least one. Min Length: 1"
)
acceptedPolicies = serializers.DictField(
child=serializers.JSONField(),
help_text='Policies accepted by the customer. Example: { "toc": "2024-02-28T10:00:00Z" }',
)
credentials = VercelCredentialsSerializer(
help_text="The service-account access token to access marketplace and integration APIs on behalf of a customer's installation."
)
account = VercelAccountSerializer(
help_text="The account information for this installation. Use Get Account Info API to re-fetch this data post installation."
)
class UpdateInstallationPayloadSerializer(serializers.Serializer):
billingPlanId = serializers.CharField(help_text='Partner-provided billing plan. Example: "pro200"')
INSTALLATION_ID_PATTERN = re.compile(r"^inst_[A-Za-z0-9]{9,}$")
class VercelInstallationViewSet(VercelErrorResponseMixin, viewsets.GenericViewSet):
lookup_field = "installation_id"
authentication_classes = [VercelAuthentication]
permission_classes = [VercelPermission]
vercel_supported_auth_types = {
"update": ["user"],
"partial_update": ["user"],
"destroy": ["user", "system"],
"retrieve": ["system"],
"plans": ["system"],
}
def get_object(self):
installation_id = self.kwargs.get("installation_id")
if not installation_id:
raise exceptions.ValidationError({"installation_id": "Missing installation_id in URL."})
if not INSTALLATION_ID_PATTERN.match(installation_id):
raise exceptions.ValidationError({"installation_id": "Invalid installation_id format."})
try:
installation = OrganizationIntegration.objects.get(
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL, integration_id=installation_id
)
return installation
except OrganizationIntegration.DoesNotExist:
raise exceptions.NotFound("Installation not found")
def update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""
Implements: https://vercel.com/docs/integrations/create-integration/marketplace-api#upsert-installation
"""
serializer = UpsertInstallationPayloadSerializer(data=request.data)
if not serializer.is_valid():
raise exceptions.ValidationError(detail=serializer.errors)
VercelIntegration.upsert_installation(self.kwargs["installation_id"], serializer.validated_data)
return Response(status=204)
def retrieve(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""
Implements: https://vercel.com/docs/integrations/create-integration/marketplace-api#get-installation
"""
installation_id = self.kwargs.get("installation_id", "")
response_data = VercelIntegration.get_installation_billing_plan(installation_id)
return Response(response_data, status=200)
def partial_update(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""
Implements: https://vercel.com/docs/integrations/create-integration/marketplace-api#update-installation
"""
serializer = UpdateInstallationPayloadSerializer(data=request.data)
if not serializer.is_valid():
raise exceptions.ValidationError(detail=serializer.errors)
installation_id = self.kwargs["installation_id"]
VercelIntegration.update_installation(installation_id, serializer.validated_data.get("billingPlanId"))
return Response(status=204)
def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:
"""
Implements: https://vercel.com/docs/integrations/create-integration/marketplace-api#delete-installation
"""
installation_id = self.kwargs["installation_id"]
response_data = VercelIntegration.delete_installation(installation_id)
return Response(response_data, status=200)
@decorators.action(detail=True, methods=["get"])
def plans(self, _request: Request, *_args: Any, **_kwargs: Any) -> Response:
"""
Implements: https://vercel.com/docs/integrations/create-integration/marketplace-api#get-installation-plans
"""
return Response({"plans": VercelIntegration.get_vercel_plans()})

View File

@@ -0,0 +1,109 @@
from rest_framework import exceptions
from rest_framework.permissions import BasePermission
from rest_framework.request import Request
from ee.api.vercel.utils import get_vercel_claims
class VercelPermission(BasePermission):
"""
Validates Vercel auth type, installation ID match, and user role.
Vercel auth type is determined by the X-Vercel-Auth header, and can differ per endpoint.
User roles: ADMIN can perform all operations, USER is read-only.
See Marketplace API spec for more details.
"""
# Actions that require ADMIN role (write operations)
ADMIN_ONLY_ACTIONS = {"update", "partial_update", "create", "destroy"}
# Actions that allow USER role (read-only operations)
READ_ONLY_ACTIONS = {"list", "retrieve", "plans"}
def has_permission(self, request: Request, view) -> bool:
self._validate_auth_type_allowed(request, view)
# Only validate role if this is a user auth request
# System auth bypasses role checks
auth_type = request.headers.get("X-Vercel-Auth", "").lower()
if auth_type == "user":
self._validate_user_role(request, view)
return True
def has_object_permission(self, request: Request, view, obj) -> bool:
self._validate_installation_id_match(request, view)
return True
def _get_supported_auth_types(self, view) -> list[str]:
"""
Get supported auth types for the current action from the viewset.
Each view can define an attribute like so:
vercel_supported_auth_types = {
"list": ["user"],
...
"action_name": ["system"],
}
"""
return getattr(view, "vercel_supported_auth_types", {}).get(view.action, ["user", "system"])
def _validate_auth_type_allowed(self, request: Request, view) -> None:
"""
Validate that the auth type from X-Vercel-Auth header is allowed for this endpoint.
Supported auth type is specified by the marketplace API spec.
"""
auth_type = request.headers.get("X-Vercel-Auth", "").lower()
if not auth_type:
raise exceptions.AuthenticationFailed("Missing X-Vercel-Auth header")
supported_types = self._get_supported_auth_types(view)
if auth_type not in supported_types:
raise exceptions.PermissionDenied(
f"Auth type '{auth_type}' not allowed for this endpoint. Supported types: {', '.join(supported_types)}"
)
def _validate_user_role(self, request: Request, view) -> bool:
"""
Validate that the user has the appropriate role for the action.
- ADMIN role: Required for write operations (update, create, delete)
- USER role: Allowed for read-only operations
- System auth: Bypasses role checks (used for system-to-system calls)
Returns True if validation passes, raises PermissionDenied otherwise.
"""
auth_type = request.headers.get("X-Vercel-Auth", "").lower()
# System auth bypasses role checks
if auth_type == "system":
return True
# User auth requires role validation
if auth_type == "user":
claims = get_vercel_claims(request)
user_role = str(claims.get("user_role", "")).upper()
# Check if action requires ADMIN role
if view.action in self.ADMIN_ONLY_ACTIONS:
if user_role != "ADMIN":
raise exceptions.PermissionDenied(
f"Action '{view.action}' requires ADMIN role. Current role: {user_role or 'unknown'}"
)
# For read-only actions, both ADMIN and USER roles are allowed
elif view.action in self.READ_ONLY_ACTIONS:
if user_role not in ["ADMIN", "USER"]:
raise exceptions.PermissionDenied(
f"Action '{view.action}' requires ADMIN or USER role. Current role: {user_role or 'unknown'}"
)
return True
def _validate_installation_id_match(self, request: Request, view) -> None:
"""Validate that JWT installation_id matches URL parameter"""
claims = get_vercel_claims(request)
# installation_id when going through the vercel_installation ViewSet,
# or parent_lookup_installation_id when going through the vercel_resource
installation_id = view.kwargs.get("installation_id") or view.kwargs.get("parent_lookup_installation_id")
if not installation_id:
raise exceptions.PermissionDenied("Missing installation_id")
if claims.get("installation_id") != installation_id:
raise exceptions.PermissionDenied("Installation ID mismatch")

View File

@@ -0,0 +1,28 @@
from typing import Any
from rest_framework import decorators, viewsets
from rest_framework.request import Request
from rest_framework.response import Response
from ee.api.authentication import VercelAuthentication
from ee.api.vercel.vercel_error_mixin import VercelErrorResponseMixin
from ee.api.vercel.vercel_permission import VercelPermission
from ee.vercel.integration import VercelIntegration
class VercelProductViewSet(VercelErrorResponseMixin, viewsets.GenericViewSet):
authentication_classes = [VercelAuthentication]
permission_classes = [VercelPermission]
lookup_field = "product_slug"
vercel_supported_auth_types = {
"plans": ["user", "system"],
}
@decorators.action(detail=True, methods=["get"])
def plans(self, _request: Request, *_args: Any, **_kwargs: Any) -> Response:
"""
https://vercel.com/docs/integrations/create-integration/marketplace-api#list-billing-plans-for-product
"""
product_slug = self.kwargs.get("product_slug", "")
return Response(VercelIntegration.get_product_plans(product_slug))

View File

@@ -56,7 +56,6 @@
e."$group_0" as aggregation_target,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as person_id,
person.person_props as person_props,
person.pmat_email as pmat_email,
if(event = 'step one', 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = 'step two', 1, 0) as step_1,
@@ -80,7 +79,6 @@
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN
(SELECT id,
argMax(pmat_email, version) as pmat_email,
argMax(properties, version) as person_props
FROM person
WHERE team_id = 99999
@@ -97,7 +95,7 @@
AND event IN ['step one', 'step three', 'step two']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2021-05-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2021-05-10 23:59:59', 'UTC')
AND (("pmat_email" ILIKE '%g0%'
AND ((replaceRegexpAll(JSONExtractRaw(person_props, 'email'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(e.properties, 'distinct_id'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(group_properties_0, 'name'), '^"|"$', '') ILIKE '%g0%'

199
ee/vercel/integration.py Normal file
View File

@@ -0,0 +1,199 @@
from typing import Any
from django.conf import settings
from django.db import IntegrityError, transaction
import structlog
from rest_framework import exceptions
from posthog.event_usage import report_user_signed_up
from posthog.exceptions_capture import capture_exception
from posthog.models.organization import Organization, OrganizationMembership
from posthog.models.organization_integration import OrganizationIntegration
from posthog.models.user import User
logger = structlog.get_logger(__name__)
class VercelIntegration:
@staticmethod
def _get_installation(installation_id: str) -> OrganizationIntegration:
try:
return OrganizationIntegration.objects.get(
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL, integration_id=installation_id
)
except OrganizationIntegration.DoesNotExist:
raise exceptions.NotFound("Installation not found")
@staticmethod
def get_vercel_plans() -> list[dict[str, Any]]:
# TODO: Retrieve through billing service instead.
return [
{
"id": "free",
"type": "subscription",
"name": "Free",
"description": "No credit card required",
"scope": "installation",
"paymentMethodRequired": False,
"details": [
{"label": "Data retention", "value": "1 year"},
{"label": "Projects", "value": "1"},
{"label": "Team members", "value": "Unlimited"},
{"label": "API Access", "value": ""},
{"label": "No limits on tracked users", "value": ""},
{"label": "Community support", "value": "Support via community forum"},
],
"highlightedDetails": [
{"label": "Feature Flags", "value": "1 million free requests"},
{"label": "Experiments", "value": "1 million free requests"},
],
},
{
"id": "pay_as_you_go",
"type": "subscription",
"name": "Pay-as-you-go",
"description": "Usage-based pricing after free tier",
"scope": "installation",
"paymentMethodRequired": True,
"details": [
{"label": "Data retention", "value": "7 years"},
{"label": "Projects", "value": "6"},
{"label": "Team members", "value": "Unlimited"},
{"label": "API Access", "value": ""},
{"label": "No limits on tracked users", "value": ""},
{"label": "Standard support", "value": "Support via email, Slack-based over $2k/mo"},
],
"highlightedDetails": [
{"label": "Feature flags", "value": "1 million requests for free, then from $0.0001/request"},
{"label": "Experiments", "value": "Billed with feature flags"},
],
},
]
@staticmethod
def upsert_installation(installation_id: str, payload: dict[str, Any]) -> None:
logger.info("Starting Vercel installation upsert process", installation_id=installation_id)
# Check if there's already an OrganizationIntegration for this installation_id
# If there is, we don't need to do update anything besides OrganizationIntegration's config.
organization_integration_exists = OrganizationIntegration.objects.filter(
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=installation_id,
).exists()
if organization_integration_exists:
OrganizationIntegration.objects.filter(
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=installation_id,
).update(config=payload)
logger.info("Vercel installation updated", installation_id=installation_id)
return
email = payload.get("account", {}).get("contact", {}).get("email")
if not email:
logger.exception("Vercel installation payload missing email", installation_id=installation_id)
raise exceptions.ValidationError(
{"validation_error": "Email is required in the payload."},
code="invalid",
)
# It's possible that there's already a user with this email signed up to PostHog,
# either due to a reinstallation of the integration, or manual signup through PostHog itself.
user = User.objects.filter(email=email).first()
user_created = False
with transaction.atomic():
try:
if not user:
user = User.objects.create_user(
email=payload["account"]["contact"]["email"],
password=None,
first_name=payload["account"]["contact"].get("name", ""),
is_staff=False,
is_email_verified=False,
)
user_created = True
# Through Vercel we can only create new organizations, not use existing ones.
# Note: We won't create a team here, that's done during Vercel resource creation.
organization = Organization.objects.create(
name=payload["account"].get("name", f"Vercel Installation {installation_id}")
)
user.join(organization=organization, level=OrganizationMembership.Level.OWNER)
OrganizationIntegration.objects.update_or_create(
organization=organization,
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=installation_id,
defaults={
"config": payload,
"created_by": user,
},
)
logger.info("Created new Vercel installation", installation_id=installation_id)
except IntegrityError as e:
capture_exception(e)
logger.exception("Failed to create Vercel installation", installation_id=installation_id)
raise exceptions.ValidationError(
{"validation_error": "Something went wrong."},
code="unique",
)
if user_created:
report_user_signed_up(
user,
is_instance_first_user=False,
is_organization_first_user=True,
backend_processor="VercelIntegration",
user_analytics_metadata=user.get_analytics_metadata(),
org_analytics_metadata=organization.get_analytics_metadata(),
social_provider="vercel",
referral_source="vercel",
)
logger.info(
"Successfully created Vercel installation", installation_id=installation_id, organization_id=organization.id
)
@staticmethod
def get_installation_billing_plan(installation_id: str) -> dict[str, Any]:
VercelIntegration._get_installation(installation_id)
billing_plans = VercelIntegration.get_vercel_plans()
# Always return free plan for now - will be replaced with billing service
current_plan = next(plan for plan in billing_plans if plan["id"] == "free")
return {
"billingplan": current_plan,
}
@staticmethod
def update_installation(installation_id: str, billing_plan_id: str) -> None:
logger.info("Starting Vercel installation update", installation_id=installation_id)
# TODO: Implement billing plan update logic here, awaiting billing service implementation.
logger.info("Successfully updated Vercel installation", installation_id=installation_id)
@staticmethod
def delete_installation(installation_id: str) -> dict[str, Any]:
logger.info("Starting Vercel installation deletion", installation_id=installation_id)
installation = VercelIntegration._get_installation(installation_id)
installation.delete()
is_dev = settings.DEBUG
logger.info(
"Successfully deleted Vercel installation",
installation_id=installation_id,
finalized=is_dev,
)
return {"finalized": is_dev} # Immediately finalize in dev mode for testing purposes
@staticmethod
def get_product_plans(product_slug: str) -> dict[str, Any]:
if product_slug != "posthog":
raise exceptions.NotFound("Product not found")
return {"plans": VercelIntegration.get_vercel_plans()}

View File

@@ -0,0 +1,188 @@
from typing import Any
from unittest.mock import patch
from django.db import IntegrityError
from django.test import TestCase
from rest_framework.exceptions import NotFound, ValidationError
from posthog.models.organization import Organization, OrganizationMembership
from posthog.models.organization_integration import OrganizationIntegration
from posthog.models.user import User
from ee.vercel.integration import VercelIntegration
class TestVercelIntegration(TestCase):
def setUp(self):
self.installation_id = "inst_123456789"
self.user = User.objects.create_user(email="test@example.com", password="testpass", first_name="Test")
self.organization = Organization.objects.create(name="Test Org")
self.user.join(organization=self.organization, level=OrganizationMembership.Level.OWNER)
self.installation = OrganizationIntegration.objects.create(
organization=self.organization,
kind=OrganizationIntegration.OrganizationIntegrationKind.VERCEL,
integration_id=self.installation_id,
config={"billing_plan_id": "free", "scopes": ["read"]},
created_by=self.user,
)
self.payload: dict[str, Any] = {
"scopes": ["read", "write"],
"acceptedPolicies": {"toc": "2024-02-28T10:00:00Z"},
"credentials": {"access_token": "token", "token_type": "Bearer"},
"account": {
"name": "Test Account",
"url": "https://example.com",
"contact": {"email": "contact@example.com", "name": "John Doe"},
},
}
def test_get_installation_exists(self):
installation = VercelIntegration._get_installation(self.installation_id)
assert installation.integration_id == self.installation_id
assert installation.organization == self.organization
def test_get_installation_not_found(self):
with self.assertRaises(NotFound) as context:
VercelIntegration._get_installation("inst_nonexistent")
assert str(context.exception) == "Installation not found"
def test_get_vercel_plans_structure(self):
plans = VercelIntegration.get_vercel_plans()
assert len(plans) == 2
free_plan = next(p for p in plans if p["id"] == "free")
assert free_plan["type"] == "subscription"
assert free_plan["name"] == "Free"
assert not free_plan["paymentMethodRequired"]
paid_plan = next(p for p in plans if p["id"] == "pay_as_you_go")
assert paid_plan["type"] == "subscription"
assert paid_plan["name"] == "Pay-as-you-go"
assert paid_plan["paymentMethodRequired"]
def test_get_installation_returns_free_plan(self):
result = VercelIntegration.get_installation_billing_plan(self.installation_id)
assert "billingplan" in result
assert result["billingplan"]["id"] == "free"
def test_update_installation_success(self):
VercelIntegration.update_installation(self.installation_id, "pro200")
updated_installation = OrganizationIntegration.objects.get(integration_id=self.installation_id)
assert updated_installation.config["billing_plan_id"] == "free"
def test_update_installation_not_found(self):
VercelIntegration.update_installation("inst_nonexistent", "pro200")
@patch("django.conf.settings.DEBUG", True)
def test_delete_installation_dev_mode(self):
result = VercelIntegration.delete_installation(self.installation_id)
assert result["finalized"]
assert not OrganizationIntegration.objects.filter(integration_id=self.installation_id).exists()
@patch("django.conf.settings.DEBUG", False)
def test_delete_installation_prod_mode(self):
result = VercelIntegration.delete_installation(self.installation_id)
assert not result["finalized"]
assert not OrganizationIntegration.objects.filter(integration_id=self.installation_id).exists()
def test_delete_installation_not_found(self):
with self.assertRaises(NotFound):
VercelIntegration.delete_installation("inst_nonexistent")
def test_get_product_plans_posthog(self):
result = VercelIntegration.get_product_plans("posthog")
assert "plans" in result
assert len(result["plans"]) == 2
def test_get_product_plans_invalid_product(self):
with self.assertRaises(NotFound) as context:
VercelIntegration.get_product_plans("invalid_product")
assert str(context.exception) == "Product not found"
def test_upsert_installation_existing_installation(self):
original_config = self.installation.config.copy()
VercelIntegration.upsert_installation(self.installation_id, self.payload)
self.installation.refresh_from_db()
assert self.installation.config == self.payload
assert self.installation.config != original_config
@patch("ee.vercel.integration.report_user_signed_up")
def test_upsert_installation_new_user_new_org(self, mock_report):
new_installation_id = "inst_987654321"
VercelIntegration.upsert_installation(new_installation_id, self.payload)
new_installation = OrganizationIntegration.objects.get(integration_id=new_installation_id)
assert new_installation.config == self.payload
new_user = User.objects.get(email=self.payload["account"]["contact"]["email"])
assert new_user.first_name == self.payload["account"]["contact"]["name"]
assert not new_user.is_email_verified
new_org = new_installation.organization
assert new_org.name == self.payload["account"]["name"]
membership = OrganizationMembership.objects.get(user=new_user, organization=new_org)
assert membership.level == OrganizationMembership.Level.OWNER
mock_report.assert_called_once()
@patch("ee.vercel.integration.report_user_signed_up")
def test_upsert_installation_existing_user_new_org(self, mock_report):
existing_user = User.objects.create_user(
email=self.payload["account"]["contact"]["email"], password="existing", first_name="Existing"
)
new_installation_id = "inst_987654321"
VercelIntegration.upsert_installation(new_installation_id, self.payload)
new_installation = OrganizationIntegration.objects.get(integration_id=new_installation_id)
assert new_installation.created_by == existing_user
new_org = new_installation.organization
membership = OrganizationMembership.objects.get(user=existing_user, organization=new_org)
assert membership.level == OrganizationMembership.Level.OWNER
mock_report.assert_not_called()
@patch("ee.vercel.integration.capture_exception")
def test_upsert_installation_integrity_error(self, mock_capture):
with patch("posthog.models.organization.Organization.objects.create") as mock_create:
mock_create.side_effect = IntegrityError("Duplicate key")
with self.assertRaises(ValidationError) as context:
VercelIntegration.upsert_installation("inst_new", self.payload)
detail = context.exception.detail
if isinstance(detail, dict):
assert detail.get("validation_error") == "Something went wrong."
mock_capture.assert_called_once()
def test_upsert_installation_creates_org_with_fallback_name(self):
new_installation_id = "inst_987654321"
payload_without_name = self.payload.copy()
del payload_without_name["account"]["name"]
VercelIntegration.upsert_installation(new_installation_id, payload_without_name)
new_installation = OrganizationIntegration.objects.get(integration_id=new_installation_id)
assert new_installation.organization.name == f"Vercel Installation {new_installation_id}"
def test_upsert_installation_creates_user_with_fallback_name(self):
new_installation_id = "inst_987654321"
payload_without_name = self.payload.copy()
del payload_without_name["account"]["contact"]["name"]
VercelIntegration.upsert_installation(new_installation_id, payload_without_name)
new_user = User.objects.get(email=payload_without_name["account"]["contact"]["email"])
assert new_user.first_name == ""

View File

@@ -32,6 +32,8 @@ from products.llm_analytics.backend.api import LLMProxyViewSet
from products.messaging.backend.api import MessageCategoryViewSet, MessagePreferencesViewSet, MessageTemplatesViewSet
from products.user_interviews.backend.api import UserInterviewViewSet
from ee.api.vercel import vercel_installation, vercel_product
from ..heatmaps.heatmaps_api import HeatmapViewSet, LegacyHeatmapViewSet
from ..session_recordings.session_recording_api import SessionRecordingViewSet
from ..session_recordings.session_recording_playlist_api import SessionRecordingPlaylistViewSet
@@ -544,6 +546,17 @@ if EE_AVAILABLE:
r"persons", EnterprisePersonViewSet, "environment_persons", ["team_id"]
)
router.register(r"person", LegacyEnterprisePersonViewSet, "persons")
router.register(
r"vercel/v1/installations",
vercel_installation.VercelInstallationViewSet,
"vercel_installations",
)
router.register(
r"vercel/v1/products",
vercel_product.VercelProductViewSet,
"vercel_products",
)
else:
environment_insights_router, legacy_project_insights_router = register_grandfathered_environment_nested_viewset(
r"insights", InsightViewSet, "environment_insights", ["team_id"]