feat: add a blackhole ai capture endpoint (#39567)

Author: Paweł Ledwoń
Date: 2025-10-22 19:05:02 +02:00
Committed by: GitHub
Parent: 5d62809cd4
Commit: 111d3486a6
25 changed files with 3891 additions and 394 deletions


@@ -0,0 +1 @@
# PostHog Ingestion Acceptance Tests


@@ -0,0 +1,234 @@
"""PostHog API client for acceptance tests."""
import json
import time
import uuid
import logging
from typing import Any, Optional
import requests
from posthoganalytics import Posthog
from .utils import get_service_url
logger = logging.getLogger(__name__)
class PostHogTestClient:
"""Client for interacting with PostHog API during tests."""
def __init__(self, base_url: Optional[str] = None, personal_api_key: Optional[str] = None):
self.base_url = base_url or get_service_url()
self.session = requests.Session()
# Set personal API key for private endpoints if provided
if personal_api_key:
self.session.headers.update({"Authorization": f"Bearer {personal_api_key}"})
def create_organization(self, name: Optional[str] = None) -> dict[str, Any]:
"""Create a test organization using private API."""
org_name = name or f"test_org_{uuid.uuid4().hex[:8]}"
logger.info("Creating organization '%s'", org_name)
logger.debug("POST %s/api/organizations/", self.base_url)
response = self.session.post(f"{self.base_url}/api/organizations/", json={"name": org_name})
logger.debug("Response status: %s", response.status_code)
response.raise_for_status()
result = response.json()
logger.info("Organization created with ID: %s", result.get("id"))
return result
def create_project(self, organization_id: str, name: Optional[str] = None) -> dict[str, Any]:
"""Create a test project within an organization using private API."""
project_name = name or f"test_project_{uuid.uuid4().hex[:8]}"
logger.info("Creating project '%s' in org %s", project_name, organization_id)
logger.debug("POST %s/api/organizations/%s/projects/", self.base_url, organization_id)
response = self.session.post(
f"{self.base_url}/api/organizations/{organization_id}/projects/", json={"name": project_name}
)
logger.debug("Response status: %s", response.status_code)
response.raise_for_status()
result = response.json()
logger.info("Project created with ID: %s", result.get("id"))
# Wait for project to be available in query API
self._wait_for_project_ready(result.get("id"))
return result
def _wait_for_project_ready(self, project_id: str, timeout: int = 30) -> None:
"""Wait for project to be ready for queries."""
logger.info("Waiting for project %s to be ready for queries...", project_id)
start_time = time.time()
while time.time() - start_time < timeout:
try:
# First check if the project exists via the basic project API
project_response = self.session.get(f"{self.base_url}/api/projects/{project_id}/")
if project_response.status_code != 200:
logger.debug("Project %s not accessible via API, waiting...", project_id)
time.sleep(1)
continue
# Then try a simple HogQL query to see if the project is ready
query_response = self.session.post(
f"{self.base_url}/api/environments/{project_id}/query/",
json={"query": {"kind": "HogQLQuery", "query": "SELECT 1 LIMIT 1"}},
)
if query_response.status_code == 200:
logger.info("Project %s is ready for queries", project_id)
return
elif query_response.status_code == 404:
logger.debug("Project %s query endpoint not yet available, waiting...", project_id)
else:
logger.debug(
"Project %s query returned status %s, waiting...", project_id, query_response.status_code
)
except Exception as e:
logger.debug("Project %s readiness check failed: %s, waiting...", project_id, e)
time.sleep(1)
logger.warning("Project %s may not be fully ready after %s seconds", project_id, timeout)
def send_capture_event(self, api_key: str, event_data: dict[str, Any]) -> None:
"""Send an event using the PostHog Python client.
Uses the official PostHog Python client which handles the capture endpoint.
"""
# Extract event details
event_name = event_data.get("event", "test_event")
distinct_id = event_data.get("distinct_id", f"test_user_{uuid.uuid4().hex[:8]}")
properties = event_data.get("properties", {})
timestamp = event_data.get("timestamp")
logger.info("Creating PostHog client instance")
logger.debug("Host: %s", self.base_url)
# Create PostHog client instance with the API key
posthog_client = Posthog(api_key, host=self.base_url, debug=True)
logger.info("Sending capture event using PostHog client")
logger.debug("Event: %s", event_name)
logger.debug("Distinct ID: %s", distinct_id)
logger.debug("Properties: %s", properties)
if timestamp:
logger.debug("Timestamp: %s", timestamp)
# Send event using PostHog client instance
if timestamp:
posthog_client.capture(
distinct_id=distinct_id, event=event_name, properties=properties, timestamp=timestamp
)
else:
posthog_client.capture(distinct_id=distinct_id, event=event_name, properties=properties)
logger.info("Event sent via PostHog client")
# Flush to ensure the event is sent immediately
logger.debug("Flushing PostHog client")
posthog_client.flush()
logger.debug("PostHog client flushed")
# Shutdown the client
posthog_client.shutdown()
def query_events_hogql(
self, project_id: str, event_name: Optional[str] = None, distinct_id: Optional[str] = None, limit: int = 100
) -> list[dict[str, Any]]:
"""Query events using the HogQL query API (recommended method)."""
# Build HogQL query
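# NOTE: values are interpolated directly into the HogQL string; acceptable for test-generated names and IDs, but not safe for arbitrary input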
conditions = []
if event_name:
conditions.append(f"event = '{event_name}'")
if distinct_id:
conditions.append(f"distinct_id = '{distinct_id}'")
where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
query = f"SELECT * FROM events {where_clause} ORDER BY timestamp DESC LIMIT {limit}"
logger.debug("Executing HogQL query: %s", query)
response = self.session.post(
f"{self.base_url}/api/environments/{project_id}/query/",
json={"refresh": "force_blocking", "query": {"kind": "HogQLQuery", "query": query}},
)
logger.debug("HogQL query response status: %s", response.status_code)
response.raise_for_status()
data = response.json()
logger.debug("HogQL query returned %s results", len(data.get("results", [])))
# Extract events from HogQL response
if data.get("results"):
# Convert HogQL results to event-like format
columns = data.get("columns", [])
results = []
for row in data["results"]:
event = {}
for i, col in enumerate(columns):
if i < len(row):
value = row[i]
# Parse JSON columns (properties is returned as JSON string)
if col == "properties" and isinstance(value, str):
try:
value = json.loads(value)
except (json.JSONDecodeError, TypeError):
pass
event[col] = value
results.append(event)
return results
return []
def wait_for_event(
self,
project_id: str,
event_name: str,
distinct_id: Optional[str] = None,
timeout: int = 30,
poll_interval: float = 5.0,
) -> Optional[dict[str, Any]]:
"""Poll for an event to appear in the query API."""
start_time = time.time()
while time.time() - start_time < timeout:
# Use HogQL query (recommended)
events = self.query_events_hogql(project_id, event_name, distinct_id, limit=10)
for event in events:
if event.get("event") == event_name:
if distinct_id is None or event.get("distinct_id") == distinct_id:
return event
time.sleep(poll_interval)
return None
def delete_project(self, project_id: str) -> None:
"""Delete a project using private API."""
logger.info("Deleting project %s", project_id)
logger.debug("DELETE %s/api/environments/%s/", self.base_url, project_id)
response = self.session.delete(f"{self.base_url}/api/environments/{project_id}/")
logger.debug("Response status: %s", response.status_code)
response.raise_for_status()
logger.info("Project deleted successfully")
def delete_organization(self, organization_id: str) -> None:
"""Delete an organization using private API."""
logger.info("Deleting organization %s", organization_id)
logger.debug("DELETE %s/api/organizations/%s/", self.base_url, organization_id)
response = self.session.delete(f"{self.base_url}/api/organizations/{organization_id}/")
logger.debug("Response status: %s", response.status_code)
response.raise_for_status()
logger.info("Organization deleted successfully")


@@ -0,0 +1,79 @@
"""Pytest fixtures for acceptance tests."""
import os
import logging
import pytest
from .api_client import PostHogTestClient
logger = logging.getLogger(__name__)
@pytest.fixture(scope="class")
def test_client():
"""Create a PostHog test client instance for each test class."""
# Get configuration from environment
base_url = os.environ.get("POSTHOG_TEST_BASE_URL", "http://localhost:8010")
personal_api_key = os.environ.get("POSTHOG_PERSONAL_API_KEY")
if not personal_api_key:
pytest.skip("POSTHOG_PERSONAL_API_KEY not set - please set it to run acceptance tests")
logger.info("Creating test client with base_url=%s", base_url)
client = PostHogTestClient(base_url=base_url, personal_api_key=personal_api_key)
yield client
# Cleanup happens in individual tests
@pytest.fixture(scope="function")
def function_test_client():
"""Create a PostHog test client instance for each individual test (for auth tests)."""
# Get configuration from environment
base_url = os.environ.get("POSTHOG_TEST_BASE_URL", "http://localhost:8010")
personal_api_key = os.environ.get("POSTHOG_PERSONAL_API_KEY")
if not personal_api_key:
pytest.skip("POSTHOG_PERSONAL_API_KEY not set - please set it to run acceptance tests")
logger.info("Creating function-scoped test client with base_url=%s", base_url)
client = PostHogTestClient(base_url=base_url, personal_api_key=personal_api_key)
yield client
@pytest.fixture(scope="class")
def shared_org_project(test_client):
"""Create a shared organization and project for an entire test class."""
logger.info("Creating shared organization and project for test class")
client = test_client
org = client.create_organization()
project = client.create_project(org["id"])
# Project creation now waits for readiness internally
yield {
"org": org,
"project": project,
"client": client,
"org_id": org["id"],
"project_id": project["id"],
"api_key": project["api_token"],
}
# Cleanup after all tests in class are done
logger.info("Cleaning up shared organization and project")
try:
client.delete_project(project["id"])
logger.info("Successfully deleted project: %s", project["id"])
except Exception as e:
logger.warning("Failed to delete project %s: %s", project["id"], e)
try:
client.delete_organization(org["id"])
logger.info("Successfully deleted organization: %s", org["id"])
except Exception as e:
logger.warning("Failed to delete organization %s: %s", org["id"], e)


@@ -0,0 +1,85 @@
# LLMA Acceptance Test Implementation Plan
## Overview
Python-based acceptance tests for the LLM Analytics capture pipeline, testing against an existing PostHog instance.
## Test Execution
### Local Development
- User starts PostHog stack manually (using docker-compose or other method)
- Set POSTHOG_TEST_BASE_URL environment variable to PostHog instance URL
- Run acceptance tests: `python run_tests.py`
### GitHub Actions
- Triggered on PRs affecting rust/capture/ or plugin-server/
- Sets up PostHog stack in CI
- Runs acceptance tests against the stack
- Collects logs on failure
## Implementation Steps
### Commit 1: Basic acceptance test infrastructure ✓
1. Create common/ingestion/acceptance_tests/ directory structure
2. Add utils.py for service URL management
3. Add requirements.txt with test dependencies
4. Add run_tests.py test runner script
### Commit 2: PostHog API client for test setup
1. Add api_client.py with PostHogTestClient class
2. Implement organization creation via API
3. Implement project creation with API key retrieval
4. Add project deletion for cleanup
5. Add event querying through PostHog Query API
6. Implement polling mechanism for event arrival
### Commit 3: Basic event capture test
1. Add test_basic_capture.py
2. Test sending a regular PostHog event to /capture
3. Poll Query API until event appears
4. Verify event properties match what was sent
5. Confirm end-to-end pipeline works
### Commit 4: Test orchestration
1. Add conftest.py with pytest fixtures
2. Add per-test project isolation
3. Add environment variable handling
4. Handle cleanup of test data
### Commit 5: GitHub Actions workflow
1. Add .github/workflows/llma-acceptance-tests.yml
2. Configure triggers for relevant paths
3. Add PostHog stack startup steps
4. Add test execution with pytest
5. Implement log collection on failure
6. Add cleanup steps
### Commit 6: Documentation
1. Add README.md with setup instructions
2. Document local running procedures
3. Document environment variables
4. Add troubleshooting guide
5. Create .env.example file
6. Document how to extend the test suite
## Usage
```bash
# Start PostHog manually (e.g., with docker-compose)
docker-compose -f docker-compose.dev-full.yml up -d
# Set environment variable
export POSTHOG_TEST_BASE_URL=http://localhost:8010
# Run tests
cd common/ingestion/acceptance_tests
python run_tests.py
```


@@ -0,0 +1,9 @@
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-xdist==3.3.1  # needed for the --numprocesses flag used by run_tests.py
requests==2.31.0
requests-toolbelt==1.0.0
boto3==1.29.7
docker==6.1.3
python-multipart==0.0.6
posthog==6.7.6


@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""Main test runner for PostHog acceptance tests."""
import os
import sys
import logging
import subprocess
from pathlib import Path
# Configure logging for the test runner
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S")
logger = logging.getLogger(__name__)
def run_tests():
"""Run the acceptance test suite."""
test_dir = Path(__file__).parent
# Use virtual environment if it exists
venv_python = test_dir / ".venv" / "bin" / "python"
if venv_python.exists():
python_cmd = str(venv_python)
else:
python_cmd = sys.executable
# Run pytest on all test files
# -v: verbose
# -s: no capture, show print statements
# --tb=short: short traceback format
# --log-cli-level=DEBUG: show debug logs in console
# --log-cli-format: format for console logs
result = subprocess.run(
[
python_cmd,
"-m",
"pytest",
"-v",
"-s",
"--tb=short",
"--log-cli-level=DEBUG",
"--log-cli-format=%(asctime)s [%(levelname)s] %(name)s: %(message)s",
"--log-cli-date-format=%H:%M:%S",
"--numprocesses=auto",
],
cwd=test_dir,
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.returncode
def main():
"""Main entry point for the test runner."""
logger.info("=" * 60)
logger.info("PostHog Acceptance Tests")
logger.info("=" * 60)
# Log environment info
logger.info("Environment:")
logger.info(
" POSTHOG_TEST_BASE_URL: %s", os.environ.get("POSTHOG_TEST_BASE_URL", "http://localhost:8010 (default)")
)
logger.info(" POSTHOG_PERSONAL_API_KEY: %s", "SET" if os.environ.get("POSTHOG_PERSONAL_API_KEY") else "NOT SET")
exit_code = 1
try:
# Run the tests
logger.info("Running acceptance tests...")
logger.info("-" * 60)
exit_code = run_tests()
logger.info("-" * 60)
if exit_code == 0:
logger.info("✓ All tests passed!")
else:
logger.error("✗ Tests failed with exit code %s", exit_code)
except KeyboardInterrupt:
logger.info("Interrupted by user")
exit_code = 130
except Exception as e:
logger.exception("✗ Error: %s", e)
exit_code = 1
logger.info("Done.")
return exit_code
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,112 @@
"""Basic capture test - creates project, sends event, and verifies it."""
import uuid
import logging
logger = logging.getLogger(__name__)
class TestBasicCapture:
"""Test basic event capture flow."""
def test_capture_and_query_event(self, test_client):
"""Test that we can capture an event and query it back."""
logger.info("\n" + "=" * 60)
logger.info("STARTING TEST: Basic Event Capture")
logger.info("=" * 60)
client = test_client
# Create organization and project
org = None
project = None
try:
# Step 1: Create test organization
logger.info("Step 1: Creating test organization")
org = client.create_organization()
org_id = org["id"]
logger.info("Organization created: %s", org_id)
# Step 2: Create test project
logger.info("Step 2: Creating test project")
project = client.create_project(org_id)
project_id = project["id"]
project_api_key = project["api_token"]
logger.info("Project created: %s", project_id)
# Step 3: Prepare test event
logger.info("Step 3: Preparing test event")
event_name = f"test_event_{uuid.uuid4().hex[:8]}"
distinct_id = f"test_user_{uuid.uuid4().hex[:8]}"
test_properties = {"test_property": "test_value", "test_number": 42, "test_bool": True}
logger.debug("Event name: %s", event_name)
logger.debug("Distinct ID: %s", distinct_id)
logger.debug("Properties: %s", test_properties)
# Step 4: Send capture event
logger.info("Step 4: Sending event to capture endpoint")
client.send_capture_event(
api_key=project_api_key,
event_data={"event": event_name, "distinct_id": distinct_id, "properties": test_properties},
)
logger.info("Event sent successfully")
# Step 5: Wait for event to appear in query API
logger.info("Step 5: Waiting for event to be processed")
event = client.wait_for_event(
project_id=project_id, event_name=event_name, distinct_id=distinct_id, timeout=30
)
# Verify event was found
assert event is not None, f"Event {event_name} not found after 30 seconds"
logger.info("Event found in query API")
logger.debug("Retrieved event: %s", event)
# Step 6: Verify event properties
logger.info("Step 6: Verifying event properties")
assert (
event.get("event") == event_name
), f"Event name mismatch: expected {event_name}, got {event.get('event')}"
logger.debug("Event name matches: %s", event_name)
assert (
event.get("distinct_id") == distinct_id
), f"Distinct ID mismatch: expected {distinct_id}, got {event.get('distinct_id')}"
logger.debug("Distinct ID matches: %s", distinct_id)
# Check if properties match
event_properties = event.get("properties", {})
logger.debug("Event properties: %s", event_properties)
for key, value in test_properties.items():
assert key in event_properties, f"Property {key} not found in event"
assert (
event_properties[key] == value
), f"Property {key} value mismatch: expected {value}, got {event_properties[key]}"
logger.debug("Property %s = %s", key, value)
logger.info("All event properties verified successfully")
logger.info("Test completed successfully")
logger.info("=" * 60)
finally:
# Cleanup
logger.info("Step 7: Cleaning up test resources")
if project:
try:
client.delete_project(project["id"])
logger.info("Cleaned up project: %s", project["id"])
except Exception as e:
logger.exception("Failed to delete project: %s", e)
if org:
try:
client.delete_organization(org["id"])
logger.info("Cleaned up organization: %s", org["id"])
except Exception as e:
logger.exception("Failed to delete organization: %s", e)
logger.info("\n" + "=" * 60)


@@ -0,0 +1,812 @@
"""LLM Analytics capture tests - tests multipart blob upload and S3 storage."""
import gzip
import json
import uuid
import logging
import pytest
import requests
from requests_toolbelt import MultipartEncoder
logger = logging.getLogger(__name__)
def assert_part_details(part, expected_name, expected_length, expected_content_type, expected_content_encoding=None):
"""Assert comprehensive details about a multipart part."""
assert part["name"] == expected_name, f"Expected part name '{expected_name}', got '{part['name']}'"
assert part["length"] == expected_length, f"Expected part length {expected_length}, got {part['length']}"
assert (
part["content-type"] == expected_content_type
), f"Expected content-type '{expected_content_type}', got '{part['content-type']}'"
assert (
part["content-encoding"] == expected_content_encoding
), f"Expected content-encoding '{expected_content_encoding}', got '{part['content-encoding']}'"
def assert_parts_order_and_details(response_data, expected_parts):
"""Assert that parts are in the correct order and have correct details."""
assert "accepted_parts" in response_data, "Response should contain accepted_parts"
assert isinstance(response_data["accepted_parts"], list), "accepted_parts should be a list"
actual_parts = response_data["accepted_parts"]
assert len(actual_parts) == len(expected_parts), f"Expected {len(expected_parts)} parts, got {len(actual_parts)}"
for i, (actual_part, expected_part) in enumerate(zip(actual_parts, expected_parts)):
expected_name, expected_length, expected_content_type, expected_content_encoding = expected_part
assert_part_details(
actual_part, expected_name, expected_length, expected_content_type, expected_content_encoding
)
logger.debug(f"Part {i}: {actual_part['name']} - {actual_part['length']} bytes - {actual_part['content-type']}")
@pytest.mark.usefixtures("shared_org_project")
class TestLLMAnalytics:
"""Test LLM Analytics capture flow with multipart requests and S3 storage."""
# ============================================================================
# PHASE 1: HTTP ENDPOINT
# ============================================================================
# ----------------------------------------------------------------------------
# Scenario 1.1: Event Processing Verification
# ----------------------------------------------------------------------------
def test_basic_ai_generation_event(self, shared_org_project):
"""Test that we can capture an $ai_generation event with blob data via multipart request."""
logger.info("\n" + "=" * 60)
logger.info("STARTING TEST: Basic $ai_generation Event Capture")
logger.info("=" * 60)
client = shared_org_project["client"]
project_id = shared_org_project["project_id"]
project_api_key = shared_org_project["api_key"]
# Step 1: Using shared organization and project
logger.info("Step 1: Using shared organization and project")
# Step 2: Prepare test event data
logger.info("Step 2: Preparing $ai_generation event")
distinct_id = f"test_user_{uuid.uuid4().hex[:8]}"
event_data = {
"event": "$ai_generation",
"distinct_id": distinct_id,
"timestamp": "2024-01-15T10:30:00Z",
"properties": {
"$ai_model": "gpt-4",
"$ai_provider": "openai",
"$ai_completion_tokens": 150,
"$ai_prompt_tokens": 50,
"custom_property": "test_value",
},
}
# Prepare blob data
input_blob = {
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
],
"temperature": 0.7,
"max_tokens": 200,
}
output_blob = {
"choices": [
{
"message": {"role": "assistant", "content": "The capital of France is Paris."},
"finish_reason": "stop",
"index": 0,
}
],
"model": "gpt-4",
"usage": {"prompt_tokens": 50, "completion_tokens": 150, "total_tokens": 200},
}
# Step 3: Create multipart request
logger.info("Step 3: Creating multipart request")
# Create multipart encoder with proper boundary
boundary = f"----WebKitFormBoundary{uuid.uuid4().hex[:16]}"
fields = {
"event": ("event", json.dumps(event_data), "application/json"),
"event.properties.$ai_input": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(input_blob),
"application/json",
),
"event.properties.$ai_output_choices": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(output_blob),
"application/json",
),
}
multipart_data = MultipartEncoder(fields=fields, boundary=boundary)
# Step 4: Send multipart request to /i/v0/ai endpoint
logger.info("Step 4: Sending multipart request to /i/v0/ai endpoint")
capture_url = f"{client.base_url}/i/v0/ai"
headers = {"Content-Type": multipart_data.content_type, "Authorization": f"Bearer {project_api_key}"}
logger.debug("POST %s", capture_url)
logger.debug("Content-Type: %s", headers["Content-Type"])
response = requests.post(capture_url, data=multipart_data, headers=headers)
logger.debug("Response status: %s", response.status_code)
logger.debug("Response body: %s", response.text)
# Check if request was successful
response.raise_for_status()
logger.info("Multipart request sent successfully")
# Verify response contains accepted parts
response_data = response.json()
assert "accepted_parts" in response_data
accepted_parts = response_data["accepted_parts"]
assert len(accepted_parts) == 3, f"Expected 3 parts, got {len(accepted_parts)}"
# Verify each part has correct details
event_json = json.dumps(event_data)
input_json = json.dumps(input_blob)
output_json = json.dumps(output_blob)
assert accepted_parts[0]["name"] == "event"
assert accepted_parts[0]["length"] == len(event_json)
assert accepted_parts[0]["content-type"] == "application/json"
assert accepted_parts[1]["name"] == "event.properties.$ai_input"
assert accepted_parts[1]["length"] == len(input_json)
assert accepted_parts[1]["content-type"] == "application/json"
assert accepted_parts[2]["name"] == "event.properties.$ai_output_choices"
assert accepted_parts[2]["length"] == len(output_json)
assert accepted_parts[2]["content-type"] == "application/json"
logger.info("Response validation successful: all parts accepted with correct lengths")
# Step 5: Wait for event to appear in query API
logger.info("Step 5: Waiting for event to be processed")
event = client.wait_for_event(
project_id=project_id, event_name="$ai_generation", distinct_id=distinct_id, timeout=30
)
# Verify event was found
assert event is not None, "$ai_generation event not found after 30 seconds"
logger.info("Event found in query API")
logger.debug("Retrieved event: %s", event)
# Step 6: Verify event properties
logger.info("Step 6: Verifying event properties")
assert event.get("event") == "$ai_generation"
assert event.get("distinct_id") == distinct_id
event_properties = event.get("properties", {})
# Verify standard properties
assert event_properties.get("$ai_model") == "gpt-4"
assert event_properties.get("$ai_provider") == "openai"
assert event_properties.get("$ai_completion_tokens") == 150
assert event_properties.get("$ai_prompt_tokens") == 50
assert event_properties.get("custom_property") == "test_value"
# Verify blob properties were replaced with S3 URLs
assert "$ai_input" in event_properties, "$ai_input property not found"
assert "$ai_output_choices" in event_properties, "$ai_output_choices property not found"
ai_input_url = event_properties["$ai_input"]
ai_output_url = event_properties["$ai_output_choices"]
logger.debug("$ai_input URL: %s", ai_input_url)
logger.debug("$ai_output_choices URL: %s", ai_output_url)
# Verify URLs are S3 URLs with range parameters
assert ai_input_url.startswith("s3://"), "$ai_input should be an S3 URL"
assert ai_output_url.startswith("s3://"), "$ai_output_choices should be an S3 URL"
assert "range=" in ai_input_url, "$ai_input URL should contain range parameter"
assert "range=" in ai_output_url, "$ai_output_choices URL should contain range parameter"
# Verify URLs point to same multipart file but different ranges
input_base = ai_input_url.split("?")[0]
output_base = ai_output_url.split("?")[0]
assert input_base == output_base, "Both URLs should point to same multipart file"
logger.info("All event properties verified successfully")
logger.info("S3 URLs generated correctly with byte ranges")
logger.info("Test completed successfully")
logger.info("=" * 60)
def test_ai_generation_event_with_separate_properties(self, shared_org_project):
"""Test $ai_generation event with properties in a separate multipart part."""
logger.info("\n" + "=" * 60)
logger.info("STARTING TEST: $ai_generation Event with Separate Properties")
logger.info("=" * 60)
client = shared_org_project["client"]
project_id = shared_org_project["project_id"]
project_api_key = shared_org_project["api_key"]
logger.info("Step 1: Using shared organization and project")
logger.info("Step 2: Preparing $ai_generation event with separate properties")
distinct_id = f"test_user_{uuid.uuid4().hex[:8]}"
event_data = {
"event": "$ai_generation",
"distinct_id": distinct_id,
"timestamp": "2024-01-15T10:30:00Z",
}
properties_data = {
"$ai_model": "gpt-3.5-turbo",
"$ai_provider": "openai",
"$ai_completion_tokens": 100,
"$ai_prompt_tokens": 25,
"custom_property": "separate_test",
}
input_blob = {
"messages": [
{"role": "user", "content": "Tell me a joke."},
],
"temperature": 0.9,
}
output_blob = {
"choices": [
{
"message": {
"role": "assistant",
"content": "Why did the chicken cross the road? To get to the other side!",
},
"finish_reason": "stop",
}
],
}
logger.info("Step 3: Creating multipart request with separate properties part")
boundary = f"----WebKitFormBoundary{uuid.uuid4().hex[:16]}"
fields = {
"event": ("event", json.dumps(event_data), "application/json"),
"event.properties": ("event.properties", json.dumps(properties_data), "application/json"),
"event.properties.$ai_input": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(input_blob),
"application/json",
),
"event.properties.$ai_output_choices": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(output_blob),
"application/json",
),
}
multipart_data = MultipartEncoder(fields=fields, boundary=boundary)
logger.info("Step 4: Sending multipart request to /i/v0/ai endpoint")
capture_url = f"{client.base_url}/i/v0/ai"
headers = {"Content-Type": multipart_data.content_type, "Authorization": f"Bearer {project_api_key}"}
response = requests.post(capture_url, data=multipart_data, headers=headers)
response.raise_for_status()
logger.info("Multipart request sent successfully")
# Verify response contains accepted parts
response_data = response.json()
assert "accepted_parts" in response_data
accepted_parts = response_data["accepted_parts"]
assert len(accepted_parts) == 4, f"Expected 4 parts, got {len(accepted_parts)}"
# Verify each part has correct details
event_json = json.dumps(event_data)
properties_json = json.dumps(properties_data)
input_json = json.dumps(input_blob)
output_json = json.dumps(output_blob)
assert accepted_parts[0]["name"] == "event"
assert accepted_parts[0]["length"] == len(event_json)
assert accepted_parts[0]["content-type"] == "application/json"
assert accepted_parts[1]["name"] == "event.properties"
assert accepted_parts[1]["length"] == len(properties_json)
assert accepted_parts[1]["content-type"] == "application/json"
assert accepted_parts[2]["name"] == "event.properties.$ai_input"
assert accepted_parts[2]["length"] == len(input_json)
assert accepted_parts[2]["content-type"] == "application/json"
assert accepted_parts[3]["name"] == "event.properties.$ai_output_choices"
assert accepted_parts[3]["length"] == len(output_json)
assert accepted_parts[3]["content-type"] == "application/json"
logger.info("Response validation successful: all parts accepted with correct lengths")
logger.info("Step 5: Waiting for event to be processed")
event = client.wait_for_event(
project_id=project_id, event_name="$ai_generation", distinct_id=distinct_id, timeout=30
)
assert event is not None, "$ai_generation event not found after 30 seconds"
logger.info("Event found in query API")
logger.info("Step 6: Verifying event properties")
assert event.get("event") == "$ai_generation"
assert event.get("distinct_id") == distinct_id
event_properties = event.get("properties", {})
assert event_properties.get("$ai_model") == "gpt-3.5-turbo"
assert event_properties.get("$ai_provider") == "openai"
assert event_properties.get("$ai_completion_tokens") == 100
assert event_properties.get("$ai_prompt_tokens") == 25
assert event_properties.get("custom_property") == "separate_test"
assert "$ai_input" in event_properties
assert "$ai_output_choices" in event_properties
ai_input_url = event_properties["$ai_input"]
ai_output_url = event_properties["$ai_output_choices"]
assert ai_input_url.startswith("s3://")
assert ai_output_url.startswith("s3://")
assert "range=" in ai_input_url
assert "range=" in ai_output_url
input_base = ai_input_url.split("?")[0]
output_base = ai_output_url.split("?")[0]
assert input_base == output_base
logger.info("All event properties verified successfully")
logger.info("Separate properties part handled correctly")
logger.info("Test completed successfully")
logger.info("=" * 60)
def test_all_accepted_ai_event_types(self, shared_org_project):
"""Test that all six accepted AI event types are successfully captured and stored."""
client = shared_org_project["client"]
project_id = shared_org_project["project_id"]
api_key = shared_org_project["api_key"]
base_distinct_id = f"user_{uuid.uuid4()}"
# Define all event types with their specific properties
events_to_test = [
{
"event_type": "$ai_generation",
"distinct_id": f"{base_distinct_id}_generation",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_input_tokens": 100,
"$ai_output_tokens": 50,
},
},
{
"event_type": "$ai_trace",
"distinct_id": f"{base_distinct_id}_trace",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_trace_id": str(uuid.uuid4()),
},
},
{
"event_type": "$ai_span",
"distinct_id": f"{base_distinct_id}_span",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_trace_id": str(uuid.uuid4()),
"$ai_span_id": str(uuid.uuid4()),
},
},
{
"event_type": "$ai_embedding",
"distinct_id": f"{base_distinct_id}_embedding",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_input_tokens": 75,
},
},
{
"event_type": "$ai_metric",
"distinct_id": f"{base_distinct_id}_metric",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_metric_type": "latency",
"$ai_metric_value": 1.23,
},
},
{
"event_type": "$ai_feedback",
"distinct_id": f"{base_distinct_id}_feedback",
"properties": {
"$ai_model": "test-model",
"$ai_provider": "test-provider",
"$ai_feedback_score": 5,
"$ai_feedback_comment": "Great response",
},
},
]
# Send all events
for event_spec in events_to_test:
event_type = event_spec["event_type"]
distinct_id = event_spec["distinct_id"]
logger.info(f"Sending {event_type} event")
event_data = {
"event": event_type,
"distinct_id": distinct_id,
"$set": {"test_user": True, "event_type_test": event_type},
}
fields = {
"event": ("event", json.dumps(event_data), "application/json"),
"event.properties": ("event.properties", json.dumps(event_spec["properties"]), "application/json"),
}
multipart_data = MultipartEncoder(fields=fields)
response = requests.post(
f"{client.base_url}/i/v0/ai",
data=multipart_data,
headers={"Content-Type": multipart_data.content_type, "Authorization": f"Bearer {api_key}"},
)
assert (
response.status_code == 200
), f"Expected 200 for {event_type}, got {response.status_code}: {response.text}"
response_data = response.json()
assert len(response_data["accepted_parts"]) == 2
logger.info(f"{event_type} event sent successfully")
logger.info("All event types sent successfully, now querying to verify storage")
# Query and verify all events
for event_spec in events_to_test:
event_type = event_spec["event_type"]
distinct_id = event_spec["distinct_id"]
logger.info(f"Querying {event_type} event with distinct_id {distinct_id}")
event = client.wait_for_event(project_id, event_type, distinct_id)
assert event is not None, f"Event {event_type} not found"
assert event["event"] == event_type
assert event["distinct_id"] == distinct_id
assert event["properties"]["$ai_model"] == "test-model"
assert event["properties"]["$ai_provider"] == "test-provider"
logger.info(f"{event_type} event verified successfully")
logger.info("All six AI event types verified successfully")
# ============================================================================
# PHASE 4: MULTIPART FILE PROCESSING
# ============================================================================
# ----------------------------------------------------------------------------
# Scenario 4.3: Content Type Handling
# ----------------------------------------------------------------------------
def test_ai_generation_event_with_different_content_types(self, shared_org_project):
"""Test sending events with blobs using different supported content types."""
client = shared_org_project["client"]
project_id = shared_org_project["project_id"]
api_key = shared_org_project["api_key"]
base_distinct_id = f"user_{uuid.uuid4()}"
# Send Event 1: application/json blob
logger.info("Sending event with application/json blob")
distinct_id_json = f"{base_distinct_id}_json"
event_data_json = {
"event": "$ai_generation",
"distinct_id": distinct_id_json,
"$set": {"test_user": True, "content_type_test": "json"},
}
properties_data_json = {
"$ai_model": "gpt-4",
"$ai_model_parameters": {"temperature": 0.7},
}
json_blob_data = {"context": "This is JSON formatted LLM input", "tokens": 150}
fields_json = {
"event": ("event", json.dumps(event_data_json), "application/json"),
"event.properties": ("event.properties", json.dumps(properties_data_json), "application/json"),
"event.properties.$ai_input": ("blob_json", json.dumps(json_blob_data), "application/json"),
}
multipart_data_json = MultipartEncoder(fields=fields_json)
response_json = requests.post(
f"{client.base_url}/i/v0/ai",
data=multipart_data_json,
headers={"Content-Type": multipart_data_json.content_type, "Authorization": f"Bearer {api_key}"},
)
assert response_json.status_code == 200, f"Expected 200, got {response_json.status_code}: {response_json.text}"
response_data_json = response_json.json()
assert len(response_data_json["accepted_parts"]) == 3
parts_by_name_json = {part["name"]: part for part in response_data_json["accepted_parts"]}
assert parts_by_name_json["event.properties.$ai_input"]["content-type"] == "application/json"
# Send Event 2: text/plain blob
logger.info("Sending event with text/plain blob")
distinct_id_text = f"{base_distinct_id}_text"
event_data_text = {
"event": "$ai_generation",
"distinct_id": distinct_id_text,
"$set": {"test_user": True, "content_type_test": "text"},
}
properties_data_text = {
"$ai_model": "gpt-4",
"$ai_model_parameters": {"temperature": 0.5},
}
text_blob_data = "This is plain text LLM output with multiple lines.\nSecond line here.\nThird line."
fields_text = {
"event": ("event", json.dumps(event_data_text), "application/json"),
"event.properties": ("event.properties", json.dumps(properties_data_text), "application/json"),
"event.properties.$ai_output": ("blob_text", text_blob_data, "text/plain"),
}
multipart_data_text = MultipartEncoder(fields=fields_text)
response_text = requests.post(
f"{client.base_url}/i/v0/ai",
data=multipart_data_text,
headers={"Content-Type": multipart_data_text.content_type, "Authorization": f"Bearer {api_key}"},
)
assert response_text.status_code == 200, f"Expected 200, got {response_text.status_code}: {response_text.text}"
response_data_text = response_text.json()
assert len(response_data_text["accepted_parts"]) == 3
parts_by_name_text = {part["name"]: part for part in response_data_text["accepted_parts"]}
assert parts_by_name_text["event.properties.$ai_output"]["content-type"] == "text/plain"
# Send Event 3: application/octet-stream blob
logger.info("Sending event with application/octet-stream blob")
distinct_id_binary = f"{base_distinct_id}_binary"
event_data_binary = {
"event": "$ai_generation",
"distinct_id": distinct_id_binary,
"$set": {"test_user": True, "content_type_test": "binary"},
}
properties_data_binary = {
"$ai_model": "gpt-4",
"$ai_model_parameters": {"temperature": 0.9},
}
binary_blob_data = bytes([0x00, 0x01, 0x02, 0x03, 0x04, 0xFF, 0xFE, 0xFD])
fields_binary = {
"event": ("event", json.dumps(event_data_binary), "application/json"),
"event.properties": ("event.properties", json.dumps(properties_data_binary), "application/json"),
"event.properties.$ai_embedding_vector": ("blob_binary", binary_blob_data, "application/octet-stream"),
}
multipart_data_binary = MultipartEncoder(fields=fields_binary)
response_binary = requests.post(
f"{client.base_url}/i/v0/ai",
data=multipart_data_binary,
headers={"Content-Type": multipart_data_binary.content_type, "Authorization": f"Bearer {api_key}"},
)
assert (
response_binary.status_code == 200
), f"Expected 200, got {response_binary.status_code}: {response_binary.text}"
response_data_binary = response_binary.json()
assert len(response_data_binary["accepted_parts"]) == 3
parts_by_name_binary = {part["name"]: part for part in response_data_binary["accepted_parts"]}
assert (
parts_by_name_binary["event.properties.$ai_embedding_vector"]["content-type"] == "application/octet-stream"
)
logger.info("All three events sent successfully, now querying to verify storage")
# Query and verify Event 1 (JSON blob)
event_json = client.wait_for_event(project_id, "$ai_generation", distinct_id_json)
assert event_json is not None, "Event with JSON blob not found"
assert event_json["properties"]["$ai_model"] == "gpt-4"
logger.info(f"Event 1 (JSON blob) verified: {distinct_id_json}")
# Query and verify Event 2 (text blob)
event_text = client.wait_for_event(project_id, "$ai_generation", distinct_id_text)
assert event_text is not None, "Event with text blob not found"
assert event_text["properties"]["$ai_model"] == "gpt-4"
logger.info(f"Event 2 (text blob) verified: {distinct_id_text}")
# Query and verify Event 3 (binary blob)
event_binary = client.wait_for_event(project_id, "$ai_generation", distinct_id_binary)
assert event_binary is not None, "Event with binary blob not found"
assert event_binary["properties"]["$ai_model"] == "gpt-4"
logger.info(f"Event 3 (binary blob) verified: {distinct_id_binary}")
logger.info("All three content type events verified successfully")
# TODO: Verify blob properties have S3 URLs once S3 upload is implemented
# ============================================================================
# PHASE 5: AUTHORIZATION
# ============================================================================
# ----------------------------------------------------------------------------
# Scenario 5.1: API Key Authentication
# ----------------------------------------------------------------------------
def test_ai_endpoint_invalid_auth_returns_401(self, function_test_client):
"""Test that requests with invalid API key return 401 Unauthorized."""
client = function_test_client
event_data = {
"event": "$ai_generation",
"distinct_id": f"test_user_{uuid.uuid4().hex[:8]}",
}
properties_data = {"$ai_model": "test"}
fields = {
"event": ("event", json.dumps(event_data), "application/json"),
"event.properties": ("event.properties", json.dumps(properties_data), "application/json"),
}
multipart_data = MultipartEncoder(fields=fields)
response = requests.post(
f"{client.base_url}/i/v0/ai",
data=multipart_data,
headers={"Content-Type": multipart_data.content_type, "Authorization": "Bearer invalid_key_123"},
)
assert response.status_code == 401, f"Expected 401, got {response.status_code}"
# ============================================================================
# PHASE 7: COMPRESSION
# ============================================================================
# ----------------------------------------------------------------------------
# Scenario 7.1: Mixed Compression
# ----------------------------------------------------------------------------
def test_ai_generation_event_with_gzip_compression(self, shared_org_project):
"""Test $ai_generation event with gzip compression for the entire request."""
logger.info("\n" + "=" * 60)
logger.info("STARTING TEST: $ai_generation Event with Gzip Compression")
logger.info("=" * 60)
client = shared_org_project["client"]
project_id = shared_org_project["project_id"]
project_api_key = shared_org_project["api_key"]
logger.info("Step 1: Using shared organization and project")
logger.info("Step 2: Preparing $ai_generation event")
distinct_id = f"test_user_{uuid.uuid4().hex[:8]}"
event_data = {
"event": "$ai_generation",
"distinct_id": distinct_id,
"timestamp": "2024-01-15T10:30:00Z",
"properties": {
"$ai_model": "gpt-4-compressed",
"$ai_provider": "openai",
"$ai_completion_tokens": 75,
"$ai_prompt_tokens": 30,
"compression": "gzip",
},
}
input_blob = {
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain compression in simple terms."},
],
}
output_blob = {
"choices": [
{
"message": {
"role": "assistant",
"content": "Compression reduces data size by encoding information more efficiently.",
},
"finish_reason": "stop",
}
],
}
logger.info("Step 3: Creating multipart request")
boundary = f"----WebKitFormBoundary{uuid.uuid4().hex[:16]}"
fields = {
"event": ("event", json.dumps(event_data), "application/json"),
"event.properties.$ai_input": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(input_blob),
"application/json",
),
"event.properties.$ai_output_choices": (
f"blob_{uuid.uuid4().hex[:8]}",
json.dumps(output_blob),
"application/json",
),
}
multipart_data = MultipartEncoder(fields=fields, boundary=boundary)
logger.info("Step 4: Compressing request body with gzip")
uncompressed_body = multipart_data.to_string()
compressed_body = gzip.compress(uncompressed_body)
logger.debug("Uncompressed size: %d bytes", len(uncompressed_body))
logger.debug("Compressed size: %d bytes", len(compressed_body))
logger.debug("Compression ratio: %.2f%%", (1 - len(compressed_body) / len(uncompressed_body)) * 100)
logger.info("Step 5: Sending compressed multipart request to /i/v0/ai endpoint")
capture_url = f"{client.base_url}/i/v0/ai"
headers = {
"Content-Type": multipart_data.content_type,
"Content-Encoding": "gzip",
"Authorization": f"Bearer {project_api_key}",
}
response = requests.post(capture_url, data=compressed_body, headers=headers)
response.raise_for_status()
logger.info("Compressed multipart request sent successfully")
# Verify response contains accepted parts
response_data = response.json()
assert "accepted_parts" in response_data
accepted_parts = response_data["accepted_parts"]
assert len(accepted_parts) == 3, f"Expected 3 parts, got {len(accepted_parts)}"
# Verify each part has correct details (lengths should match uncompressed data)
event_json = json.dumps(event_data)
input_json = json.dumps(input_blob)
output_json = json.dumps(output_blob)
assert accepted_parts[0]["name"] == "event"
assert accepted_parts[0]["length"] == len(event_json)
assert accepted_parts[0]["content-type"] == "application/json"
assert accepted_parts[1]["name"] == "event.properties.$ai_input"
assert accepted_parts[1]["length"] == len(input_json)
assert accepted_parts[1]["content-type"] == "application/json"
assert accepted_parts[2]["name"] == "event.properties.$ai_output_choices"
assert accepted_parts[2]["length"] == len(output_json)
assert accepted_parts[2]["content-type"] == "application/json"
logger.info("Response validation successful: decompressed parts have correct lengths")
logger.info("Step 6: Waiting for event to be processed")
event = client.wait_for_event(
project_id=project_id, event_name="$ai_generation", distinct_id=distinct_id, timeout=30
)
assert event is not None, "$ai_generation event not found after 30 seconds"
logger.info("Event found in query API")
logger.info("Step 7: Verifying event properties")
assert event.get("event") == "$ai_generation"
assert event.get("distinct_id") == distinct_id
event_properties = event.get("properties", {})
assert event_properties.get("$ai_model") == "gpt-4-compressed"
assert event_properties.get("$ai_provider") == "openai"
assert event_properties.get("compression") == "gzip"
assert "$ai_input" in event_properties
assert "$ai_output_choices" in event_properties
ai_input_url = event_properties["$ai_input"]
ai_output_url = event_properties["$ai_output_choices"]
assert ai_input_url.startswith("s3://")
assert ai_output_url.startswith("s3://")
assert "range=" in ai_input_url
assert "range=" in ai_output_url
logger.info("All event properties verified successfully")
logger.info("Gzip compression handled correctly")
logger.info("Test completed successfully")
logger.info("=" * 60)


@@ -0,0 +1,17 @@
"""Utilities for acceptance tests."""
import os
def get_service_url(service: str = "proxy") -> str:
"""Get the URL for a service."""
if base_url := os.environ.get("POSTHOG_TEST_BASE_URL"):
return base_url
service_urls = {
"proxy": "http://localhost:8010",
"s3": "http://localhost:19000",
"clickhouse": "http://localhost:8123",
}
return service_urls.get(service, "http://localhost:8010")


@@ -36,6 +36,8 @@ services:
path /batch*
path /capture
path /capture*
path /ai
path /ai/*
}
@flags {

rust/Cargo.lock (generated)

@@ -1528,6 +1528,7 @@ dependencies = [
"lz-str",
"metrics",
"metrics-exporter-prometheus",
"multer",
"once_cell",
"opentelemetry 0.22.0",
"opentelemetry-otlp",
@@ -4850,6 +4851,23 @@ dependencies = [
"uuid",
]
[[package]]
name = "multer"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 1.1.0",
"httparse",
"memchr",
"mime",
"spin 0.9.8",
"version_check",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@@ -6406,6 +6424,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"once_cell",
"percent-encoding",


@@ -52,6 +52,8 @@ jiff = "0.1"
dateparser = "0.2"
lz-str = { workspace = true }
regex = { workspace = true }
multer = "3.0"
futures = { workspace = true }
[dev-dependencies]
assert-json-diff = { workspace = true }
@@ -61,5 +63,5 @@ futures = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
rdkafka = { workspace = true }
reqwest = { workspace = true }
reqwest = { workspace = true, features = ["multipart"] }
serde_json = { workspace = true }


@@ -0,0 +1,218 @@
# LLMA Acceptance Test Suite
## Overview
This document describes the Python acceptance test suite for the LLM Analytics pipeline. These tests validate **end-to-end functionality** that requires real PostHog infrastructure: database, S3 storage, Kafka, authentication services, and the full ingestion pipeline.
**Implementation Requirement**: Each phase in the implementation plan must pass its corresponding acceptance tests before proceeding to the next phase. This ensures incremental validation and prevents regression as new features are added.
## Test Architecture
### Test Environment
- **Local PostHog Deployment**: Complete local PostHog setup including capture service with `/i/v0/ai` endpoint
- **S3-Compatible Storage**: Local S3-compatible storage for blob storage with direct access for verification
- **PostHog Query API**: Used to fetch processed events from the ingestion pipeline for validation
- **Direct S3 Access**: Test suite has direct S3 client access to verify blob storage and retrieval
- **Database**: PostgreSQL for team/token storage and event metadata
- **Kafka**: Event streaming for ingestion pipeline
- **ClickHouse**: Event storage and query API
### Test Framework
- **Acceptance Tests**: Full pipeline tests from HTTP request through ingestion to event storage
- **PostHog API Client**: Direct integration with PostHog query API to verify event processing
- **S3 Client**: Direct access to verify S3 blob storage, metadata, and retrieval
- **Parameterized Tests**: Test variations across different event types, blob sizes, and configurations
- **Async Testing**: Support for testing concurrent requests and large payload processing
### Test Data
- **AI Event Types**: `$ai_trace`, `$ai_span`, `$ai_generation`, `$ai_embedding`, `$ai_metric`, `$ai_feedback`
- **Blob Sizes**: Small (1KB), Medium (100KB), Large (1MB), Extra Large (10MB+)
- **Content Types**: `application/json`, `text/plain`, `application/octet-stream`
- **Compression**: Gzipped and uncompressed blobs
- **Encoding**: Raw binary, text, and base64-encoded blobs
- **Multipart Boundaries**: Various boundary strings to test collision handling
## Test Scenarios
**Phase Completion Requirement**: All acceptance tests for a phase must pass before implementation can proceed to the next phase. This gate ensures quality and prevents compound issues from multiple incomplete features.
### Phase 1: HTTP Endpoint
#### Scenario 1.1: Event Processing Verification
- **Test**: Send multipart request and verify event reaches PostHog query API
- **Validation**: Use PostHog query API to fetch processed event, verify blob placeholders correctly inserted
- **Tests Implemented**:
- `test_basic_ai_generation_event`: Full end-to-end event processing
- `test_all_accepted_ai_event_types`: Verify all six supported AI event types are accepted and stored
### Phase 2: Basic S3 Uploads
#### Scenario 2.1: Individual Blob Upload
- **Test**: Upload blobs of various sizes as separate S3 objects
- **Validation**: Verify each blob stored correctly, S3 URLs generated in event properties
- **Variations**: Small/medium/large blobs, different content types
#### Scenario 2.2: S3 URL Generation and Access
- **Test**: Verify generated S3 URLs in PostHog events point to accessible objects
- **Validation**: Query PostHog API for events, extract S3 URLs, verify blobs retrievable from S3
#### Scenario 2.3: Blob Metadata Storage
- **Test**: Verify S3 object metadata is stored correctly
- **Validation**: Use S3 client to inspect object metadata - Content-Type, size, team_id present
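The metadata check could look like the following sketch; the endpoint matches the `s3` default in `utils.py`, while the credentials, bucket layout, and `team_id` metadata key are assumptions for illustration:
```python
import boto3

# Endpoint matches the "s3" default in utils.py; the credentials and the
# "team_id" metadata key are assumed for illustration.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:19000",
    aws_access_key_id="object_storage_root_user",
    aws_secret_access_key="object_storage_root_password",
)

def assert_blob_metadata(bucket: str, key: str, team_id: str) -> None:
    """Check the object metadata fields this scenario cares about."""
    head = s3.head_object(Bucket=bucket, Key=key)
    assert head["ContentLength"] > 0
    assert head["ContentType"] in ("application/json", "text/plain", "application/octet-stream")
    # S3 lowercases custom metadata keys and exposes them via the Metadata dict.
    assert head["Metadata"].get("team_id") == team_id
```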
#### Scenario 2.4: Team Data Isolation
- **Test**: Multiple teams uploading simultaneously
- **Validation**: Verify S3 key prefixes are team-scoped, no cross-team data access, proper S3 path isolation
### Phase 4: Multipart File Processing
#### Scenario 4.1: Multipart File Creation
- **Test**: Upload events with multiple blobs, verify multipart/mixed format
- **Validation**: Use S3 client to verify single S3 file contains all blobs, proper MIME boundaries, metadata preserved
- **Variations**: 2-10 blobs per event, mixed content types, different blob sizes
#### Scenario 4.2: Byte Range URLs and Access
- **Test**: Verify S3 URLs in PostHog events include correct byte range parameters
- **Validation**: Query PostHog API for events, verify URLs contain range parameters, use S3 client to test range requests
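A test could resolve such a URL into a ranged `GetObject` call along these lines; the `s3://bucket/key?range=start-end` shape is inferred from the assertions in the tests above:
```python
from urllib.parse import parse_qs, urlparse

def fetch_blob_by_range_url(s3_client, url: str) -> bytes:
    """Resolve an s3://bucket/key?range=start-end URL into a ranged read."""
    parsed = urlparse(url)
    byte_range = parse_qs(parsed.query)["range"][0]  # e.g. "0-1023"
    response = s3_client.get_object(
        Bucket=parsed.netloc,
        Key=parsed.path.lstrip("/"),
        Range=f"bytes={byte_range}",  # standard HTTP Range header semantics
    )
    return response["Body"].read()
```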
#### Scenario 4.3: Content Type Handling
- **Test**: Mix of JSON, text, and binary blobs in single multipart file
- **Validation**: Content types preserved in multipart format, correctly parsed
### Phase 5: Authorization
#### Scenario 5.1: API Key Authentication
- **Test**: Send requests with valid/invalid/missing API keys
- **Validation**: Valid keys accepted, invalid keys rejected with 401, proper error messages
- **Tests Implemented**:
- `test_ai_endpoint_invalid_auth_returns_401`: Invalid token validation
### Phase 7: Compression
#### Scenario 7.1: Mixed Compression
- **Test**: Single request with both compressed and uncompressed blobs
- **Validation**: Each blob handled according to its compression state
### Phase 9: Limits (Optional) - DO NOT IMPLEMENT, TBD
### Phase 10: Data Deletion (Optional) - DO NOT IMPLEMENT, TBD
### Cross-Team Isolation Testing
## Error Recovery and Edge Cases
### Edge Case Scenarios
#### Scenario E.1: S3 Service Interruption
- **Test**: Simulate S3 unavailability during uploads
- **Validation**: Proper error responses, retry logic works, no data loss
#### Scenario E.2: Kafka Unavailability
- **Test**: Simulate Kafka unavailability during event publishing
- **Validation**: Appropriate error handling, request failure communicated to client
## Local Development Testing
### Test Implementation
The acceptance test suite is implemented in Python using pytest and runs against the full PostHog infrastructure.
#### Test Structure
- **Location**: `common/ingestion/acceptance_tests/test_llm_analytics.py`
- **Framework**: pytest with async support
- **Dependencies**:
- `requests` for HTTP client operations
- `boto3` for S3 client operations (when needed)
- PostHog SDK or API for event querying
- Django test utilities for setup
### Local Test Environment Setup
#### Prerequisites
- **Local PostHog Instance**: Full PostHog deployment running locally with all services (Django, capture, Kafka, ClickHouse, PostgreSQL)
- **Capture Service**: Running with `/i/v0/ai` endpoint enabled
- **Personal API Key**: PostHog personal API key for creating test organizations/projects
#### Environment Configuration
```bash
# PostHog Instance (defaults to http://localhost:8010 if not set)
export POSTHOG_TEST_BASE_URL="http://localhost:8010"
# Personal API Key (required - no default)
export POSTHOG_PERSONAL_API_KEY="your_personal_api_key_here"
```
**Creating a Personal API Key:**
1. Navigate to your PostHog instance (e.g., `http://localhost:8010`)
2. Go to **Settings** (sidebar) → **Account** → **Personal API Keys**
3. Click **Create personal API key**
4. Configure the key:
- **Organization & project access**: Set to **All** (to avoid permission issues)
- **Scopes**: Set to **All access** (required for creating/deleting test organizations and projects)
5. Copy the generated key and set it as `POSTHOG_PERSONAL_API_KEY`
**Note**: The test suite automatically creates temporary organizations and projects for each test class and cleans them up after tests complete. S3 configuration is handled by the PostHog instance itself.
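You can sanity-check the key before running the suite with a short script that hits the same endpoint the test client uses for organization management:
```python
import os

import requests

base_url = os.environ.get("POSTHOG_TEST_BASE_URL", "http://localhost:8010")
api_key = os.environ["POSTHOG_PERSONAL_API_KEY"]

# A 401 here means the key is invalid or missing the required scopes.
response = requests.get(
    f"{base_url}/api/organizations/",
    headers={"Authorization": f"Bearer {api_key}"},
)
response.raise_for_status()
print("Key OK; organizations visible:", len(response.json().get("results", [])))
```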
### Test Execution
#### Running Tests
```bash
# Run all acceptance tests using the test runner
cd common/ingestion/acceptance_tests
python run_tests.py
# Or run pytest directly for specific tests
pytest test_llm_analytics.py::TestLLMAnalytics::test_basic_ai_generation_event -v
# Run specific test class
pytest test_llm_analytics.py::TestLLMAnalytics -v
```
The `run_tests.py` script automatically:
- Configures pytest with appropriate logging and verbosity settings
- Runs tests in parallel using `--numprocesses=auto` for faster execution
- Shows detailed debug logs during test execution
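For reference, a minimal runner with equivalent behavior might look like the sketch below. It assumes `pytest` and `pytest-xdist` are installed; the exact flag set is illustrative, not the actual contents of `run_tests.py`:
```python
#!/usr/bin/env python
"""Minimal stand-in for run_tests.py: verbose, parallel pytest invocation."""
import sys

import pytest

if __name__ == "__main__":
    # Forward any extra CLI arguments (e.g., -k filters) straight to pytest.
    sys.exit(pytest.main(["-v", "--log-cli-level=DEBUG", "--numprocesses=auto", *sys.argv[1:]]))
```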
#### Test Utilities
Each test phase will include common utilities for:
- **Multipart Request Builder**: Construct multipart/form-data requests with event JSON and blob parts (see the sketch after this list)
- **S3 Client Wrapper**: Direct S3 operations for validation and cleanup
- **PostHog API Client**: Query PostHog API to verify event processing
- **Test Data Generators**: Create various blob sizes, content types, and event payloads
- **Cleanup Helpers**: Remove test data from S3 and PostHog between test runs
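As an illustration, a minimal multipart request builder might look like the sketch below. It relies only on `requests` (which preserves the order of a `files` list, keeping the event part first); the function name, endpoint URL, and token are hypothetical:
```python
import json
import uuid

import requests

def build_ai_request_parts(event: dict, blobs: dict[str, bytes]) -> list:
    """Build ordered multipart parts: the 'event' part first, then one blob
    part per large property, named after its JSON path in the event object."""
    parts = [("event", (None, json.dumps(event), "application/json"))]
    for prop, data in blobs.items():
        # Blob parts carry a filename (the blob ID) per the endpoint spec.
        blob_id = uuid.uuid4().hex
        parts.append((f"event.properties.{prop}", (blob_id, data, "application/json")))
    return parts

# Hypothetical usage against a locally running capture service:
event = {
    "event": "$ai_generation",
    "distinct_id": "user_123",
    "properties": {"$ai_model": "gpt-4o"},
}
parts = build_ai_request_parts(event, {"$ai_input": b'[{"role": "user", "content": "hi"}]'})
response = requests.post(
    "http://localhost:3000/i/v0/ai",  # assumed local capture service address
    files=parts,
    headers={"Authorization": "Bearer <project_api_key>"},
)
```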
#### Test Data Management
- **Isolated Test Teams**: Each test uses unique team IDs to prevent interference
- **Cleanup Between Tests**: Automatic cleanup of S3 objects and PostHog test data
- **Fixture Data**: Predefined multipart requests and blob data for consistent testing
- **Random Data Generation**: Configurable blob sizes and content for stress testing
## Phase Gating
- **Mandatory Testing**: All acceptance tests for a phase must pass before proceeding to implementation of the next phase
- **Regression Prevention**: Previous phase tests continue to run to ensure no regression
- **Incremental Validation**: Each phase builds upon validated functionality from previous phases

View File

@@ -10,13 +10,13 @@ This document outlines the implementation steps for the LLM Analytics capture pi
#### 0.1 Routing Configuration
- [ ] Create new `/ai` endpoint in capture service
- [ ] Set up routing for `/ai` endpoint to capture service
- [x] Create new `/i/v0/ai` endpoint in capture service
- [ ] Set up routing for `/i/v0/ai` endpoint to capture service
#### 0.2 End-to-End Integration Tests
- [ ] Implement end-to-end integration tests for the full LLM analytics pipeline
- [ ] Create test scenarios with multipart requests and blob data
- [x] Implement Rust integration tests for multipart parsing and validation
- [x] Create Python acceptance test scenarios with multipart requests and blob data
- [ ] Test Kafka message output and S3 storage integration
- [ ] Set up automated test suite for continuous validation
@@ -24,20 +24,25 @@ This document outlines the implementation steps for the LLM Analytics capture pi
#### 1.1 HTTP Endpoint Foundation
- [ ] Implement multipart/form-data request parsing
- [ ] Add server-side boundary validation
- [x] Implement multipart/form-data request parsing
- [x] Add server-side boundary validation
- [x] Support separate `event.properties` multipart part
- [x] Implement gzip decompression for compressed requests
- [ ] Output events with blob placeholders to Kafka
- [ ] Implement error schema
- [x] Implement error schema
#### 1.2 Basic Validation
- [ ] Implement `$ai_` event name prefix validation
- [ ] Validate blob part names against event properties
- [ ] Prevent blob overwriting of existing properties
- [x] Implement specific AI event type validation ($ai_generation, $ai_trace, $ai_span, $ai_embedding, $ai_metric, $ai_feedback)
- [x] Validate blob part names against event properties
- [x] Prevent blob overwriting of existing properties (reject if both embedded and separate properties)
- [x] Validate event part is first in multipart request
- [x] Validate required fields (event name, distinct_id, $ai_model)
- [x] Implement size limits (32KB event, 960KB combined, 25MB total, 27.5MB request body)
#### 1.3 Initial Deployment
- [ ] Deploy capture-ai service to production with basic `/ai` endpoint
- [ ] Deploy capture-ai service to production with basic `/i/v0/ai` endpoint
- [ ] Test basic multipart parsing and Kafka output functionality
- [ ] Verify endpoint responds correctly to AI events
@@ -81,6 +86,7 @@ This document outlines the implementation steps for the LLM Analytics capture pi
#### 5.1 Request Signature Verification
- [x] Implement basic API key validation (Bearer token authentication)
- [ ] Implement PostHog API key authentication
- [ ] Add request signature verification
- [ ] Validate API key before processing multipart data
@@ -96,8 +102,8 @@ This document outlines the implementation steps for the LLM Analytics capture pi
#### 6.2 Alerting
- [ ] Configure alerts for S3 upload failures
- [ ] Set up alerts for high error rates on `/ai` endpoint
- [ ] Set up alerts for high latency on `/ai` endpoint
- [ ] Set up alerts for high error rates on `/i/v0/ai` endpoint
- [ ] Set up alerts for high latency on `/i/v0/ai` endpoint
#### 6.3 Runbooks
@@ -107,28 +113,37 @@ This document outlines the implementation steps for the LLM Analytics capture pi
#### 7.1 Compression Support
- [ ] Parse Content-Encoding headers from SDK requests
- [ ] Implement server-side compression for uncompressed text/JSON
- [ ] Add compression metadata to multipart files
- [ ] Handle mixed compressed/uncompressed blobs
- [x] Parse Content-Encoding: gzip header for request-level compression
- [x] Implement streaming gzip decompression for compressed requests
- [x] Test with gzip-compressed multipart requests
- [ ] Implement server-side compression for uncompressed blobs before S3 storage
- [ ] Add compression metadata to S3 objects
- [ ] Track compression ratio effectiveness
### Phase 8: Schema Validation
#### 8.1 Schema Validation
- [ ] Create strict schema definitions for each AI event type
- [ ] Add schema validation for event payloads
- [ ] Validate Content-Type headers on blob parts
- [ ] Add Content-Length validation
- [x] Validate Content-Type headers on blob parts (required: application/json, text/plain, application/octet-stream)
- [x] Validate event JSON structure (event, distinct_id, properties fields)
- [x] Validate required AI properties ($ai_model)
- [x] Test with different supported content types
- [ ] Create comprehensive schema definitions for each AI event type
- [ ] Add detailed schema validation for event-specific properties
- [ ] Add Content-Length validation beyond size limits
### Phase 9: Limits (Optional)
#### 9.1 Request Validation & Limits
- [ ] Add request size limits and validation
- [x] Add request size limits and validation (configurable via `ai_max_sum_of_parts_bytes`)
- [x] Implement event part size limit (32KB)
- [x] Implement combined event+properties size limit (960KB)
- [x] Implement total parts size limit (25MB default, configurable)
- [x] Implement request body size limit (110% of total parts limit)
- [x] Return 413 Payload Too Large for size violations
- [ ] Add request rate limiting per team
- [ ] Implement payload size limits per team
- [ ] Implement per-team payload size limits
### Phase 10: Data Deletion (Optional)

View File

@@ -13,43 +13,101 @@ This approach allows us to capture comprehensive LLM usage data without impactin
## Supported Events
### Events with Large Context Payloads
The LLM Analytics capture endpoint supports four primary AI event types, plus two lightweight standard events. Events are sent to the `/i/v0/ai` endpoint with multipart payloads to handle large context data efficiently.
These events contain substantial LLM context that requires blob storage:
### `$ai_generation`
- **`$ai_trace`**
- `$ai_input_state`
- `$ai_output_state`
A generation represents a single call to an LLM (e.g., a chat completion request).
- **`$ai_span`**
- `$ai_input_state`
- `$ai_output_state`
**Core Properties:**
- **`$ai_generation`**
- `$ai_input` - Can contain 300,000+ LLM tokens, making blob storage essential
- `$ai_output_choices`
- `$ai_trace_id` (required) - UUID to group AI events (e.g., conversation_id)
- `$ai_model` (required) - The model used (e.g., "gpt-4o", "claude-3-opus")
- `$ai_provider` (required) - The LLM provider (e.g., "openai", "anthropic", "gemini")
- `$ai_input` - List of messages sent to the LLM (can be stored as blob)
- Can contain 300,000+ tokens, making blob storage essential
- Each message has a `role` ("user", "system", or "assistant") and `content` array
- Content types: text, image URLs, function calls
- `$ai_output_choices` - List of response choices from the LLM (can be stored as blob)
- Each choice has a `role` and `content` array
- `$ai_input_tokens` - Number of tokens in the input
- `$ai_output_tokens` - Number of tokens in the output
- `$ai_span_id` (optional) - Unique identifier for this generation
- `$ai_span_name` (optional) - Name given to this generation
- `$ai_parent_id` (optional) - Parent span ID for tree view grouping
- `$ai_latency` (optional) - LLM call latency in seconds
- `$ai_http_status` (optional) - HTTP status code of the response
- `$ai_base_url` (optional) - Base URL of the LLM provider
- `$ai_request_url` (optional) - Full URL of the request
- `$ai_is_error` (optional) - Boolean indicating if the request was an error
- `$ai_error` (optional) - Error message or object
- **`$ai_embedding`**
- `$ai_input`
- Note: Output data is not currently included in the event payload, though this may be added in future iterations
**Cost Properties** (optional, auto-calculated from model and token counts if not provided):
### Standard Events
- `$ai_input_cost_usd` - Cost in USD of input tokens
- `$ai_output_cost_usd` - Cost in USD of output tokens
- `$ai_total_cost_usd` - Total cost in USD
These events can be processed through the regular pipeline without blob storage:
**Cache Properties** (optional):
- **`$ai_metric`** - Lightweight metric data that doesn't require offloading
- **`$ai_feedback`** - User feedback events that remain small enough for standard processing
- `$ai_cache_read_input_tokens` - Number of tokens read from cache
- `$ai_cache_creation_input_tokens` - Number of tokens written to cache (Anthropic-specific)
### Future Considerations
**Model Parameters** (optional):
The event schema is designed to accommodate future multimodal content types, including:
- `$ai_temperature` - Temperature parameter used
- `$ai_stream` - Whether the response was streamed
- `$ai_max_tokens` - Maximum tokens setting
- `$ai_tools` - Tools/functions available to the LLM
- Images
- Audio
- Video
- Files
### `$ai_trace`
These additions will leverage the same blob storage infrastructure when implemented.
A trace represents a complete AI interaction flow (e.g., a full conversation or agent execution).
**Key Properties:**
- `$ai_trace_id` (required) - UUID identifying this trace
- `$ai_input_state` - Initial state of the trace (can be stored as blob)
- `$ai_output_state` - Final state of the trace (can be stored as blob)
### `$ai_span`
A span represents a logical unit of work within a trace (e.g., a tool call, a retrieval step).
**Key Properties:**
- `$ai_trace_id` (required) - Parent trace UUID
- `$ai_span_id` (required) - Unique identifier for this span
- `$ai_parent_id` (optional) - Parent span ID for nesting
- `$ai_span_name` - Name describing this span
- `$ai_input_state` - Input state for this span (can be stored as blob)
- `$ai_output_state` - Output state for this span (can be stored as blob)
### `$ai_embedding`
An embedding event captures vector generation for semantic search or RAG systems.
**Key Properties:**
- `$ai_trace_id` (required) - Parent trace UUID
- `$ai_model` (required) - Embedding model used
- `$ai_provider` (required) - Provider (e.g., "openai", "cohere")
- `$ai_input` - Text or data being embedded (can be stored as blob)
- `$ai_input_tokens` - Number of tokens in the input
- Note: Output vectors are typically not captured in events
### Standard Events (No Blob Storage Required)
These events are lightweight and processed through the regular pipeline:
- **`$ai_metric`** - Performance metrics, usage statistics
- **`$ai_feedback`** - User feedback on AI responses
### Blob Storage Strategy
Properties that can contain large payloads (marked as "can be stored as blob" above) should be sent as separate multipart parts with names like `event.properties.$ai_input` or `event.properties.$ai_output_choices`. This keeps the event JSON small while allowing arbitrarily large context data to be stored efficiently in S3.
**Reference:** [PostHog LLM Analytics Manual Capture Documentation](https://posthog.com/docs/llm-analytics/manual-capture)
## General Architecture
@@ -58,7 +116,7 @@ The LLM Analytics capture system implements a specialized data flow that efficie
### Data Flow
1. **Event Ingestion**
- Events are transmitted using server-side PostHog SDKs via HTTP to a dedicated `/ai` endpoint
- Events are transmitted using server-side PostHog SDKs via HTTP to a dedicated `/i/v0/ai` endpoint
- Requests utilize multipart payloads containing:
- Event payload (metadata and standard properties)
- Binary blobs containing LLM context (e.g., input state, output state property values)
@@ -85,7 +143,7 @@ The LLM Analytics capture system implements a specialized data flow that efficie
### HTTP Endpoint
The `/ai` endpoint accepts multipart POST requests with the following structure:
The `/i/v0/ai` endpoint accepts multipart POST requests with the following structure:
#### Request Format
@@ -97,22 +155,35 @@ The `/ai` endpoint accepts multipart POST requests with the following structure:
**Multipart Parts:**
1. **Event Part** (required)
- `Content-Disposition: form-data; name="event"`
- `Content-Type: application/json`
- Body: Standard PostHog event JSON payload
- `Content-Disposition: form-data; name="event"` (required)
- `Content-Type: application/json` (required)
- Body: Standard PostHog event JSON payload (without properties, or with properties that will be rejected if `event.properties` part is also present)
2. **Blob Parts** (optional, multiple allowed)
- `Content-Disposition: form-data; name="event.properties.<property_name>"; filename="<blob_id>"`
- `Content-Type: application/octet-stream` (or `application/json`, `text/plain`, etc.)
- `Content-Encoding: gzip` (optional, for compressed data)
- `Content-Length: <size>` (size of the blob part in bytes)
- Body: Binary blob data (optionally gzip compressed)
2. **Event Properties Part** (optional)
- `Content-Disposition: form-data; name="event.properties"` (required)
- `Content-Type: application/json` (required)
- Body: JSON object containing event properties
- Cannot be used together with embedded properties in the event part (request will be rejected with 400 Bad Request)
- Properties from this part are merged into the event as the `properties` field
3. **Blob Parts** (optional, multiple allowed)
- `Content-Disposition: form-data; name="event.properties.<property_name>"; filename="<blob_id>"` (required)
- `Content-Type: application/octet-stream | application/json | text/plain` (required)
- Body: Binary blob data
- The part name follows the JSON path in the event object (e.g., `event.properties.$ai_input_state`)
**Allowed Part Headers:**
- `Content-Disposition` (required for all parts)
- `Content-Type` (required for all parts)
- No other headers are supported on individual parts (e.g., `Content-Encoding` is not allowed on parts)
**Note:** Individual parts cannot have their own compression. To compress the entire request payload, use the `Content-Encoding: gzip` header at the HTTP request level.
#### Example Request Structure
```http
POST /ai HTTP/1.1
POST /i/v0/ai HTTP/1.1
Content-Type: multipart/form-data; boundary=----boundary123
------boundary123
@@ -121,32 +192,34 @@ Content-Type: application/json
{
"event": "$ai_generation",
"properties": {
"model": "gpt-4",
"completion_tokens": 150
},
"distinct_id": "user_123",
"timestamp": "2024-01-15T10:30:00Z"
}
------boundary123
Content-Disposition: form-data; name="event.properties"
Content-Type: application/json
{
"$ai_model": "gpt-4",
"completion_tokens": 150
}
------boundary123
Content-Disposition: form-data; name="event.properties.$ai_input"; filename="blob_abc123"
Content-Type: application/json
Content-Encoding: gzip
Content-Length: 2048
[Gzipped JSON LLM input data]
[JSON LLM input data]
------boundary123
Content-Disposition: form-data; name="event.properties.$ai_output_choices"; filename="blob_def456"
Content-Type: application/json
Content-Length: 5120
[Uncompressed JSON LLM output data]
[JSON LLM output data]
------boundary123
Content-Disposition: form-data; name="event.properties.$ai_embedding_vector"; filename="blob_ghi789"
Content-Type: application/octet-stream
Content-Length: 16384
[Binary embedding vector data]
------boundary123--
@@ -162,18 +235,40 @@ To prevent LLM data from accidentally containing the multipart boundary sequence
#### Processing Flow
1. Parse multipart request
2. Extract event JSON from the "event" part
3. Collect all blob parts:
- Extract property path from each part name
- Verify the property doesn't already exist in the event JSON
1. **Parse multipart request**
- Validate that the first part is the `event` part
- Extract event JSON from the "event" part
2. **Handle event properties**
- If `event.properties` part exists: extract properties JSON from it
- If embedded properties exist in the event part AND `event.properties` part exists: reject with 400 Bad Request
- Merge properties into the event (from `event.properties` part if present, otherwise use embedded properties)
3. **Validate event structure**
- Check event name starts with `$ai_`
- Verify required fields (distinct_id, properties, etc.)
- Validate required AI properties (e.g., `$ai_model`)
4. **Collect all blob parts**
- Extract property path from each part name (e.g., `event.properties.$ai_input`)
- Store blob data with metadata (property path, content type, size)
4. Create multipart file containing all blobs with index
5. Upload single multipart file to S3:
- Check for duplicate blob property names
5. **Validate size limits**
- Event part ≤ 32KB
- Event + properties combined ≤ 960KB
- Sum of all parts ≤ 25MB (configurable)
6. **Create multipart file containing all blobs with index**
7. **Upload single multipart file to S3**
- Generate S3 key using team_id, event_id, and random string
- Include blob index in S3 object metadata
6. Add properties to event with S3 URLs including byte ranges
7. Send modified event to Kafka
8. **Replace blob properties with S3 URLs**
- Add properties to event with S3 URLs including byte ranges
9. **Send modified event to Kafka**
### S3 Storage
@@ -269,52 +364,58 @@ s3://posthog-llm-analytics/llma/1y/789/2024-01-15/event_678_t1u3v.multipart
### Content Types
#### Supported Content Types
#### Supported Content Types for Blob Parts
The following content types are accepted for blob parts:
- `application/octet-stream` - Default for binary data
- `application/json` - JSON formatted LLM context
- `text/plain` - Plain text LLM inputs/outputs
- `application/octet-stream` - For binary data
- `application/json` - For JSON formatted LLM context
- `text/plain` - For plain text LLM inputs/outputs
The event and event.properties parts must use `application/json`.
#### Content Type Handling
- Blob parts must include a Content-Type header
- All parts must include a Content-Type header
- Blob parts with unsupported content types are rejected with 400 Bad Request
- Blob parts missing Content-Type header are rejected with 400 Bad Request
- The Content-Type is stored within the multipart file for each part
- Content-Type is used by the evaluation service to determine how to parse each blob within the multipart file
### Compression
#### Client-side Compression
#### Request-Level Compression
SDKs should compress blob payloads before transmission to reduce bandwidth usage:
The endpoint supports request-level gzip compression to reduce bandwidth usage:
- Compression algorithm: gzip
- Compressed parts should include `Content-Encoding: gzip` header
- Original Content-Type should be preserved (e.g., `Content-Type: application/json` with `Content-Encoding: gzip`)
- **Compression algorithm**: gzip
- **How to compress**: Add `Content-Encoding: gzip` header to the HTTP request and compress the entire multipart request body
- **Server behavior**: The capture service will detect the `Content-Encoding: gzip` header and decompress the entire request before processing the multipart data
- **Recommendation**: SDKs should compress large requests (e.g., > 10KB) to minimize network transfer time
#### Server-side Compression
**Example Compressed Request:**
For uncompressed data received from SDKs:
```http
POST /i/v0/ai HTTP/1.1
Content-Type: multipart/form-data; boundary=----boundary123
Content-Encoding: gzip
- The capture service will automatically compress the following content types:
[Gzipped multipart request body]
```
The entire multipart body (including all parts) is compressed as a single gzip stream.
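A client-side sketch of this scheme, assembling the multipart body by hand so the whole payload can be gzipped as one stream (the endpoint URL and token are placeholders):
```python
import gzip
import json

import requests

boundary = "----boundary123"
event = json.dumps({
    "event": "$ai_generation",
    "distinct_id": "user_123",
    "properties": {"$ai_model": "gpt-4o"},
})

# Hand-assemble the multipart body so the entire payload can be compressed at once.
body = (
    f"--{boundary}\r\n"
    'Content-Disposition: form-data; name="event"\r\n'
    "Content-Type: application/json\r\n\r\n"
    f"{event}\r\n"
    f"--{boundary}--\r\n"
).encode()

response = requests.post(
    "http://localhost:3000/i/v0/ai",  # assumed local capture service address
    data=gzip.compress(body),  # one gzip stream over the whole multipart body
    headers={
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Content-Encoding": "gzip",
        "Authorization": "Bearer <project_api_key>",
    },
)
```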
#### Server-side Compression (S3 Storage)
For data received from SDKs (after request decompression, if any):
- The capture service will automatically compress the following content types before storing in S3:
- `application/json`
- `text/*` (all text subtypes)
- Binary formats (`application/octet-stream`) will not be automatically compressed
- Compression is applied before storing in S3
- S3 object metadata will indicate if server-side compression was applied
#### Example Headers
Compressed blob part from SDK:
```http
Content-Disposition: form-data; name="event.properties.$ai_input"; filename="blob_abc123"
Content-Type: application/json
Content-Encoding: gzip
[Gzipped JSON data]
```
- This compression is transparent to clients and reduces storage costs
## Reliability Concerns
@@ -330,7 +431,7 @@ Content-Encoding: gzip
### Preventing Malicious Uploads
- All requests to the `/ai` endpoint must be authenticated using the project's private API key
- All requests to the `/i/v0/ai` endpoint must be authenticated using the project's private API key
- The capture service validates the API key before processing any multipart data
- This prevents unauthorized uploads and ensures blob storage is only used by legitimate PostHog projects
@@ -340,7 +441,7 @@ The authentication process for LLM analytics events follows these steps:
1. **API Key Extraction**
- Extract the API key from the request headers (e.g., `Authorization: Bearer <api_key>`)
- API key must be present for all requests to `/ai`
- API key must be present for all requests to `/i/v0/ai`
2. **Early Validation**
- Validate the API key format and existence before parsing multipart data
@@ -402,7 +503,7 @@ Three approaches for handling data deletion requests:
The capture service enforces strict validation on incoming events:
1. **Event Name Validation**
- All events sent to `/ai` must have an event name starting with `$ai_`
- All events sent to `/i/v0/ai` must have an event name starting with `$ai_`
- Requests with non-AI events are rejected with 400 Bad Request
- This ensures the endpoint is only used for its intended purpose
@@ -420,10 +521,14 @@ The capture service enforces strict validation on incoming events:
- Nested property paths are supported (e.g., `event.properties.nested.$ai_input`)
5. **Size Limits**
- Maximum total payload size enforced for security (e.g., 100MB default)
- Limits can be configured per team to accommodate different use cases
- Event JSON payload has a separate maximum size limit
- Individual blob parts have maximum size limits
All size limit violations return 413 Payload Too Large:
- **Request body**: Maximum 27.5MB (110% of sum of all parts limit, enforced by Axum)
- Computed as 110% of `AI_MAX_SUM_OF_PARTS_BYTES` to account for multipart overhead
- This is the first check, applied before any request processing
- **Event part**: Maximum 32KB (enforced by handler)
- **Event + properties combined**: Maximum 960KB (1MB - 64KB, enforced by handler)
- **Sum of all parts** (event, properties, and all blobs): Maximum 25MB (default, configurable via `AI_MAX_SUM_OF_PARTS_BYTES`, enforced by handler)
- These limits are configurable via environment variables and can be adjusted per deployment (a quick arithmetic check follows this list)
6. **Strict Schema Validation**
- Each `$ai_` event type has a strictly defined schema
@@ -432,10 +537,6 @@ The capture service enforces strict validation on incoming events:
- Blob properties must match expected blob fields for each event type
- Non-conforming events are rejected with detailed validation errors
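As a sanity check on the derived limits above, the request-body ceiling is 110% of the configurable sum-of-parts limit (this mirrors how the router computes its body limit):
```python
AI_MAX_SUM_OF_PARTS_BYTES = 26_214_400               # 25 MiB default
ai_body_limit = int(AI_MAX_SUM_OF_PARTS_BYTES * 1.1)
assert ai_body_limit == 28_835_840                   # 27.5 MiB request-body cap
MAX_COMBINED_SIZE = 1024 * 1024 - 64 * 1024          # 960 KiB for event + properties
```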
## Open Questions
- Should the capture service validate Content-Types of blob parts against a whitelist, or accept any Content-Type provided by the client?
## Rejected Solutions
### WarpStream-based Processing

View File

@@ -2,36 +2,33 @@
## Overview
This document describes the high-level architecture and test scenarios for the LLM Analytics capture pipeline integration test suite. The tests validate the complete end-to-end flow from multipart request ingestion through S3 storage to event processing.
This document describes the Rust integration test suite for the LLM Analytics capture service. These tests validate HTTP endpoint behavior, multipart parsing, and validation logic without requiring external dependencies.
**Implementation Requirement**: Each phase in the implementation plan must pass its corresponding integration tests before proceeding to the next phase. This ensures incremental validation and prevents regression as new features are added.
**See Also**: `llma-acceptance-test-suite.md` for end-to-end tests that require full PostHog infrastructure.
## Test Architecture
### Test Environment
- **Local PostHog Deployment**: Complete local PostHog setup including capture service with `/ai` endpoint
- **S3-Compatible Storage**: Local S3-compatible storage for blob storage with direct access for verification
- **PostHog Query API**: Used to fetch processed events from the ingestion pipeline for validation
- **Direct S3 Access**: Test suite has direct S3 client access to verify blob storage and retrieval
- **Test Fixtures**: Predefined multipart requests with various blob sizes and types
- **In-Memory Router**: Axum router running in test process using `axum_test_helper::TestClient`
- **Multipart Construction**: Using `reqwest::multipart::Form` to build proper multipart bodies
- **Mock Dependencies**: Mock Redis, time source, and event sink for isolation
- **No External Services**: No database, S3, Kafka, or real authentication required
### Test Framework
- **Integration Tests**: Full pipeline tests from HTTP request through ingestion to event storage
- **PostHog API Client**: Direct integration with PostHog query API to verify event processing
- **S3 Client**: Direct access to verify S3 blob storage, metadata, and retrieval
- **Integration Tests**: Capture service tests from HTTP request through parsing and validation
- **Parameterized Tests**: Test variations across different event types, blob sizes, and configurations
- **Async Testing**: Support for testing concurrent requests and large payload processing
### Test Data
- **AI Event Types**: `$ai_trace`, `$ai_span`, `$ai_generation`, `$ai_embedding`, `$ai_metric`, `$ai_feedback`
- **Blob Sizes**: Small (1KB), Medium (100KB), Large (1MB), Extra Large (10MB+)
- **AI Event Types**: Primarily `$ai_generation` for validation testing
- **Blob Sizes**: Small (< 1KB), Medium (~100KB), Empty
- **Content Types**: `application/json`, `text/plain`, `application/octet-stream`
- **Compression**: Gzipped and uncompressed blobs
- **Encoding**: Raw binary, text, and base64-encoded blobs
- **Multipart Boundaries**: Various boundary strings to test collision handling
- **Multipart Boundaries**: Various boundary strings including custom boundaries
## Test Scenarios
@@ -41,91 +38,64 @@ This document describes the high-level architecture and test scenarios for the L
#### Scenario 1.1: Basic Routing
- **Test**: Verify `/ai` endpoint is accessible and returns correct response codes
- **Test**: Verify `/i/v0/ai` endpoint is accessible and returns correct response codes
- **Validation**: HTTP 200 for valid requests, proper error codes for invalid requests
- **Tests Implemented**:
- `test_ai_endpoint_get_returns_405`: GET requests return 405
- `test_ai_endpoint_put_returns_405`: PUT requests return 405
- `test_ai_endpoint_delete_returns_405`: DELETE requests return 405
- `test_ai_endpoint_no_auth_returns_401`: No auth header returns 401
#### Scenario 1.2: Multipart Parsing
- **Test**: Send multipart requests with various boundary strings and blob configurations
- **Validation**: All parts parsed correctly, blob data extracted without corruption§
- **Validation**: All parts parsed correctly, blob data extracted without corruption
- **Variations**: Different boundary formats, multiple blobs, mixed content types
- **Tests Implemented**:
- `test_multipart_parsing_with_multiple_blobs`: Parse 4 parts (event + 3 blobs)
- `test_multipart_parsing_with_mixed_content_types`: JSON, text, binary
- `test_multipart_parsing_with_large_blob`: Large blob (~100KB)
- `test_multipart_parsing_with_empty_blob`: Empty blob handling
#### Scenario 1.3: Boundary Validation
- **Test**: Send requests with malformed boundaries, missing boundaries, boundary collisions
- **Validation**: Appropriate error responses, no server crashes, proper error logging
- **Tests Implemented**:
- `test_multipart_missing_boundary_returns_400`: Missing boundary parameter
- `test_multipart_corrupted_boundary_returns_400`: Mismatched boundary
#### Scenario 1.4: Event Processing Verification
#### Scenario 1.4: Basic Validation
- **Test**: Send multipart request and verify event reaches PostHog query API
- **Validation**: Use PostHog query API to fetch processed event, verify blob placeholders correctly inserted
- **Test**: Send events with valid/invalid event types, missing required properties, duplicate blob properties
- **Validation**: Only accepted AI event types (`$ai_generation`, `$ai_trace`, `$ai_span`, `$ai_embedding`, `$ai_metric`, `$ai_feedback`) are processed; invalid events rejected with proper error messages
- **Tests Implemented**:
- `test_all_allowed_ai_event_types_accepted`: All six accepted event types pass validation
- `test_invalid_ai_event_type_returns_400`: Invalid AI event types rejected (e.g., `$ai_unknown`, `$ai_custom`)
- `test_invalid_event_name_not_ai_prefix_returns_400`: Non-AI event names rejected
- `test_invalid_event_name_regular_event_returns_400`: Regular events rejected (e.g., `$pageview`)
- `test_invalid_event_name_custom_event_returns_400`: Custom events rejected
- `test_missing_required_ai_properties_returns_400`: Missing `$ai_model`
- `test_empty_event_name_returns_400`: Empty event names
- `test_missing_distinct_id_returns_400`: Missing distinct_id
- `test_multipart_event_not_first_returns_400`: Event part ordering
#### Scenario 1.5: Basic Validation
#### Scenario 1.5: Content Type Validation
- **Test**: Send events with invalid names (not starting with `$ai_`), duplicate blob properties
- **Validation**: Invalid events rejected, valid events processed, proper error messages
### Phase 2: Basic S3 Uploads
#### Scenario 2.1: Individual Blob Upload
- **Test**: Upload blobs of various sizes as separate S3 objects
- **Validation**: Verify each blob stored correctly, S3 URLs generated in event properties
- **Variations**: Small/medium/large blobs, different content types
#### Scenario 2.2: S3 URL Generation and Access
- **Test**: Verify generated S3 URLs in PostHog events point to accessible objects
- **Validation**: Query PostHog API for events, extract S3 URLs, verify blobs retrievable from S3
#### Scenario 2.3: Blob Metadata Storage
- **Test**: Verify S3 object metadata is stored correctly
- **Validation**: Use S3 client to inspect object metadata - Content-Type, size, team_id present
#### Scenario 2.4: Team Data Isolation
- **Test**: Multiple teams uploading simultaneously
- **Validation**: Verify S3 key prefixes are team-scoped, no cross-team data access, proper S3 path isolation
### Phase 3: S3 Infrastructure & Deployment
#### Scenario 3.1: S3 Bucket Configuration
- **Test**: Verify S3 bucket structure and lifecycle policies
- **Validation**: Use S3 client to verify correct `llma/` prefix structure, retention policies configured
### Phase 4: Multipart File Processing
#### Scenario 4.1: Multipart File Creation
- **Test**: Upload events with multiple blobs, verify multipart/mixed format
- **Validation**: Use S3 client to verify single S3 file contains all blobs, proper MIME boundaries, metadata preserved
- **Variations**: 2-10 blobs per event, mixed content types, different blob sizes
#### Scenario 4.2: Byte Range URLs and Access
- **Test**: Verify S3 URLs in PostHog events include correct byte range parameters
- **Validation**: Query PostHog API for events, verify URLs contain range parameters, use S3 client to test range requests
#### Scenario 4.3: Content Type Handling
- **Test**: Mix of JSON, text, and binary blobs in single multipart file
- **Validation**: Content types preserved in multipart format, correctly parsed
- **Test**: Send requests with wrong content type or empty body
- **Validation**: Only `multipart/form-data` accepted
- **Tests Implemented**:
- `test_ai_endpoint_wrong_content_type_returns_400`: Non-multipart type
- `test_ai_endpoint_empty_body_returns_400`: Empty body
### Phase 5: Authorization
#### Scenario 5.1: API Key Authentication
- **Test**: Send requests with valid/invalid/missing API keys
- **Validation**: Valid keys accepted, invalid keys rejected with 401, proper error messages
#### Scenario 5.2: Request Signature Verification
#### Scenario 5.1: Request Signature Verification
- **Test**: Test signature validation for various request formats
- **Validation**: Valid signatures accepted, invalid signatures rejected
#### Scenario 5.3: Pre-processing Authentication
#### Scenario 5.2: Pre-processing Authentication
- **Test**: Verify authentication occurs before multipart parsing
- **Validation**: Invalid auth rejected immediately, no resource consumption for unauthorized requests
@@ -142,11 +112,6 @@ This document describes the high-level architecture and test scenarios for the L
- **Test**: Send uncompressed JSON/text blobs
- **Validation**: Server compresses before S3 storage, compression metadata preserved
#### Scenario 7.3: Mixed Compression
- **Test**: Single request with both compressed and uncompressed blobs
- **Validation**: Each blob handled according to its compression state
### Phase 8: Schema Validation
#### Scenario 8.1: Event Schema Validation
@@ -165,12 +130,6 @@ This document describes the high-level architecture and test scenarios for the L
- **Test**: Mismatched Content-Length headers and actual blob sizes
- **Validation**: Mismatches detected and handled appropriately
### Phase 9: Limits (Optional) - DO NOT IMPLEMENT, TBD
### Phase 10: Data Deletion (Optional) - DO NOT IMPLEMENT, TBD
### Cross-Team Isolation Testing
## Error Recovery and Edge Cases
### Edge Case Scenarios
@@ -180,213 +139,42 @@ This document describes the high-level architecture and test scenarios for the L
- **Test**: Invalid JSON, corrupted multipart data, missing required headers
- **Validation**: Graceful error handling, no server crashes, proper error responses
#### Scenario E.2: S3 Service Interruption
- **Test**: Simulate S3 unavailability during uploads
- **Validation**: Proper error responses, retry logic works, no data loss
#### Scenario E.3: Kafka Unavailability
- **Test**: Simulate Kafka unavailability during event publishing
- **Validation**: Appropriate error handling, request failure communicated to client
## Local Development Testing
### Test Implementation
The integration test suite will be implemented in Rust to align with the capture service's existing toolchain and avoid introducing additional dependencies.
**Location**: `rust/capture/tests/integration_ai_endpoint.rs`
#### Test Structure
**Framework**: Tokio async tests with `axum-test-helper`
- **Location**: `tests/integration/llma/` directory within the capture service codebase
- **Framework**: Standard Rust testing framework with `tokio-test` for async operations
- **Dependencies**:
- `reqwest` for HTTP client operations
- `aws-sdk-s3` for S3 client operations
- `serde_json` for JSON manipulation
- `multipart` for constructing test requests
**Dependencies**:
#### Test Organization
- `reqwest` with `multipart` feature for constructing test requests
- `axum-test-helper` for in-memory HTTP testing
- `serde_json` for JSON manipulation
```text
tests/
└── integration/
└── llma/
├── mod.rs # Common test utilities and setup
├── phase_01_http.rs # Phase 1: HTTP Endpoint tests
├── phase_02_s3.rs # Phase 2: Basic S3 Upload tests
├── phase_03_infra.rs # Phase 3: S3 Infrastructure tests
├── phase_04_multipart.rs # Phase 4: Multipart File tests
├── phase_05_auth.rs # Phase 5: Authorization tests
├── phase_07_compression.rs # Phase 7: Compression tests
├── phase_08_validation.rs # Phase 8: Schema Validation tests
└── fixtures/ # Test data and multipart request templates
```
### Local Test Environment Setup
#### Prerequisites
- **Local PostHog Instance**: Full PostHog deployment running locally
- **Local S3 Storage**: S3-compatible storage (configured via PostHog local setup)
- **Capture Service**: Running with `/ai` endpoint enabled
- **Test Configuration**: Environment variables for service endpoints and credentials
#### Environment Configuration
### Running Tests
```bash
# PostHog Local Instance
export POSTHOG_HOST="http://localhost:8000"
export POSTHOG_API_KEY="test_api_key_123"
export POSTHOG_PROJECT_ID="1"
# Run all AI endpoint integration tests
cargo test --test integration_ai_endpoint
# Local S3 Configuration
export AWS_ENDPOINT_URL="http://localhost:9000" # Local S3-compatible endpoint
export AWS_ACCESS_KEY_ID="minioadmin"
export AWS_SECRET_ACCESS_KEY="minioadmin"
export AWS_DEFAULT_REGION="us-east-1"
export LLMA_S3_BUCKET="posthog-llma-test"
# Capture Service
export CAPTURE_ENDPOINT="http://localhost:3000"
export LLMA_TEST_MODE="local"
```
### Test Execution
#### Running Tests
```bash
# Run all LLMA integration tests
cargo test --test llma_integration
# Run specific phase tests
cargo test --test llma_integration phase_01
cargo test --test llma_integration phase_02
# Run specific test
cargo test --test integration_ai_endpoint test_multipart_parsing_with_multiple_blobs
# Run with detailed output
cargo test --test llma_integration -- --nocapture
# Run tests in sequence (important for phase gating)
cargo test --test llma_integration -- --test-threads=1
cargo test --test integration_ai_endpoint -- --nocapture
```
#### Test Utilities
### Test Characteristics
Each test phase will include common utilities for:
- **Multipart Request Builder**: Construct multipart/form-data requests with event JSON and blob parts
- **S3 Client Wrapper**: Direct S3 operations for validation and cleanup
- **PostHog API Client**: Query PostHog API to verify event processing
- **Test Data Generators**: Create various blob sizes, content types, and event payloads
- **Cleanup Helpers**: Remove test data from S3 and PostHog between test runs
#### Test Data Management
- **Isolated Test Teams**: Each test uses unique team IDs to prevent interference
- **Cleanup Between Tests**: Automatic cleanup of S3 objects and PostHog test data
- **Fixture Data**: Predefined multipart requests and blob data for consistent testing
- **Random Data Generation**: Configurable blob sizes and content for stress testing
- Run in-memory with no external dependencies
- Mock Redis, time source, and event sink
- Fast execution suitable for CI/CD
- Test HTTP request/response handling only
## Phase Gating
- **Mandatory Testing**: All integration tests for a phase must pass before proceeding to implementation of the next phase
- **Regression Prevention**: Previous phase tests continue to run to ensure no regression
- **Incremental Validation**: Each phase builds upon validated functionality from previous phases
## Production Testing
### Overview
For validating the LLM Analytics capture pipeline in production environments, the test suite can be configured to run against live PostHog instances with real AWS S3 storage.
### Configuration Requirements
#### PostHog Credentials
- **Project API Key**: PostHog project private API key for authentication
- **PostHog URL**: PostHog instance URL (cloud or self-hosted)
- **Project ID**: PostHog project identifier for query API access
#### AWS S3 Credentials
- **AWS Access Key ID**: Limited IAM user with read-only S3 access
- **AWS Secret Access Key**: Corresponding secret key
- **S3 Bucket Name**: Production S3 bucket name
- **Region**: AWS region for S3 bucket
### IAM Policy for S3 Read Access
The following IAM policy provides minimal read-only access for a specific team prefix:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectAttributes"
      ],
      "Resource": "arn:aws:s3:::your-llma-bucket/llma/TEAM_ID/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::your-llma-bucket",
      "Condition": {
        "StringLike": {
          "s3:prefix": "llma/TEAM_ID/*"
        }
      }
    }
  ]
}
```
### AWS CLI Script for S3 Key Generation
A separate script (`generate-s3-test-keys.sh`) will be implemented to generate limited S3 read-only credentials for LLMA testing. The script will create IAM users with team-specific permissions and output the necessary environment variables for testing.
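That script does not exist yet; below is a rough sketch of the intended logic, written in Python with `boto3` rather than shell purely for illustration (the user name, policy name, and output format are assumptions):
```python
import json
import sys

import boto3

def generate_s3_test_keys(team_id: str, bucket: str) -> dict:
    """Create a throwaway IAM user scoped to one team's llma/ prefix and
    return read-only access keys for testing (hypothetical sketch)."""
    iam = boto3.client("iam")
    user = f"llma-test-{team_id}"
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/llma/{team_id}/*",
            },
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
                "Condition": {"StringLike": {"s3:prefix": f"llma/{team_id}/*"}},
            },
        ],
    }
    iam.create_user(UserName=user)
    iam.put_user_policy(
        UserName=user, PolicyName="llma-readonly", PolicyDocument=json.dumps(policy)
    )
    key = iam.create_access_key(UserName=user)["AccessKey"]
    return {
        "AWS_ACCESS_KEY_ID": key["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": key["SecretAccessKey"],
    }

if __name__ == "__main__":
    # e.g. python generate_s3_test_keys.py 123 posthog-llm-analytics
    for name, value in generate_s3_test_keys(sys.argv[1], sys.argv[2]).items():
        print(f'export {name}="{value}"')
```
The printed `export` lines could be redirected into `production-test.env` and sourced as shown in the usage example below.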
### Production Test Configuration
#### Environment Variables
```bash
# PostHog Configuration
export POSTHOG_PROJECT_API_KEY="your_posthog_api_key"
export POSTHOG_HOST="https://app.posthog.com" # or your self-hosted URL
export POSTHOG_PROJECT_ID="12345"
# AWS S3 Configuration
export AWS_ACCESS_KEY_ID="your_limited_access_key"
export AWS_SECRET_ACCESS_KEY="your_limited_secret_key"
export AWS_DEFAULT_REGION="us-east-1"
export LLMA_S3_BUCKET="your-llma-bucket"
export LLMA_TEAM_ID="123"
# Test Configuration
export LLMA_TEST_MODE="production"
```
### Production Test Execution
#### Safety Measures
- **Read-Only Operations**: Production tests only read data, never write or modify
- **Team Isolation**: Tests only access data for the specified team ID
- **Rate Limiting**: Production tests include delays to avoid overwhelming services
- **Data Validation**: Verify S3 objects exist and are accessible without downloading large payloads
#### Usage Example
```bash
# Generate S3 test credentials (script to be implemented)
./generate-s3-test-keys.sh 123 posthog-llm-analytics
# Configure environment
source production-test.env
# Run production validation tests
pytest tests/integration/production/ -v --tb=short
```

View File

@@ -0,0 +1,476 @@
use axum::body::Bytes;
use axum::extract::State;
use axum::http::HeaderMap;
use axum::response::Json;
use flate2::read::GzDecoder;
use futures::stream;
use multer::{parse_boundary, Multipart};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashSet;
use std::io::Read;
use tracing::{debug, warn};
use crate::api::{CaptureError, CaptureResponse, CaptureResponseCode};
use crate::router::State as AppState;
use crate::token::validate_token;
#[derive(Debug, Serialize, Deserialize)]
pub struct PartInfo {
pub name: String,
pub length: usize,
#[serde(rename = "content-type")]
pub content_type: Option<String>,
#[serde(rename = "content-encoding")]
pub content_encoding: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AIEndpointResponse {
pub accepted_parts: Vec<PartInfo>,
}
pub async fn ai_handler(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Result<Json<AIEndpointResponse>, CaptureError> {
debug!("Received request to /i/v0/ai endpoint");
// Check for empty body
if body.is_empty() {
warn!("AI endpoint received empty body");
return Err(CaptureError::EmptyPayload);
}
// Note: Request body size limit is enforced by Axum's DefaultBodyLimit layer
// (110% of ai_max_sum_of_parts_bytes to account for multipart overhead)
// Check for Content-Encoding header and decompress if needed
let content_encoding = headers
.get("content-encoding")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let decompressed_body = if content_encoding.eq_ignore_ascii_case("gzip") {
debug!("Decompressing gzip-encoded request body");
decompress_gzip(&body)?
} else {
body
};
// Check content type - must be multipart/form-data
let content_type = headers
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !content_type.starts_with("multipart/form-data") {
warn!(
"AI endpoint received non-multipart content type: {}",
content_type
);
return Err(CaptureError::RequestDecodingError(
"Content-Type must be multipart/form-data".to_string(),
));
}
// Extract boundary from Content-Type header using multer's built-in parser
let boundary = parse_boundary(content_type).map_err(|e| {
warn!("Failed to parse boundary from Content-Type: {}", e);
CaptureError::RequestDecodingError(format!("Invalid boundary in Content-Type: {e}"))
})?;
// Check for authentication
let auth_header = headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !auth_header.starts_with("Bearer ") {
warn!("AI endpoint missing or invalid Authorization header");
return Err(CaptureError::NoTokenError);
}
// Extract and validate token
let token = &auth_header[7..]; // Remove "Bearer " prefix
validate_token(token)?;
// Parse multipart data and collect part information
let accepted_parts = parse_multipart_data(
&decompressed_body,
&boundary,
state.ai_max_sum_of_parts_bytes,
)
.await?;
// Log request details for debugging
debug!("AI endpoint request validated and parsed successfully");
debug!("Body size: {} bytes", decompressed_body.len());
debug!("Content-Type: {}", content_type);
debug!("Boundary: {}", boundary);
debug!("Token: {}...", &token[..std::cmp::min(8, token.len())]);
debug!("Accepted parts: {}", accepted_parts.len());
// TODO: Process AI events and upload to S3
// For now, return the accepted parts information
let response = AIEndpointResponse { accepted_parts };
Ok(Json(response))
}
pub async fn options() -> Result<CaptureResponse, CaptureError> {
Ok(CaptureResponse {
status: CaptureResponseCode::Ok,
quota_limited: None,
})
}
/// Decompress gzip-encoded body using streaming decompression
fn decompress_gzip(compressed: &Bytes) -> Result<Bytes, CaptureError> {
let mut decoder = GzDecoder::new(&compressed[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).map_err(|e| {
warn!("Failed to decompress gzip body: {}", e);
CaptureError::RequestDecodingError(format!("Failed to decompress gzip body: {e}"))
})?;
debug!(
"Decompressed {} bytes to {} bytes",
compressed.len(),
decompressed.len()
);
Ok(Bytes::from(decompressed))
}
/// Validate blob part content type
fn is_valid_blob_content_type(content_type: &str) -> bool {
// Supported content types for blob parts
content_type == "application/octet-stream"
|| content_type == "application/json"
|| content_type == "text/plain"
|| content_type.starts_with("text/plain;") // Allow text/plain with charset
}
/// Parse multipart data and validate structure
async fn parse_multipart_data(
body: &[u8],
boundary: &str,
max_sum_of_parts_bytes: usize,
) -> Result<Vec<PartInfo>, CaptureError> {
// Size limits
const MAX_EVENT_SIZE: usize = 32 * 1024; // 32KB
const MAX_COMBINED_SIZE: usize = 1024 * 1024 - 64 * 1024; // 1MB - 64KB = 960KB
// Create a stream from the body data - need to own the data
let body_owned = body.to_vec();
let body_stream = stream::once(async move { Ok::<Vec<u8>, std::io::Error>(body_owned) });
// Create multipart parser
let mut multipart = Multipart::new(body_stream, boundary);
let mut part_count = 0;
let mut has_event_part = false;
let mut first_part_processed = false;
let mut accepted_parts = Vec::new();
let mut seen_property_names = HashSet::new();
let mut event_json: Option<Value> = None;
let mut properties_json: Option<Value> = None;
let mut event_size: usize = 0;
let mut properties_size: usize = 0;
let mut sum_of_parts_bytes: usize = 0;
// Parse each part
while let Some(field) = multipart.next_field().await.map_err(|e| {
warn!("Multipart parsing error: {}", e);
CaptureError::RequestDecodingError(format!("Multipart parsing failed: {e}"))
})? {
part_count += 1;
// Extract all field information before consuming the field
let field_name = field.name().unwrap_or("unknown").to_string();
let content_type = field.content_type().map(|ct| ct.to_string());
let content_encoding = field
.headers()
.get("content-encoding")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
debug!(
"Processing multipart field: {} (part #{})",
field_name, part_count
);
// Check if this is the first part
if !first_part_processed {
first_part_processed = true;
// Validate that the first part is the event part
if field_name != "event" {
return Err(CaptureError::RequestDecodingError(format!(
"First part must be 'event', got '{field_name}'"
)));
}
debug!("First part is 'event' as expected");
}
// Read the field data to get the length (this consumes the field)
let field_data = field.bytes().await.map_err(|e| {
warn!("Failed to read field data for '{}': {}", field_name, e);
CaptureError::RequestDecodingError(format!("Failed to read field data: {e}"))
})?;
// Track sum of all part sizes
sum_of_parts_bytes += field_data.len();
// Check if this is the event JSON part
if field_name == "event" {
has_event_part = true;
event_size = field_data.len();
// Check event size limit
if event_size > MAX_EVENT_SIZE {
return Err(CaptureError::EventTooBig(format!(
"Event part size ({event_size} bytes) exceeds maximum allowed size ({MAX_EVENT_SIZE} bytes)"
)));
}
// Parse the event JSON (without validating properties yet)
let event_json_str = std::str::from_utf8(&field_data).map_err(|e| {
warn!("Event part is not valid UTF-8: {}", e);
CaptureError::RequestDecodingError("Event part must be valid UTF-8".to_string())
})?;
event_json = Some(serde_json::from_str(event_json_str).map_err(|e| {
warn!("Event part is not valid JSON: {}", e);
CaptureError::RequestDecodingError("Event part must be valid JSON".to_string())
})?);
debug!("Event part parsed successfully");
} else if field_name == "event.properties" {
properties_size = field_data.len();
// Parse the properties JSON
let properties_json_str = std::str::from_utf8(&field_data).map_err(|e| {
warn!("Properties part is not valid UTF-8: {}", e);
CaptureError::RequestDecodingError(
"Properties part must be valid UTF-8".to_string(),
)
})?;
properties_json = Some(serde_json::from_str(properties_json_str).map_err(|e| {
warn!("Properties part is not valid JSON: {}", e);
CaptureError::RequestDecodingError("Properties part must be valid JSON".to_string())
})?);
debug!("Properties part parsed successfully");
} else if field_name.starts_with("event.properties.") {
// This is a blob part - check for duplicates
if !seen_property_names.insert(field_name.clone()) {
return Err(CaptureError::RequestDecodingError(format!(
"Duplicate blob property: {field_name}"
)));
}
// Validate content type for blob parts - it's required
match &content_type {
Some(ref ct) => {
let ct_lower = ct.to_lowercase();
if !is_valid_blob_content_type(&ct_lower) {
return Err(CaptureError::RequestDecodingError(
format!("Unsupported content type for blob part '{field_name}': '{ct}'. Supported types: application/octet-stream, application/json, text/plain"),
));
}
}
None => {
return Err(CaptureError::RequestDecodingError(format!(
"Missing required Content-Type header for blob part '{field_name}'"
)));
}
}
debug!("Blob part '{}' processed successfully", field_name);
} else {
warn!("Unknown multipart field: {}", field_name);
}
// Create and store part info after all validation
let part_info = PartInfo {
name: field_name.clone(),
length: field_data.len(),
content_type,
content_encoding,
};
accepted_parts.push(part_info);
}
// Validate that we have at least the event part
if !has_event_part {
return Err(CaptureError::RequestDecodingError(
"Missing required 'event' part in multipart data".to_string(),
));
}
// Check combined size limit
let combined_size = event_size + properties_size;
if combined_size > MAX_COMBINED_SIZE {
return Err(CaptureError::EventTooBig(format!(
"Combined event and properties size ({combined_size} bytes) exceeds maximum allowed size ({MAX_COMBINED_SIZE} bytes)"
)));
}
// Check sum of all parts limit
if sum_of_parts_bytes > max_sum_of_parts_bytes {
return Err(CaptureError::EventTooBig(format!(
"Sum of all parts ({sum_of_parts_bytes} bytes) exceeds maximum allowed size ({max_sum_of_parts_bytes} bytes)"
)));
}
// Merge properties into the event
let mut event = event_json.unwrap();
// Check for conflicting properties sources
let has_embedded_properties = event
.as_object()
.and_then(|obj| obj.get("properties"))
.is_some();
if has_embedded_properties && properties_json.is_some() {
return Err(CaptureError::RequestDecodingError(
"Event cannot have both embedded properties and a separate 'event.properties' part"
.to_string(),
));
}
// Determine which properties to use:
// - If there's a separate event.properties part, use it
// - If there's no separate part, use embedded properties from the event (if any)
// - If neither exists, use empty object
let properties = if let Some(props) = properties_json {
props
} else {
// No separate part - check for embedded properties
if let Some(event_obj) = event.as_object() {
event_obj
.get("properties")
.cloned()
.unwrap_or(serde_json::json!({}))
} else {
serde_json::json!({})
}
};
// Insert/replace properties in the event object
if let Some(event_obj) = event.as_object_mut() {
event_obj.insert("properties".to_string(), properties);
} else {
return Err(CaptureError::RequestDecodingError(
"Event must be a JSON object".to_string(),
));
}
// Now validate the complete event structure
validate_event_structure(&event)?;
debug!(
"Multipart parsing completed: {} parts processed",
part_count
);
Ok(accepted_parts)
}
/// Validate the structure and content of an AI event
fn validate_event_structure(event: &Value) -> Result<(), CaptureError> {
// Check if event is an object
let event_obj = event.as_object().ok_or_else(|| {
warn!("Event must be a JSON object");
CaptureError::RequestDecodingError("Event must be a JSON object".to_string())
})?;
// Validate event name
let event_name = event_obj
.get("event")
.and_then(|v| v.as_str())
.ok_or_else(|| {
warn!("Event missing 'event' field");
CaptureError::RequestDecodingError("Event missing 'event' field".to_string())
})?;
if event_name.is_empty() {
return Err(CaptureError::RequestDecodingError(
"Event name cannot be empty".to_string(),
));
}
// Only accept specific AI event types
const ALLOWED_AI_EVENTS: [&str; 6] = [
"$ai_generation",
"$ai_trace",
"$ai_span",
"$ai_embedding",
"$ai_metric",
"$ai_feedback",
];
if !ALLOWED_AI_EVENTS.contains(&event_name) {
return Err(CaptureError::RequestDecodingError(format!(
"Event name must be one of: {}, got '{}'",
ALLOWED_AI_EVENTS.join(", "),
event_name
)));
}
// Validate distinct_id
let distinct_id = event_obj
.get("distinct_id")
.and_then(|v| v.as_str())
.ok_or_else(|| {
warn!("Event missing 'distinct_id' field");
CaptureError::RequestDecodingError("Event missing 'distinct_id' field".to_string())
})?;
if distinct_id.is_empty() {
return Err(CaptureError::RequestDecodingError(
"distinct_id cannot be empty".to_string(),
));
}
// Validate properties object
let properties = event_obj
.get("properties")
.and_then(|v| v.as_object())
.ok_or_else(|| {
warn!("Event missing 'properties' field");
CaptureError::RequestDecodingError("Event missing 'properties' field".to_string())
})?;
// Validate required AI properties
if !properties.contains_key("$ai_model") {
return Err(CaptureError::RequestDecodingError(
"Event properties must contain '$ai_model'".to_string(),
));
}
let ai_model = properties
.get("$ai_model")
.and_then(|v| v.as_str())
.ok_or_else(|| {
warn!("$ai_model must be a string");
CaptureError::RequestDecodingError("$ai_model must be a string".to_string())
})?;
if ai_model.is_empty() {
return Err(CaptureError::RequestDecodingError(
"$ai_model cannot be empty".to_string(),
));
}
debug!(
"Event validation passed: event='{}', distinct_id='{}', ai_model='{}'",
event_name, distinct_id, ai_model
);
Ok(())
}

View File

@@ -105,6 +105,10 @@ pub struct Config {
// deploy var [0.0..100.0] to sample behavior of interest for verbose logging
#[envconfig(default = "0.0")]
pub verbose_sample_percent: f32,
// AI endpoint size limits
#[envconfig(default = "26214400")] // 25MB in bytes
pub ai_max_sum_of_parts_bytes: usize,
}
#[derive(Envconfig, Clone)]

View File

@@ -1,3 +1,4 @@
pub mod ai_endpoint;
pub mod api;
pub mod config;
pub mod limiters;

View File

@@ -15,7 +15,7 @@ use tower_http::trace::TraceLayer;
use crate::metrics_middleware::track_metrics;
use crate::test_endpoint;
use crate::{sinks, time::TimeSource, v0_endpoint};
use crate::{ai_endpoint, sinks, time::TimeSource, v0_endpoint};
use common_redis::Client;
use limiters::token_dropper::TokenDropper;
@@ -39,6 +39,7 @@ pub struct State {
pub capture_mode: CaptureMode,
pub is_mirror_deploy: bool,
pub verbose_sample_percent: f32,
pub ai_max_sum_of_parts_bytes: usize,
}
#[derive(Clone)]
@@ -114,6 +115,7 @@ pub fn router<
historical_tokens_keys: Option<String>,
is_mirror_deploy: bool,
verbose_sample_percent: f32,
ai_max_sum_of_parts_bytes: usize,
) -> Router {
let state = State {
sink: Arc::new(sink),
@@ -130,6 +132,7 @@ pub fn router<
capture_mode: capture_mode.clone(),
is_mirror_deploy,
verbose_sample_percent,
ai_max_sum_of_parts_bytes,
};
// Very permissive CORS policy, as old SDK versions
@@ -253,11 +256,26 @@ pub fn router<
)
.layer(DefaultBodyLimit::max(RECORDING_BODY_SIZE));
// AI endpoint body limit is 110% of max sum of parts to account for multipart overhead
let ai_body_limit = (state.ai_max_sum_of_parts_bytes as f64 * 1.1) as usize;
let ai_router = Router::new()
.route(
"/i/v0/ai",
post(ai_endpoint::ai_handler).options(ai_endpoint::options),
)
.route(
"/i/v0/ai/",
post(ai_endpoint::ai_handler).options(ai_endpoint::options),
)
.layer(DefaultBodyLimit::max(ai_body_limit));
let mut router = match capture_mode {
CaptureMode::Events => Router::new()
.merge(batch_router)
.merge(event_router)
.merge(test_router),
.merge(test_router)
.merge(ai_router),
CaptureMode::Recordings => Router::new().merge(recordings_router),
};

View File

@@ -185,6 +185,7 @@ where
config.historical_tokens_keys,
config.is_mirror_deploy,
config.verbose_sample_percent,
config.ai_max_sum_of_parts_bytes,
);
// run our app with hyper

View File

@@ -994,6 +994,7 @@ fn setup_capture_router(unit: &TestCase) -> (Router, MemorySink) {
historical_tokens_keys,
is_mirror_deploy,
verbose_sample_percent,
26_214_400, // 25MB default for AI endpoint
),
sink,
)

View File

@@ -80,6 +80,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
s3_fallback_endpoint: None,
s3_fallback_prefix: String::new(),
healthcheck_strategy: HealthStrategy::All,
ai_max_sum_of_parts_bytes: 26_214_400, // 25MB default
});
static TRACING_INIT: Once = Once::new();

File diff suppressed because it is too large Load Diff

View File

@@ -139,6 +139,7 @@ async fn setup_router_with_limits(
None, // historical_tokens_keys
false, // is_mirror_deploy
0.0, // verbose_sample_percent
26_214_400, // ai_max_sum_of_parts_bytes (25MB)
);
(app, sink)
@@ -1182,6 +1183,7 @@ async fn test_survey_quota_cross_batch_first_submission_allowed() {
None,
false,
0.0,
26_214_400,
);
let client = TestClient::new(app);
@@ -1257,6 +1259,7 @@ async fn test_survey_quota_cross_batch_duplicate_submission_dropped() {
None,
false,
0.0,
26_214_400,
);
let client = TestClient::new(app);
@@ -1336,6 +1339,7 @@ async fn test_survey_quota_cross_batch_redis_error_fail_open() {
None,
false,
0.0,
26_214_400,
);
let client = TestClient::new(app);
@@ -1752,6 +1756,7 @@ async fn test_ai_quota_cross_batch_redis_error_fail_open() {
None,
false,
0.0,
26_214_400,
);
let client = TestClient::new(app);