feat: add NamedQuery model and endpoint (#38163)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Paweł Szczur
2025-09-18 20:49:51 +02:00
committed by GitHub
parent c55dfca318
commit 0119057ede
13 changed files with 983 additions and 16 deletions

View File

@@ -16442,6 +16442,51 @@
"enum": ["cohort", "person", "event", "event_metadata", "group", "session", "hogql", "revenue_analytics"],
"type": "string"
},
"NamedQueryRequest": {
"additionalProperties": false,
"properties": {
"description": {
"type": "string"
},
"is_active": {
"type": "boolean"
},
"name": {
"type": "string"
},
"query": {
"$ref": "#/definitions/HogQLQuery"
}
},
"type": "object"
},
"NamedQueryRunRequest": {
"additionalProperties": false,
"properties": {
"client_query_id": {
"description": "Client provided query ID. Can be used to retrieve the status or cancel the query.",
"type": "string"
},
"filters_override": {
"$ref": "#/definitions/DashboardFilter"
},
"refresh": {
"$ref": "#/definitions/RefreshType",
"default": "blocking",
"description": "Whether results should be calculated sync or async, and how much to rely on the cache:\n- `'blocking'` - calculate synchronously (returning only when the query is done), UNLESS there are very fresh results in the cache\n- `'async'` - kick off background calculation (returning immediately with a query status), UNLESS there are very fresh results in the cache\n- `'lazy_async'` - kick off background calculation, UNLESS there are somewhat fresh results in the cache\n- `'force_blocking'` - calculate synchronously, even if fresh results are already cached\n- `'force_async'` - kick off background calculation, even if fresh results are already cached\n- `'force_cache'` - return cached data or a cache miss; always completes immediately as it never calculates Background calculation can be tracked using the `query_status` response field."
},
"variables_override": {
"additionalProperties": {
"type": "object"
},
"type": "object"
},
"variables_values": {
"type": "object"
}
},
"type": "object"
},
"NewExperimentQueryResponse": {
"additionalProperties": false,
"properties": {

View File

@@ -1518,6 +1518,35 @@ export type RefreshType =
| 'force_cache'
| 'lazy_async'
export interface NamedQueryRequest {
name?: string
description?: string
query?: HogQLQuery
is_active?: boolean
}
export interface NamedQueryRunRequest {
/** Client provided query ID. Can be used to retrieve the status or cancel the query. */
client_query_id?: string
// Sync the `refresh` description here with the two instances in posthog/api/insight.py
/**
* Whether results should be calculated sync or async, and how much to rely on the cache:
* - `'blocking'` - calculate synchronously (returning only when the query is done), UNLESS there are very fresh results in the cache
* - `'async'` - kick off background calculation (returning immediately with a query status), UNLESS there are very fresh results in the cache
* - `'lazy_async'` - kick off background calculation, UNLESS there are somewhat fresh results in the cache
* - `'force_blocking'` - calculate synchronously, even if fresh results are already cached
* - `'force_async'` - kick off background calculation, even if fresh results are already cached
* - `'force_cache'` - return cached data or a cache miss; always completes immediately as it never calculates
* Background calculation can be tracked using the `query_status` response field.
* @default 'blocking'
*/
refresh?: RefreshType
filters_override?: DashboardFilter
variables_override?: Record<string, Record<string, any>>
variables_values?: Record<string, any>
}
export interface QueryRequest {
/** Client provided query ID. Can be used to retrieve the status or cancel the query. */
client_query_id?: string

View File

@@ -1,7 +1,7 @@
from rest_framework import decorators, exceptions, viewsets
from rest_framework_extensions.routers import NestedRegistryItem
from posthog.api import data_color_theme, hog_flow, metalytics, my_notifications, project
from posthog.api import data_color_theme, hog_flow, metalytics, my_notifications, named_query, project
from posthog.api.batch_imports import BatchImportViewSet
from posthog.api.csp_reporting import CSPReportingViewSet
from posthog.api.routing import DefaultRouterPlusPlus
@@ -349,6 +349,13 @@ projects_router.register(
)
register_grandfathered_environment_nested_viewset(r"query", query.QueryViewSet, "environment_query", ["team_id"])
register_grandfathered_environment_nested_viewset(
r"named_query",
named_query.NamedQueryViewSet,
"environment_named_query",
["team_id"],
)
# External data resources
register_grandfathered_environment_nested_viewset(
r"external_data_sources",
@@ -520,7 +527,6 @@ register_grandfathered_environment_nested_viewset(
["team_id"],
)
register_grandfathered_environment_nested_viewset(r"heatmaps", HeatmapViewSet, "environment_heatmaps", ["team_id"])
register_grandfathered_environment_nested_viewset(r"sessions", SessionViewSet, "environment_sessions", ["team_id"])
@@ -577,7 +583,6 @@ else:
register_grandfathered_environment_nested_viewset(r"persons", PersonViewSet, "environment_persons", ["team_id"])
router.register(r"person", LegacyPersonViewSet, "persons")
environment_dashboards_router.register(
r"sharing",
sharing.SharingConfigurationViewSet,

280
posthog/api/named_query.py Normal file
View File

@@ -0,0 +1,280 @@
import re
import typing
from django.core.cache import cache
from django.shortcuts import get_object_or_404
from django_filters.rest_framework import DjangoFilterBackend
from pydantic import BaseModel
from rest_framework import status, viewsets
from rest_framework.exceptions import Throttled, ValidationError
from rest_framework.request import Request
from rest_framework.response import Response
from posthog.schema import HogQLQuery, NamedQueryRequest, NamedQueryRunRequest, QueryRequest
from posthog.hogql.errors import ExposedHogQLError, ResolutionError
from posthog.api.documentation import extend_schema
from posthog.api.mixins import PydanticModelMixin
from posthog.api.query import _process_query_request
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.services.query import process_query_model
from posthog.api.utils import action
from posthog.clickhouse.client.limit import ConcurrencyLimitExceeded
from posthog.clickhouse.query_tagging import get_query_tag_value, tag_queries
from posthog.constants import AvailableFeature
from posthog.errors import ExposedCHQueryError
from posthog.exceptions_capture import capture_exception
from posthog.hogql_queries.query_runner import BLOCKING_EXECUTION_MODES
from posthog.models import User
from posthog.models.named_query import NamedQuery
from posthog.rate_limit import APIQueriesBurstThrottle, APIQueriesSustainedThrottle
from posthog.schema_migrations.upgrade import upgrade
from common.hogvm.python.utils import HogVMException
class NamedQueryViewSet(TeamAndOrgViewSetMixin, PydanticModelMixin, viewsets.ModelViewSet):
# NOTE: Do we need to override the scopes for the "create"
scope_object = "named_query"
# Special case for query - these are all essentially read actions
scope_object_read_actions = ["retrieve", "create", "list", "destroy", "update", "run"]
scope_object_write_actions: list[str] = []
lookup_field = "name"
queryset = NamedQuery.objects.all()
filter_backends = [DjangoFilterBackend]
filterset_fields = ["is_active", "created_by"]
def get_serializer_class(self):
return None # We use Pydantic models instead
def get_throttles(self):
return [APIQueriesBurstThrottle(), APIQueriesSustainedThrottle()]
def check_team_api_queries_concurrency(self):
cache_key = f"team/{self.team_id}/feature/{AvailableFeature.API_QUERIES_CONCURRENCY}"
cached = cache.get(cache_key)
if cached is not None:
return cached
if self.team:
new_val = self.team.organization.is_feature_available(AvailableFeature.API_QUERIES_CONCURRENCY)
cache.set(cache_key, new_val)
return new_val
return False
def list(self, request: Request, *args, **kwargs) -> Response:
"""List all named queries for the team."""
queryset = self.filter_queryset(self.get_queryset())
results = []
for named_query in queryset:
results.append(
{
"id": str(named_query.id),
"name": named_query.name,
"description": named_query.description,
"query": named_query.query,
"parameters": named_query.parameters,
"is_active": named_query.is_active,
"endpoint_path": named_query.endpoint_path,
"created_at": named_query.created_at,
"updated_at": named_query.updated_at,
}
)
return Response({"results": results})
def validate_request(self, data: NamedQueryRequest, strict: bool = True) -> None:
query = data.query
if query:
if query.kind != "HogQLQuery":
raise ValidationError("Only HogQLQuery query kind is supported (speak to us)")
elif strict:
raise ValidationError("Must specify query")
name = data.name
if not name:
if name is not None or strict:
raise ValidationError("Named query must have a name.")
return
if not isinstance(name, str) or not re.fullmatch(r"^[a-zA-Z0-9_-]{1,128}$", name):
raise ValidationError(
"Named query name must be alphanumeric characters, hyphens, or underscores, "
"and be between 1 and 128 characters long."
)
@extend_schema(
request=NamedQueryRequest,
description="Create a new named query",
)
def create(self, request: Request, *args, **kwargs) -> Response:
"""Create a new named query."""
upgraded_query = upgrade(request.data)
data = self.get_model(upgraded_query, NamedQueryRequest)
self.validate_request(data, strict=True)
try:
named_query = NamedQuery.objects.create(
team=self.team,
created_by=typing.cast(User, request.user),
name=typing.cast(str, data.name), # verified in validate_request
query=typing.cast(HogQLQuery, data.query).model_dump(),
description=data.description or "",
is_active=data.is_active if data.is_active is not None else True,
)
return Response(
{
"id": str(named_query.id),
"name": named_query.name,
"description": named_query.description,
"query": named_query.query,
"parameters": named_query.parameters,
"is_active": named_query.is_active,
"endpoint_path": named_query.endpoint_path,
"created_at": named_query.created_at,
"updated_at": named_query.updated_at,
},
status=status.HTTP_201_CREATED,
)
except Exception as e:
capture_exception(e)
raise ValidationError("Failed to create named query.")
@extend_schema(
request=NamedQueryRequest,
description="Update an existing named query. Parameters are optional.",
)
def update(self, request: Request, name=None, *args, **kwargs) -> Response:
"""Update an existing named query."""
named_query = get_object_or_404(NamedQuery, team=self.team, name=name)
upgraded_query = upgrade(request.data)
data = self.get_model(upgraded_query, NamedQueryRequest)
self.validate_request(data, strict=False)
try:
if data.name is not None:
named_query.name = data.name
if data.query is not None:
named_query.query = data.query.model_dump()
if data.description is not None:
named_query.description = data.description
if data.is_active is not None:
named_query.is_active = data.is_active
named_query.save()
return Response(
{
"id": str(named_query.id),
"name": named_query.name,
"description": named_query.description,
"query": named_query.query,
"parameters": named_query.parameters,
"is_active": named_query.is_active,
"endpoint_path": named_query.endpoint_path,
"created_at": named_query.created_at,
"updated_at": named_query.updated_at,
}
)
except Exception as e:
capture_exception(e)
raise ValidationError("Failed to update named query.")
def destroy(self, request: Request, name=None, *args, **kwargs) -> Response:
"""Delete a named query."""
named_query = get_object_or_404(NamedQuery, team=self.team, name=name)
named_query.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
@extend_schema(
request=NamedQueryRunRequest,
description="Update an existing named query. Parameters are optional.",
)
@action(methods=["GET", "POST"], detail=True)
def run(self, request: Request, name=None, *args, **kwargs) -> Response:
"""Execute a named query with optional parameters."""
named_query = get_object_or_404(NamedQuery, team=self.team, name=name, is_active=True)
data = self.get_model(request.data, NamedQueryRunRequest)
data.variables_values = data.variables_values or {}
try:
query_variables = named_query.query.get("variables", {})
for code_name, value in data.variables_values.items():
for variable in query_variables.values():
if variable.get("code_name", "") == code_name:
variable["value"] = value
# Build QueryRequest
query_request_data = {
"client_query_id": data.client_query_id,
"filters_override": data.filters_override,
"name": named_query.name,
"refresh": data.refresh, # Allow overriding QueryRequest fields like refresh, client_query_id
"query": named_query.query,
"variables_override": data.variables_override,
}
merged_data = self.get_model(query_request_data, QueryRequest)
query, client_query_id, execution_mode = _process_query_request(
merged_data, self.team, data.client_query_id, request.user
)
self._tag_client_query_id(client_query_id)
if execution_mode not in BLOCKING_EXECUTION_MODES:
raise ValidationError("only sync modes are supported (refresh param)")
result = process_query_model(
self.team,
query,
execution_mode=execution_mode,
query_id=client_query_id,
user=typing.cast(User, request.user),
is_query_service=(get_query_tag_value("access_method") == "personal_api_key"),
)
if isinstance(result, BaseModel):
result = result.model_dump(by_alias=True)
response_status = (
status.HTTP_202_ACCEPTED
if result.get("query_status") and result["query_status"].get("complete") is False
else status.HTTP_200_OK
)
return Response(result, status=response_status)
except (ExposedHogQLError, ExposedCHQueryError, HogVMException) as e:
raise ValidationError(str(e), getattr(e, "code_name", None))
except ResolutionError as e:
raise ValidationError(str(e))
except ConcurrencyLimitExceeded as c:
raise Throttled(detail=str(c))
except Exception as e:
self.handle_column_ch_error(e)
capture_exception(e)
raise
def handle_column_ch_error(self, error):
if getattr(error, "message", None):
match = re.search(r"There's no column.*in table", error.message)
if match:
# TODO: remove once we support all column types
raise ValidationError(
match.group(0) + ". Note: While in beta, not all column types may be fully supported"
)
return
def _tag_client_query_id(self, query_id: str | None):
if query_id is None:
return
tag_queries(client_query_id=query_id)
MAX_QUERY_TIMEOUT = 600

View File

@@ -1,6 +1,4 @@
import re
import uuid
from concurrent.futures import ThreadPoolExecutor
from django.core.cache import cache
from django.http import JsonResponse, StreamingHttpResponse
@@ -43,6 +41,7 @@ from posthog.hogql_queries.apply_dashboard_filters import apply_dashboard_filter
from posthog.hogql_queries.hogql_query_runner import HogQLQueryRunner
from posthog.hogql_queries.query_runner import ExecutionMode, execution_mode_from_refresh
from posthog.models.user import User
from posthog.models.utils import uuid7
from posthog.rate_limit import (
AIBurstRateThrottle,
AISustainedRateThrottle,
@@ -56,14 +55,6 @@ from posthog.schema_migrations.upgrade import upgrade
from common.hogvm.python.utils import HogVMException
# Create a dedicated thread pool for query processing
# Setting max_workers to ensure we don't overwhelm the system
# while still allowing concurrent queries
QUERY_EXECUTOR = ThreadPoolExecutor(
max_workers=50, # 50 should be enough to have 200 simultaneous queries across clickhouse
thread_name_prefix="query_processor",
)
def _process_query_request(
request_data: QueryRequest, team, client_query_id: str | None = None, user=None
@@ -77,7 +68,7 @@ def _process_query_request(
if request_data.variables_override is not None:
query = apply_dashboard_variables(query, request_data.variables_override, team)
query_id = client_query_id or uuid.uuid4().hex
query_id = client_query_id or uuid7().hex
execution_mode = execution_mode_from_refresh(request_data.refresh)
if request_data.async_: # TODO: Legacy async, use "refresh=async" instead

View File

@@ -66,6 +66,8 @@
'/home/runner/work/posthog/posthog/posthog/api/insight.py: Warning [EnterpriseInsightsViewSet > InsightSerializer]: unable to resolve type hint for function "get_result". Consider using a type hint or @extend_schema_field. Defaulting to string.',
'/home/runner/work/posthog/posthog/posthog/api/insight.py: Warning [EnterpriseInsightsViewSet > InsightSerializer]: unable to resolve type hint for function "get_timezone". Consider using a type hint or @extend_schema_field. Defaulting to string.',
'/home/runner/work/posthog/posthog/posthog/api/insight.py: Warning [EnterpriseInsightsViewSet > InsightSerializer]: unable to resolve type hint for function "get_types". Consider using a type hint or @extend_schema_field. Defaulting to string.',
"/home/runner/work/posthog/posthog/posthog/api/named_query.py: Error [NamedQueryViewSet]: exception raised while getting serializer. Hint: Is get_serializer_class() returning None or is get_queryset() not working without a request? Ignoring the view for now. (Exception: 'NoneType' object is not callable)",
'/home/runner/work/posthog/posthog/posthog/api/named_query.py: Warning [NamedQueryViewSet]: could not derive type of path parameter "project_id" because model "posthog.models.named_query.NamedQuery" contained no such field. Consider annotating parameter with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/notebook.py: Warning [NotebookViewSet]: could not derive type of path parameter "project_id" because model "posthog.models.notebook.notebook.Notebook" contained no such field. Consider annotating parameter with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/organization.py: Warning [OrganizationViewSet > OrganizationSerializer]: unable to resolve type hint for function "get_member_count". Consider using a type hint or @extend_schema_field. Defaulting to string.',
'/home/runner/work/posthog/posthog/posthog/api/organization.py: Warning [OrganizationViewSet > OrganizationSerializer]: unable to resolve type hint for function "get_metadata". Consider using a type hint or @extend_schema_field. Defaulting to string.',
@@ -148,11 +150,13 @@
'Warning: operationId "destroy" has collisions [(\'/api/organizations/{id}/\', \'delete\'), (\'/api/organizations/{organization_id}/projects/{id}/\', \'delete\')]. resolving with numeral suffixes.',
'Warning: operationId "environments_app_metrics_historical_exports_retrieve" has collisions [(\'/api/environments/{project_id}/app_metrics/{plugin_config_id}/historical_exports/\', \'get\'), (\'/api/environments/{project_id}/app_metrics/{plugin_config_id}/historical_exports/{id}/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "environments_insights_activity_retrieve" has collisions [(\'/api/environments/{project_id}/insights/{id}/activity/\', \'get\'), (\'/api/environments/{project_id}/insights/activity/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "environments_named_query_retrieve" has collisions [(\'/api/environments/{project_id}/named_query/\', \'get\'), (\'/api/environments/{project_id}/named_query/{name}/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "environments_persons_activity_retrieve" has collisions [(\'/api/environments/{project_id}/persons/{id}/activity/\', \'get\'), (\'/api/environments/{project_id}/persons/activity/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "event_definitions_retrieve" has collisions [(\'/api/projects/{project_id}/event_definitions/\', \'get\'), (\'/api/projects/{project_id}/event_definitions/{id}/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "feature_flags_activity_retrieve" has collisions [(\'/api/projects/{project_id}/feature_flags/{id}/activity/\', \'get\'), (\'/api/projects/{project_id}/feature_flags/activity/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "insights_activity_retrieve" has collisions [(\'/api/projects/{project_id}/insights/{id}/activity/\', \'get\'), (\'/api/projects/{project_id}/insights/activity/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "list" has collisions [(\'/api/organizations/\', \'get\'), (\'/api/organizations/{organization_id}/projects/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "named_query_retrieve" has collisions [(\'/api/projects/{project_id}/named_query/\', \'get\'), (\'/api/projects/{project_id}/named_query/{name}/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "notebooks_activity_retrieve" has collisions [(\'/api/projects/{project_id}/notebooks/{short_id}/activity/\', \'get\'), (\'/api/projects/{project_id}/notebooks/activity/\', \'get\')]. resolving with numeral suffixes.',
'Warning: operationId "partial_update" has collisions [(\'/api/organizations/{id}/\', \'patch\'), (\'/api/organizations/{organization_id}/projects/{id}/\', \'patch\')]. resolving with numeral suffixes.',
'Warning: operationId "persons_activity_retrieve" has collisions [(\'/api/projects/{project_id}/persons/{id}/activity/\', \'get\'), (\'/api/projects/{project_id}/persons/activity/\', \'get\')]. resolving with numeral suffixes.',

View File

@@ -0,0 +1,408 @@
from typing import Any
from posthog.test.base import APIBaseTest, ClickhouseTestMixin
from rest_framework import status
from posthog.models.insight_variable import InsightVariable
from posthog.models.named_query import NamedQuery
from posthog.models.team import Team
from posthog.models.user import User
class TestNamedQuery(ClickhouseTestMixin, APIBaseTest):
ENDPOINT = "named_query"
def setUp(self):
super().setUp()
self.sample_query = {
"explain": None,
"filters": None,
"kind": "HogQLQuery",
"modifiers": None,
"name": None,
"query": "SELECT count(1) FROM query_log",
"response": None,
"tags": None,
"values": None,
"variables": None,
"version": None,
}
def test_create_named_query(self):
"""Test creating a named query successfully."""
data = {
"name": "test_query",
"description": "Test query description",
"query": self.sample_query,
}
response = self.client.post(f"/api/environments/{self.team.id}/named_query/", data, format="json")
self.assertEqual(status.HTTP_201_CREATED, response.status_code, response.json())
response_data = response.json()
self.assertEqual("test_query", response_data["name"])
self.assertEqual(self.sample_query, response_data["query"])
self.assertEqual("Test query description", response_data["description"])
self.assertTrue(response_data["is_active"])
self.assertIn("id", response_data)
self.assertIn("endpoint_path", response_data)
self.assertIn("created_at", response_data)
self.assertIn("updated_at", response_data)
# Verify it was saved to database
named_query = NamedQuery.objects.get(name="test_query", team=self.team)
self.assertEqual(named_query.query, self.sample_query)
self.assertEqual(named_query.created_by, self.user)
def test_update_named_query(self):
"""Test updating an existing named query."""
# Create initial query
named_query = NamedQuery.objects.create(
name="update_test",
team=self.team,
query=self.sample_query,
description="Original description",
created_by=self.user,
)
# Update it
updated_data = {
"description": "Updated description",
"is_active": False,
"query": {"kind": "HogQLQuery", "query": "SELECT 1"},
}
response = self.client.put(
f"/api/environments/{self.team.id}/named_query/{named_query.name}/", updated_data, format="json"
)
response_data = response.json()
self.assertEqual(status.HTTP_200_OK, response.status_code, response_data)
self.assertEqual("update_test", response_data["name"])
self.assertEqual("Updated description", response_data["description"])
self.assertFalse(response_data["is_active"])
want_query = {
"explain": None,
"filters": None,
"kind": "HogQLQuery",
"modifiers": None,
"name": None,
"query": "SELECT 1",
"response": None,
"tags": None,
"values": None,
"variables": None,
"version": None,
}
self.assertEqual(want_query, response_data["query"])
# Verify database was updated
named_query.refresh_from_db()
self.assertEqual(named_query.description, "Updated description")
self.assertFalse(named_query.is_active)
def test_delete_named_query(self):
"""Test deleting a named query."""
# Create query to delete
NamedQuery.objects.create(
name="delete_test",
team=self.team,
query=self.sample_query,
created_by=self.user,
)
response = self.client.delete(f"/api/environments/{self.team.id}/named_query/delete_test/")
self.assertIn(response.status_code, [status.HTTP_204_NO_CONTENT, status.HTTP_200_OK])
def test_execute_named_query(self):
"""Test executing a named query successfully."""
# Create a simple query
NamedQuery.objects.create(
name="execute_test",
team=self.team,
query={"kind": "HogQLQuery", "query": "SELECT 1 as result"},
created_by=self.user,
is_active=True,
)
response = self.client.get(f"/api/environments/{self.team.id}/named_query/execute_test/run/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
# Verify response structure (should match query response format)
self.assertIn("results", response_data)
self.assertIsInstance(response_data["results"], list)
def test_execute_inactive_query(self):
"""Test that inactive queries cannot be executed."""
# Create an inactive query
NamedQuery.objects.create(
name="inactive_test",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=False,
)
response = self.client.get(f"/api/environments/{self.team.id}/named_query/inactive_test/run/")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_invalid_query_name_validation(self):
"""Test validation of invalid query names."""
# Test invalid characters
data = {
"name": "invalid@name!",
"query": self.sample_query,
}
response = self.client.post(f"/api/environments/{self.team.id}/named_query/", data, format="json")
self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)
def test_missing_required_fields(self):
"""Test validation when required fields are missing."""
# Missing name
data: dict[str, Any] = {"query": self.sample_query}
response = self.client.post(f"/api/environments/{self.team.id}/named_query/", data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Missing query
data = {"name": "test_query"}
response = self.client.post(f"/api/environments/{self.team.id}/named_query/", data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_duplicate_name_in_team(self):
"""Test that duplicate names within the same team are not allowed."""
# Create first query
NamedQuery.objects.create(
name="duplicate_test",
team=self.team,
query=self.sample_query,
created_by=self.user,
)
# Try to create another with same name
data = {
"name": "duplicate_test",
"query": {"kind": "HogQLQuery", "query": "SELECT 2"},
}
response = self.client.post(f"/api/environments/{self.team.id}/named_query/", data, format="json")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_team_isolation(self):
"""Test that queries are properly isolated between teams."""
# Create another team and user
other_team = Team.objects.create(organization=self.organization, name="Other Team")
other_user = User.objects.create_and_join(self.organization, "other@test.com", None)
# Create query in other team
NamedQuery.objects.create(
name="other_team_query",
team=other_team,
query=self.sample_query,
created_by=other_user,
)
# Try to access it from current team - should return 404
response = self.client.get(f"/api/environments/{self.team.id}/named_query/other_team_query/run/")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_execute_query_with_invalid_sql(self):
"""Test error handling when executing query with invalid SQL."""
# Create query with invalid SQL
NamedQuery.objects.create(
name="invalid_sql_test",
team=self.team,
query={"kind": "HogQLQuery", "query": "SELECT FROM invalid_syntax"},
created_by=self.user,
is_active=True,
)
response = self.client.get(f"/api/environments/{self.team.id}/named_query/invalid_sql_test/run/")
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn("detail", response.json())
def test_execute_query_with_variables(self):
"""Test executing a named query with variables."""
# Create an insight variable first
variable = InsightVariable.objects.create(
team=self.team,
name="From Date",
code_name="from_date",
type=InsightVariable.Type.DATE,
default_value="2025-01-01",
)
# Create a query with variables
query_with_variables = {
"kind": "HogQLQuery",
"query": "select * from events where toDate(timestamp) > {variables.from_date} limit 1",
"variables": {
str(variable.id): {"variableId": str(variable.id), "code_name": "from_date", "value": "2025-01-01"}
},
}
NamedQuery.objects.create(
name="query_with_variables",
team=self.team,
query=query_with_variables,
created_by=self.user,
is_active=True,
)
# Execute with variable values
request_data = {"variables_values": {"from_date": "2025-09-18"}}
response = self.client.post(
f"/api/environments/{self.team.id}/named_query/query_with_variables/run/", request_data, format="json"
)
response_data = response.json()
self.assertEqual(response.status_code, status.HTTP_200_OK, response_data)
self.assertIn("results", response_data)
def test_list_filter_by_is_active(self):
"""Test filtering named queries by is_active status."""
# Create active and inactive queries
NamedQuery.objects.create(
name="active_query",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=True,
)
NamedQuery.objects.create(
name="inactive_query",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=False,
)
# Test filtering for active queries
response = self.client.get(f"/api/environments/{self.team.id}/named_query/?is_active=true")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 1)
self.assertEqual(response_data["results"][0]["name"], "active_query")
self.assertTrue(response_data["results"][0]["is_active"])
# Test filtering for inactive queries
response = self.client.get(f"/api/environments/{self.team.id}/named_query/?is_active=false")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 1)
self.assertEqual(response_data["results"][0]["name"], "inactive_query")
self.assertFalse(response_data["results"][0]["is_active"])
def test_list_filter_by_created_by(self):
"""Test filtering named queries by created_by user."""
# Create another user
other_user = User.objects.create_and_join(self.organization, "other@test.com", None)
# Create queries by different users
NamedQuery.objects.create(
name="query_by_user1",
team=self.team,
query=self.sample_query,
created_by=self.user,
)
NamedQuery.objects.create(
name="query_by_user2",
team=self.team,
query=self.sample_query,
created_by=other_user,
)
# Test filtering by first user
response = self.client.get(f"/api/environments/{self.team.id}/named_query/?created_by={self.user.id}")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 1)
self.assertEqual(response_data["results"][0]["name"], "query_by_user1")
# Test filtering by second user
response = self.client.get(f"/api/environments/{self.team.id}/named_query/?created_by={other_user.id}")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 1)
self.assertEqual(response_data["results"][0]["name"], "query_by_user2")
def test_list_filter_combined(self):
"""Test filtering named queries by both is_active and created_by."""
# Create another user
other_user = User.objects.create_and_join(self.organization, "other@test.com", None)
# Create queries with different combinations
NamedQuery.objects.create(
name="active_query_user1",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=True,
)
NamedQuery.objects.create(
name="inactive_query_user1",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=False,
)
NamedQuery.objects.create(
name="active_query_user2",
team=self.team,
query=self.sample_query,
created_by=other_user,
is_active=True,
)
# Test combined filtering
response = self.client.get(
f"/api/environments/{self.team.id}/named_query/?is_active=true&created_by={self.user.id}"
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 1)
self.assertEqual(response_data["results"][0]["name"], "active_query_user1")
self.assertTrue(response_data["results"][0]["is_active"])
def test_list_no_filters(self):
"""Test listing all named queries without filters."""
# Create multiple queries
NamedQuery.objects.create(
name="query1",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=True,
)
NamedQuery.objects.create(
name="query2",
team=self.team,
query=self.sample_query,
created_by=self.user,
is_active=False,
)
# Test without any filters - should return all queries
response = self.client.get(f"/api/environments/{self.team.id}/named_query/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response_data = response.json()
self.assertEqual(len(response_data["results"]), 2)
query_names = {q["name"] for q in response_data["results"]}
self.assertEqual(query_names, {"query1", "query2"})

View File

@@ -114,7 +114,7 @@ class ExecutionMode(StrEnum):
"""Do not initiate calculation."""
BLOCKING_EXECUTION_MODES = {
BLOCKING_EXECUTION_MODES: set[ExecutionMode] = {
ExecutionMode.CALCULATE_BLOCKING_ALWAYS,
ExecutionMode.RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE,
ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS,

View File

@@ -0,0 +1,70 @@
# Generated by Django 4.2.22 on 2025-09-18 16:20
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
import posthog.models.utils
import posthog.models.named_query
class Migration(migrations.Migration):
dependencies = [
("posthog", "0859_alter_team_session_recording_retention_period"),
]
operations = [
migrations.CreateModel(
name="NamedQuery",
fields=[
(
"id",
models.UUIDField(
default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
),
),
(
"name",
models.CharField(
help_text="URL-safe name for the query endpoint",
max_length=128,
validators=[posthog.models.named_query.validate_query_name],
),
),
("query", models.JSONField(help_text="Query definition following QueryRequest.query schema")),
(
"description",
models.TextField(blank=True, help_text="Human-readable description of what this query does"),
),
(
"parameters",
models.JSONField(
blank=True,
default=dict,
help_text="JSON schema defining expected parameters for query customization",
),
),
(
"is_active",
models.BooleanField(default=True, help_text="Whether this named query is available via the API"),
),
("created_at", models.DateTimeField(auto_now_add=True)),
("updated_at", models.DateTimeField(auto_now=True)),
(
"created_by",
models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL),
),
("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
],
options={
"indexes": [
models.Index(fields=["team", "is_active"], name="posthog_nam_team_id_44cb0e_idx"),
models.Index(fields=["team", "name"], name="posthog_nam_team_id_410f3f_idx"),
],
},
),
migrations.AddConstraint(
model_name="namedquery",
constraint=models.UniqueConstraint(fields=("team", "name"), name="unique_team_named_query_name"),
),
]

View File

@@ -1 +1 @@
0859_alter_team_session_recording_retention_period
0860_add_namedquery

View File

@@ -0,0 +1,98 @@
import re
from typing import Any
from django.core.exceptions import ValidationError
from django.db import models
from posthog.models.team import Team
from posthog.models.user import User
from posthog.models.utils import CreatedMetaFields, UpdatedMetaFields, UUIDTModel
def validate_query_name(value: str) -> None:
"""Validate that the query name is URL-safe and follows naming conventions."""
if not re.match(r"^[a-zA-Z][a-zA-Z0-9_-]*$", value):
raise ValidationError(
f"{value} is not a valid query name. Query names must start with a letter and contain only letters, numbers, hyphens, and underscores.",
params={"value": value},
)
if len(value) > 128:
raise ValidationError(
f"Query name '{value}' is too long. Maximum length is 128 characters.",
params={"value": value},
)
class NamedQuery(CreatedMetaFields, UpdatedMetaFields, UUIDTModel):
"""Model for storing named queries that can be accessed via API endpoints.
Named queries allow creating reusable query endpoints like:
/api/environments/{team_id}/named_query/{query_name}
The query field follows the same structure as QueryRequest.query, supporting
any query type accepted by the /query endpoint (HogQLQuery, TrendsQuery, etc.).
"""
name = models.CharField(
max_length=128, validators=[validate_query_name], help_text="URL-safe name for the query endpoint"
)
team = models.ForeignKey(Team, on_delete=models.CASCADE)
# Use JSONField to store the query, following the same pattern as QueryRequest.query
# This can store any of the query types: HogQLQuery, TrendsQuery, FunnelsQuery, etc.
query = models.JSONField(help_text="Query definition following QueryRequest.query schema")
description = models.TextField(blank=True, help_text="Human-readable description of what this query does")
# Parameter schema for query customization
parameters = models.JSONField(
default=dict, blank=True, help_text="JSON schema defining expected parameters for query customization"
)
is_active = models.BooleanField(default=True, help_text="Whether this named query is available via the API")
created_by = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["team", "name"],
name="unique_team_named_query_name",
)
]
indexes = [
models.Index(fields=["team", "is_active"]),
models.Index(fields=["team", "name"]),
]
def __str__(self) -> str:
return f"{self.team.name}: {self.name}"
@property
def endpoint_path(self) -> str:
"""Return the API endpoint path for this named query."""
# return reverse("delete_named_query", {"team_id": self.team.id, "query_name": self.name})
return f"/api/environments/{self.team.id}/named_query/d/{self.name}"
def get_query_with_parameters(self, request_params: dict[str, Any]) -> dict[str, Any]:
"""Apply request parameters to the stored query.
This method handles parameter injection for query customization.
For now, it returns the query as-is, but can be extended to support
parameter substitution in the future.
"""
# TODO: Implement parameter substitution logic
# For example, replacing {parameter_name} placeholders in HogQL queries
return self.query
def validate_parameters(self, request_params: dict[str, Any]) -> None:
"""Validate request parameters against the parameter schema.
This method can be extended to implement JSON schema validation
of incoming parameters.
"""
# TODO: Implement parameter validation logic
pass

View File

@@ -9002,6 +9002,32 @@ class MultipleBreakdownOptions(BaseModel):
values: list[BreakdownItem]
class NamedQueryRunRequest(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
client_query_id: Optional[str] = Field(
default=None, description="Client provided query ID. Can be used to retrieve the status or cancel the query."
)
filters_override: Optional[DashboardFilter] = None
refresh: Optional[RefreshType] = Field(
default=RefreshType.BLOCKING,
description=(
"Whether results should be calculated sync or async, and how much to rely on the cache:\n- `'blocking'` -"
" calculate synchronously (returning only when the query is done), UNLESS there are very fresh results in"
" the cache\n- `'async'` - kick off background calculation (returning immediately with a query status),"
" UNLESS there are very fresh results in the cache\n- `'lazy_async'` - kick off background calculation,"
" UNLESS there are somewhat fresh results in the cache\n- `'force_blocking'` - calculate synchronously,"
" even if fresh results are already cached\n- `'force_async'` - kick off background calculation, even if"
" fresh results are already cached\n- `'force_cache'` - return cached data or a cache miss; always"
" completes immediately as it never calculates Background calculation can be tracked using the"
" `query_status` response field."
),
)
variables_override: Optional[dict[str, dict[str, Any]]] = None
variables_values: Optional[dict[str, Any]] = None
class PathsQueryResponse(BaseModel):
model_config = ConfigDict(
extra="forbid",
@@ -11899,6 +11925,16 @@ class MaxRecordingUniversalFilters(BaseModel):
)
class NamedQueryRequest(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
description: Optional[str] = None
is_active: Optional[bool] = None
name: Optional[str] = None
query: Optional[HogQLQuery] = None
class PropertyGroupFilter(BaseModel):
model_config = ConfigDict(
extra="forbid",

View File

@@ -29,6 +29,7 @@ APIScopeObject = Literal[
"hog_function",
"insight",
"link",
"named_query",
"notebook",
"organization",
"organization_member",