Mirror of https://github.com/BillyOutlast/posthog.git, synced 2026-02-04 03:01:23 +01:00
feat(max): Postgres snapshots for evals (#36562)
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@@ -7,6 +7,7 @@ load_from:
  # - python_module: dags.locations.revenue_analytics
  # - python_module: dags.locations.shared
  # These dags are loaded in local dev
  - python_module: dags.locations.max_ai
  - python_module: dags.locations.web_analytics
  - python_module: dags.locations.experiments
  - python_module: dags.locations.clickhouse
8	.vscode/launch.json (vendored)
@@ -266,11 +266,13 @@
            "request": "launch",
            "module": "dagster",
            "justMyCode": true,
            "args": ["dev"],
            "subProcess": true,
            "envFile": "${workspaceFolder}/.env",
            "args": ["dev", "--workspace", "${workspaceFolder}/.dagster_home/workspace.yaml"],
            "env": {
                "DAGSTER_HOME": "${workspaceFolder}/.dagster_home",
                "DEBUG": "1",
                "AWS_PROFILE": "dev"
                "DAGSTER_UI_PORT": "3030",
                "DEBUG": "1"
            }
        }
    ],
@@ -1,8 +1,8 @@
from contextlib import suppress
from enum import Enum

from clickhouse_driver.errors import Error, ErrorCodes
import dagster
from clickhouse_driver.errors import Error, ErrorCodes

from posthog.clickhouse import query_tagging
from posthog.clickhouse.cluster import (
@@ -22,6 +22,7 @@ class JobOwners(str, Enum):
    TEAM_ERROR_TRACKING = "team-error-tracking"
    TEAM_GROWTH = "team-growth"
    TEAM_EXPERIMENTS = "team-experiments"
    TEAM_MAX_AI = "team-max-ai"


class ClickhouseClusterResource(dagster.ConfigurableResource):
@@ -1,6 +1,5 @@
import dagster
import dagster_slack

from dagster_aws.s3.io_manager import s3_pickle_io_manager
from dagster_aws.s3.resources import S3Resource
from django.conf import settings
@@ -22,7 +21,11 @@ resources_by_env = {
        "cluster": ClickhouseClusterResource.configure_at_launch(),
        "io_manager": dagster.fs_io_manager,
        "slack": dagster.ResourceDefinition.none_resource(description="Dummy Slack resource for local development"),
        "s3": S3Resource(),
        "s3": S3Resource(
            endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
            aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
            aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
        ),
    },
}
10	dags/locations/max_ai.py (new file)
@@ -0,0 +1,10 @@
import dagster

from dags.max_ai.snapshot_project_data import snapshot_project_data

from . import resources

defs = dagster.Definitions(
    jobs=[snapshot_project_data],
    resources=resources,
)
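For context (not part of the commit): once the workspace.yaml entry above is loaded, the new code location can be sanity-checked with a short Python snippet. This is an illustrative sketch and assumes a configured PostHog/Django dev environment; `get_job_def` is Dagster's standard lookup on a Definitions object.

# Illustrative check that the new code location exposes the snapshot job by name.
from dags.locations.max_ai import defs

job = defs.get_job_def("snapshot_project_data")
assert job.name == "snapshot_project_data"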
82	dags/max_ai/snapshot_project_data.py (new file)
@@ -0,0 +1,82 @@
from typing import TypeVar

import dagster
from dagster_aws.s3.resources import S3Resource

from dags.common import JobOwners
from dags.max_ai.utils import check_dump_exists, compose_postgres_dump_path, dump_model
from ee.hogai.eval.schema import (
    BaseSnapshot,
    DataWarehouseTableSnapshot,
    GroupTypeMappingSnapshot,
    PostgresProjectDataSnapshot,
    PropertyDefinitionSnapshot,
    TeamSnapshot,
)

DEFAULT_RETRY_POLICY = dagster.RetryPolicy(
    max_retries=4,
    delay=2,  # 2 seconds
    backoff=dagster.Backoff.EXPONENTIAL,
    jitter=dagster.Jitter.PLUS_MINUS,
)


SchemaBound = TypeVar("SchemaBound", bound=BaseSnapshot)


def snapshot_postgres_model(
    context: dagster.OpExecutionContext,
    model_type: type[SchemaBound],
    file_name: str,
    s3: S3Resource,
    project_id: int,
    code_version: str | None = None,
) -> str:
    file_key = compose_postgres_dump_path(project_id, file_name, code_version)
    if check_dump_exists(s3, file_key):
        context.log.info(f"Skipping {file_key} because it already exists")
        return file_key
    context.log.info(f"Dumping {file_key}")
    with dump_model(s3=s3, schema=model_type, file_key=file_key) as dump:
        dump(model_type.serialize_for_project(project_id))
    return file_key


@dagster.op(
    description="Snapshots Postgres project data (property definitions, DWH schema, etc.)",
    retry_policy=DEFAULT_RETRY_POLICY,
    code_version="v1",
    tags={"owner": JobOwners.TEAM_MAX_AI.value},
)
def snapshot_postgres_project_data(
    context: dagster.OpExecutionContext, project_id: int, s3: S3Resource
) -> PostgresProjectDataSnapshot:
    context.log.info(f"Snapshotting Postgres project data for {project_id}")
    snapshot_map: dict[str, type[BaseSnapshot]] = {
        "project": TeamSnapshot,
        "property_definitions": PropertyDefinitionSnapshot,
        "group_type_mappings": GroupTypeMappingSnapshot,
        "data_warehouse_tables": DataWarehouseTableSnapshot,
    }
    deps = {
        file_name: snapshot_postgres_model(context, model_type, file_name, s3, project_id, context.op_def.version)
        for file_name, model_type in snapshot_map.items()
    }
    context.log_event(
        dagster.AssetMaterialization(
            asset_key="project_postgres_snapshot",
            description="Avro snapshots of project Postgres data",
            metadata={"project_id": project_id, **deps},
            tags={"owner": JobOwners.TEAM_MAX_AI.value},
        )
    )
    return PostgresProjectDataSnapshot(**deps)


@dagster.job(
    tags={"owner": JobOwners.TEAM_MAX_AI.value},
)
def snapshot_project_data():
    # Temporary job for testing
    snapshot_postgres_project_data()
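To make the op's output concrete: every value in `deps` is an S3 key built by `compose_postgres_dump_path`, so the returned snapshot looks roughly like the sketch below. The eight-character suffixes are hypothetical stand-ins for `get_consistent_hash_suffix` output (each file name hashes to its own suffix), not real keys from this commit.

# Rough shape of the op's return value (hypothetical hash suffixes).
example = PostgresProjectDataSnapshot(
    project="ai_evals/postgres_models/123/project/1a2b3c4d.avro",
    property_definitions="ai_evals/postgres_models/123/property_definitions/5e6f7a8b.avro",
    group_type_mappings="ai_evals/postgres_models/123/group_type_mappings/9c0d1e2f.avro",
    data_warehouse_tables="ai_evals/postgres_models/123/data_warehouse_tables/3a4b5c6d.avro",
)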
89	dags/max_ai/utils.py (new file)
@@ -0,0 +1,89 @@
import hashlib
from collections.abc import Sequence
from contextlib import contextmanager
from datetime import datetime
from tempfile import TemporaryFile

import botocore
from dagster_aws.s3 import S3Resource
from django.conf import settings
from fastavro import parse_schema, writer
from pydantic_avro import AvroBase
from tenacity import retry, stop_after_attempt, wait_exponential

EVALS_S3_PREFIX = "ai_evals"

# objectstorage has only the default bucket in debug.
if settings.DEBUG:
    EVALS_S3_BUCKET = settings.OBJECT_STORAGE_BUCKET
else:
    EVALS_S3_BUCKET = settings.DAGSTER_AI_EVALS_S3_BUCKET


def get_consistent_hash_suffix(file_name: str, date: datetime | None = None, code_version: str | None = None) -> str:
    """
    Generate a consistent hash suffix that updates twice per month based on the filename.

    The hash changes on the 1st and 15th of each month, ensuring links update
    twice monthly while remaining consistent within each period.

    Args:
        file_name: The base filename to hash
        date: Optional date for testing, defaults to current date
        code_version: Optional code version for hash consistency

    Returns:
        A short hash string (8 characters) that's consistent within each half-month period
    """
    if date is None:
        date = datetime.now()

    # Determine which half of the month we're in
    half_month_period = 1 if date.day < 15 else 2

    # Create a seed that changes twice per month
    period_seed = f"{date.year}-{date.month:02d}-{half_month_period}"

    # Combine the period seed with the filename for consistent hashing
    hash_input = f"{period_seed}:{file_name}"
    if code_version:
        hash_input += f":{code_version}"

    # Generate a short, URL-safe hash
    hash_obj = hashlib.sha256(hash_input.encode("utf-8"))
    return hash_obj.hexdigest()[:8]


def compose_postgres_dump_path(project_id: int, dir_name: str, code_version: str | None = None) -> str:
    """Compose S3 path for Postgres dumps with consistent hashing"""
    hash_suffix = get_consistent_hash_suffix(dir_name, code_version=code_version)
    return f"{EVALS_S3_PREFIX}/postgres_models/{project_id}/{dir_name}/{hash_suffix}.avro"


def check_dump_exists(s3: S3Resource, file_key: str) -> bool:
    """Check if a file exists in S3"""
    try:
        s3.get_client().head_object(Bucket=EVALS_S3_BUCKET, Key=file_key)
        return True
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise


@contextmanager
def dump_model(*, s3: S3Resource, schema: type[AvroBase], file_key: str):
    with TemporaryFile() as f:
        parsed_schema = parse_schema(schema.avro_schema())

        def dump(models: Sequence[AvroBase]):
            writer(f, parsed_schema, (model.model_dump() for model in models))

        yield dump

        @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=4))
        def upload():
            f.seek(0)
            s3.get_client().upload_fileobj(f, EVALS_S3_BUCKET, file_key)

        upload()
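A worked example of the half-month hashing above (not part of the commit, and assuming a configured Django settings module so the utils module can be imported): dates in the same half of a month produce the same suffix, so the composed dump path stays stable for roughly two weeks, then rolls over.

from datetime import datetime

from dags.max_ai.utils import compose_postgres_dump_path, get_consistent_hash_suffix

# Same first-half period (Jan 1 and Jan 14) -> identical suffix.
assert get_consistent_hash_suffix("teams", datetime(2024, 1, 1)) == get_consistent_hash_suffix(
    "teams", datetime(2024, 1, 14)
)
# Crossing the 15th rolls the suffix, and with it the .avro key.
assert get_consistent_hash_suffix("teams", datetime(2024, 1, 14)) != get_consistent_hash_suffix(
    "teams", datetime(2024, 1, 15)
)
print(compose_postgres_dump_path(123, "teams", code_version="v1"))
# e.g. ai_evals/postgres_models/123/teams/<8-char sha256 prefix>.avro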
0	dags/tests/max_ai/__init__.py (new file)
132	dags/tests/max_ai/test_snapshot_project_data.py (new file)
@@ -0,0 +1,132 @@
from unittest.mock import MagicMock, patch

import dagster
import pytest
from dagster import OpExecutionContext
from dagster_aws.s3.resources import S3Resource

from dags.max_ai.snapshot_project_data import (
    snapshot_postgres_model,
    snapshot_postgres_project_data,
)
from ee.hogai.eval.schema import PostgresProjectDataSnapshot, TeamSnapshot


@pytest.fixture
def mock_context():
    context = MagicMock(spec=OpExecutionContext)
    context.log = MagicMock()
    return context


@pytest.fixture
def mock_s3():
    mock = MagicMock(spec=S3Resource)
    mock.get_resource_definition = S3Resource().get_resource_definition
    return mock


@patch("dags.max_ai.snapshot_project_data.compose_postgres_dump_path")
@patch("dags.max_ai.snapshot_project_data.check_dump_exists")
def test_snapshot_postgres_model_skips_when_file_exists(
    mock_check_dump_exists, mock_compose_path, mock_context, mock_s3
):
    """Test that snapshot_postgres_model skips dumping when file already exists."""
    # Setup
    file_key = "test/path/teams_abc123.avro"
    mock_compose_path.return_value = file_key
    mock_check_dump_exists.return_value = True

    project_id = 123
    file_name = "teams"
    code_version = "v1"

    # Execute
    result = snapshot_postgres_model(
        context=mock_context,
        model_type=TeamSnapshot,
        file_name=file_name,
        s3=mock_s3,
        project_id=project_id,
        code_version=code_version,
    )

    # Verify
    assert result == file_key
    mock_compose_path.assert_called_once_with(project_id, file_name, code_version)
    mock_check_dump_exists.assert_called_once_with(mock_s3, file_key)
    mock_context.log.info.assert_called_once_with(f"Skipping {file_key} because it already exists")


@patch("dags.max_ai.snapshot_project_data.compose_postgres_dump_path")
@patch("dags.max_ai.snapshot_project_data.check_dump_exists")
@patch("dags.max_ai.snapshot_project_data.dump_model")
def test_snapshot_postgres_model_dumps_when_file_not_exists(
    mock_dump_model, mock_check_dump_exists, mock_compose_path, mock_context, mock_s3
):
    """Test that snapshot_postgres_model dumps data when file doesn't exist."""
    # Setup
    file_key = "test/path/teams_abc123.avro"
    mock_compose_path.return_value = file_key
    mock_check_dump_exists.return_value = False

    # Mock the context manager and dump function
    mock_dump_context = MagicMock()
    mock_dump_function = MagicMock()
    mock_dump_context.__enter__ = MagicMock(return_value=mock_dump_function)
    mock_dump_context.__exit__ = MagicMock(return_value=None)
    mock_dump_model.return_value = mock_dump_context

    # Mock the serialize_for_project method
    mock_serialized_data = [{"id": 1, "name": "test"}]
    with patch.object(TeamSnapshot, "serialize_for_project", return_value=mock_serialized_data):
        project_id = 123
        file_name = "teams"
        code_version = "v1"

        # Execute
        result = snapshot_postgres_model(
            context=mock_context,
            model_type=TeamSnapshot,
            file_name=file_name,
            s3=mock_s3,
            project_id=project_id,
            code_version=code_version,
        )

        # Verify
        assert result == file_key
        mock_compose_path.assert_called_once_with(project_id, file_name, code_version)
        mock_check_dump_exists.assert_called_once_with(mock_s3, file_key)
        mock_context.log.info.assert_called_with(f"Dumping {file_key}")
        mock_dump_model.assert_called_once_with(s3=mock_s3, schema=TeamSnapshot, file_key=file_key)
        mock_dump_function.assert_called_once_with(mock_serialized_data)


@patch("dags.max_ai.snapshot_project_data.snapshot_postgres_model")
def test_snapshot_postgres_project_data_exports_all_models(mock_snapshot_postgres_model, mock_s3):
    """Test that snapshot_postgres_project_data exports all expected models."""
    # Setup
    project_id = 456
    mock_snapshot_postgres_model.side_effect = [
        "path/to/project.avro",
        "path/to/property_definitions.avro",
        "path/to/group_type_mappings.avro",
        "path/to/data_warehouse_tables.avro",
    ]

    # Create context using Dagster's build_op_context
    context = dagster.build_op_context()

    # Execute
    result = snapshot_postgres_project_data(context, project_id, mock_s3)

    # Verify all expected models are in the result
    assert isinstance(result, PostgresProjectDataSnapshot)
    assert result.project == "path/to/project.avro"
    assert result.property_definitions == "path/to/property_definitions.avro"
    assert result.group_type_mappings == "path/to/group_type_mappings.avro"
    assert result.data_warehouse_tables == "path/to/data_warehouse_tables.avro"

    # Verify snapshot_postgres_model was called for each model type
    assert mock_snapshot_postgres_model.call_count == 4
170	dags/tests/max_ai/test_utils.py (new file)
@@ -0,0 +1,170 @@
from datetime import datetime
from typing import Any, cast

import pytest
from dagster_aws.s3 import S3Resource
from django.conf import settings
from django.test import override_settings
from fastavro import reader
from pydantic_avro import AvroBase

from dags.max_ai.utils import (
    EVALS_S3_BUCKET,
    EVALS_S3_PREFIX,
    check_dump_exists,
    compose_postgres_dump_path,
    dump_model,
    get_consistent_hash_suffix,
)


def test_consistent_hash_suffix_same_period():
    """Test that hash is consistent within the same half-month period."""
    file_name = "test_file.txt"

    # Test first half of month (1st-14th)
    date_1st = datetime(2024, 1, 1)
    date_14th = datetime(2024, 1, 14)

    hash_1st = get_consistent_hash_suffix(file_name, date_1st)
    hash_14th = get_consistent_hash_suffix(file_name, date_14th)

    assert hash_1st == hash_14th, "Hash should be consistent within first half of month"

    # Test second half of month (15th-31st)
    date_15th = datetime(2024, 1, 15)
    date_31st = datetime(2024, 1, 31)

    hash_15th = get_consistent_hash_suffix(file_name, date_15th)
    hash_31st = get_consistent_hash_suffix(file_name, date_31st)

    assert hash_15th == hash_31st, "Hash should be consistent within second half of month"


def test_consistent_hash_suffix_different_periods():
    """Test that hash changes between different half-month periods."""
    file_name = "test_file.txt"

    # Test boundary between first and second half
    date_14th = datetime(2024, 1, 14)
    date_15th = datetime(2024, 1, 15)

    hash_14th = get_consistent_hash_suffix(file_name, date_14th)
    hash_15th = get_consistent_hash_suffix(file_name, date_15th)

    assert hash_14th != hash_15th, "Hash should change between first and second half of month"

    # Test boundary between months
    date_jan_31st = datetime(2024, 1, 31)
    date_feb_1st = datetime(2024, 2, 1)

    hash_jan_31st = get_consistent_hash_suffix(file_name, date_jan_31st)
    hash_feb_1st = get_consistent_hash_suffix(file_name, date_feb_1st)

    assert hash_jan_31st != hash_feb_1st, "Hash should change between different months"


def test_consistent_hash_suffix_with_code_version():
    """Test that code version affects the hash."""
    file_name = "test_file.txt"
    date = datetime(2024, 1, 1)

    hash_no_version = get_consistent_hash_suffix(file_name, date)
    hash_with_version = get_consistent_hash_suffix(file_name, date, "v1.0")
    hash_different_version = get_consistent_hash_suffix(file_name, date, "v2.0")

    assert hash_no_version != hash_with_version, "Hash should differ when code version is added"
    assert hash_with_version != hash_different_version, "Hash should differ for different code versions"


def test_hash_format():
    """Test that hash returns expected format (8 characters)."""
    file_name = "test_file.txt"
    date = datetime(2024, 1, 1)

    hash_result = get_consistent_hash_suffix(file_name, date)

    assert len(hash_result) == 8, "Hash should be 8 characters long"
    assert hash_result.isalnum(), "Hash should be alphanumeric"


def test_compose_postgres_dump_path():
    """Test that compose_postgres_dump_path generates correct S3 path with hash."""
    project_id = 123
    dir_name = "test_dump"
    code_version = "v1.0"

    with override_settings(DAGSTER_AI_EVALS_S3_BUCKET="test-bucket"):
        result = compose_postgres_dump_path(project_id, dir_name, code_version)

        # Should contain the project ID in path
        assert f"/{project_id}/" in result

        # Should start with the mocked folder path
        assert result.startswith(f"{EVALS_S3_PREFIX}/postgres_models/")

        # Should end with .avro extension
        assert result.endswith(".avro")

        # Should contain the file name and hash suffix
        assert dir_name in result

        # Should be deterministic - same inputs produce same output
        result2 = compose_postgres_dump_path(project_id, dir_name, code_version)
        assert result == result2

        # Different code version should produce different path
        result_different_version = compose_postgres_dump_path(project_id, dir_name, "v2.0")
        assert result != result_different_version


# Test schema for dump_model tests
class DummySchema(AvroBase):
    name: str
    value: int


@pytest.fixture
def s3_resource():
    return S3Resource(
        endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
        aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
        aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
    )


def test_dump_model_creates_avro_file(s3_resource):
    """Test that dump_model creates an Avro file with correct data in S3."""
    file_key = "test/path/data.avro"

    test_models = [DummySchema(name="test1", value=1), DummySchema(name="test2", value=2)]

    with dump_model(s3=s3_resource, schema=DummySchema, file_key=file_key) as dump:
        dump(test_models)

    uploaded_file = s3_resource.get_client().get_object(Bucket=EVALS_S3_BUCKET, Key=file_key)["Body"]

    # Verify the uploaded file contains valid Avro data
    records = list(cast(list[dict[str, Any]], reader(uploaded_file)))
    assert len(records) == 2
    assert records[0]["name"] == "test1"
    assert records[0]["value"] == 1
    assert records[1]["name"] == "test2"
    assert records[1]["value"] == 2


def test_check_dump_exists(s3_resource):
    """Test that check_dump_exists correctly identifies existing and non-existing files."""
    existing_file_key = "test/path/existing_file.avro"
    non_existing_file_key = "test/path/non_existing_file.avro"

    # First create a file
    test_models = [DummySchema(name="test", value=42)]
    with dump_model(s3=s3_resource, schema=DummySchema, file_key=existing_file_key) as dump:
        dump(test_models)

    # Test that existing file is found
    assert check_dump_exists(s3_resource, existing_file_key) is True

    # Test that non-existing file returns False
    assert check_dump_exists(s3_resource, non_existing_file_key) is False
143	ee/hogai/eval/schema.py (new file)
@@ -0,0 +1,143 @@
import json
from abc import ABC, abstractmethod
from collections.abc import Generator, Sequence
from typing import Generic, Self, TypeVar

from django.db.models import Model
from pydantic import BaseModel
from pydantic_avro import AvroBase

from posthog.models import (
    DataWarehouseTable,
    GroupTypeMapping,
    PropertyDefinition,
    Team,
)

T = TypeVar("T", bound=Model)


class BaseSnapshot(AvroBase, ABC, Generic[T]):
    @classmethod
    @abstractmethod
    def serialize_for_project(cls, project_id: int) -> Generator[Self, None, None]:
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def deserialize_for_project(
        cls, project_id: int, models: Sequence[Self], *, team_id: int
    ) -> Generator[T, None, None]:
        raise NotImplementedError


# posthog/models/team.py
class TeamSnapshot(BaseSnapshot[Team]):
    name: str
    test_account_filters: str

    @classmethod
    def serialize_for_project(cls, project_id: int):
        team = Team.objects.get(pk=project_id)
        yield TeamSnapshot(name=team.name, test_account_filters=json.dumps(team.test_account_filters))

    @classmethod
    def deserialize_for_project(cls, project_id: int, models: Sequence[Self], **kwargs) -> Generator[Team, None, None]:
        for model in models:
            yield Team(id=project_id, name=model.name, test_account_filters=json.loads(model.test_account_filters))


# posthog/models/property_definition.py
class PropertyDefinitionSnapshot(BaseSnapshot[PropertyDefinition]):
    name: str
    is_numerical: bool
    property_type: str | None
    type: int
    group_type_index: int | None

    @classmethod
    def serialize_for_project(cls, project_id: int):
        for prop in PropertyDefinition.objects.filter(project_id=project_id).iterator(500):
            yield PropertyDefinitionSnapshot(
                name=prop.name,
                is_numerical=prop.is_numerical,
                property_type=prop.property_type,
                type=prop.type,
                group_type_index=prop.group_type_index,
            )

    @classmethod
    def deserialize_for_project(cls, project_id: int, models: Sequence[Self], **kwargs):
        for model in models:
            yield PropertyDefinition(
                name=model.name,
                is_numerical=model.is_numerical,
                property_type=model.property_type,
                type=model.type,
                group_type_index=model.group_type_index,
                team_id=project_id,
            )


# posthog/models/group_type_mapping.py
class GroupTypeMappingSnapshot(BaseSnapshot[GroupTypeMapping]):
    group_type: str
    group_type_index: int
    name_singular: str | None
    name_plural: str | None

    @classmethod
    def serialize_for_project(cls, project_id: int):
        for mapping in GroupTypeMapping.objects.filter(project_id=project_id).iterator(500):
            yield GroupTypeMappingSnapshot(
                group_type=mapping.group_type,
                group_type_index=mapping.group_type_index,
                name_singular=mapping.name_singular,
                name_plural=mapping.name_plural,
            )

    @classmethod
    def deserialize_for_project(cls, project_id: int, models: Sequence[Self], *, team_id: int):
        for model in models:
            yield GroupTypeMapping(
                group_type=model.group_type,
                group_type_index=model.group_type_index,
                name_singular=model.name_singular,
                name_plural=model.name_plural,
                team_id=team_id,
                project_id=project_id,
            )


# posthog/models/warehouse/table.py
class DataWarehouseTableSnapshot(BaseSnapshot[DataWarehouseTable]):
    name: str
    format: str
    columns: list[str]

    @classmethod
    def serialize_for_project(cls, project_id: int):
        for table in DataWarehouseTable.objects.filter(team_id=project_id).iterator(500):
            yield DataWarehouseTableSnapshot(
                name=table.name,
                format=table.format,
                columns=table.columns,
            )

    @classmethod
    def deserialize_for_project(cls, project_id: int, models: Sequence[Self], **kwargs):
        for model in models:
            yield DataWarehouseTable(
                name=model.name,
                format=model.format,
                columns=model.columns,
                url_pattern="http://localhost",  # Hardcoded. It's not important for evaluations what the value is.
                team_id=project_id,
            )


class PostgresProjectDataSnapshot(BaseModel):
    project: str
    property_definitions: str
    group_type_mappings: str
    data_warehouse_tables: str
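A sketch of the intended round trip through these snapshot classes (illustrative only; it assumes a local database with a Team whose pk is 123). Note that deserialize_for_project yields unsaved Django model instances, so the caller decides where to persist them for an eval run.

snapshots = list(TeamSnapshot.serialize_for_project(123))          # Postgres -> Avro-ready pydantic rows
rebuilt = list(TeamSnapshot.deserialize_for_project(123, snapshots, team_id=123))  # rows -> unsaved Team objects
assert rebuilt[0].name == snapshots[0].name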
@@ -1,6 +1,5 @@
import os


DAGSTER_S3_BUCKET: str = os.getenv("DAGSTER_S3_BUCKET", "posthog-dags")
DAGSTER_DEFAULT_SLACK_ALERTS_CHANNEL: str = os.getenv("DAGSTER_DEFAULT_SLACK_ALERTS_CHANNEL", "#alerts-clickhouse")
DAGSTER_DATA_EXPORT_S3_BUCKET: str = os.getenv("DAGSTER_DATA_EXPORT_S3_BUCKET", "dagster-data-export")
@@ -15,3 +14,5 @@ DAGSTER_DOMAIN: str | None = os.getenv("DAGSTER_DOMAIN")
DAGSTER_UI_HOST: str = os.getenv("DAGSTER_UI_HOST", "localhost")
DAGSTER_UI_PORT: int = int(os.getenv("DAGSTER_UI_PORT", 3030))

DAGSTER_AI_EVALS_S3_BUCKET: str = os.getenv("DAGSTER_AI_EVALS_S3_BUCKET", "ai-evals")
@@ -157,6 +157,8 @@ dependencies = [
    "user-agents>=2.2.0",
    "django-mcp-server==0.5.6",
    "django-admin-inline-paginator===0.4.0",
    "fastavro>=1.12.0",
    "pydantic-avro>=0.9.0",
]

[dependency-groups]
32	uv.lock (generated)
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = "==3.11.*"

[[package]]
@@ -1824,6 +1824,20 @@ lua = [
    { name = "lupa" },
]

[[package]]
name = "fastavro"
version = "1.12.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/cc/ec/762dcf213e5b97ea1733b27d5a2798599a1fa51565b70a93690246029f84/fastavro-1.12.0.tar.gz", hash = "sha256:a67a87be149825d74006b57e52be068dfa24f3bfc6382543ec92cd72327fe152", size = 1025604, upload-time = "2025-07-31T15:16:42.933Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/6f/51/6bd93f2c9f3bb98f84ee0ddb436eb46a308ec53e884d606b70ca9d6b132d/fastavro-1.12.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:56f78d1d527bea4833945c3a8c716969ebd133c5762e2e34f64c795bd5a10b3e", size = 962215, upload-time = "2025-07-31T15:16:58.173Z" },
    { url = "https://files.pythonhosted.org/packages/32/37/3e2e429cefe03d1fa98cc4c4edae1d133dc895db64dabe84c17b4dc0921c/fastavro-1.12.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a7ce0d117642bb4265ef6e1619ec2d93e942a98f60636e3c0fbf1eb438c49026", size = 3412716, upload-time = "2025-07-31T15:17:00.301Z" },
    { url = "https://files.pythonhosted.org/packages/33/28/eb37d9738ea3649bdcab1b6d4fd0facf9c36261623ea368554734d5d6821/fastavro-1.12.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:52e9d9648aad4cca5751bcbe2d3f98e85afb0ec6c6565707f4e2f647ba83ba85", size = 3439283, upload-time = "2025-07-31T15:17:02.505Z" },
    { url = "https://files.pythonhosted.org/packages/57/6f/7aba4efbf73fd80ca20aa1db560936c222dd1b4e5cadbf9304361b9065e3/fastavro-1.12.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:6183875381ec1cf85a1891bf46696fd1ec2ad732980e7bccc1e52e9904e7664d", size = 3354728, upload-time = "2025-07-31T15:17:04.705Z" },
    { url = "https://files.pythonhosted.org/packages/bf/2d/b0d8539f4622ebf5355b7898ac7930b1ff638de85b6c3acdd0718e05d09e/fastavro-1.12.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:5ad00a2b94d3c8bf9239acf92d56e3e457e1d188687a8d80f31e858ccf91a6d6", size = 3442598, upload-time = "2025-07-31T15:17:06.986Z" },
    { url = "https://files.pythonhosted.org/packages/fe/33/882154b17e0fd468f1a5ae8cc903805531e1fcb699140315366c5f8ec20d/fastavro-1.12.0-cp311-cp311-win_amd64.whl", hash = "sha256:6c4d1c276ff1410f3830648bb43312894ad65709ca0cb54361e28954387a46ac", size = 451836, upload-time = "2025-07-31T15:17:08.219Z" },
]

[[package]]
name = "filelock"
version = "3.18.0"
@@ -3931,6 +3945,7 @@ dependencies = [
    { name = "drf-spectacular" },
    { name = "elevenlabs" },
    { name = "emoji" },
    { name = "fastavro" },
    { name = "geoip2" },
    { name = "google-ads" },
    { name = "google-cloud-bigquery" },
@@ -3979,6 +3994,7 @@ dependencies = [
    { name = "psycopg2-binary" },
    { name = "pyarrow" },
    { name = "pydantic" },
    { name = "pydantic-avro" },
    { name = "pyjwt" },
    { name = "pymongo" },
    { name = "pympler" },
@@ -4152,6 +4168,7 @@ requires-dist = [
    { name = "drf-spectacular", specifier = "==0.27.2" },
    { name = "elevenlabs", specifier = ">=1.58.1" },
    { name = "emoji", specifier = "==2.14.1" },
    { name = "fastavro", specifier = ">=1.12.0" },
    { name = "geoip2", specifier = "==4.6.0" },
    { name = "google-ads", specifier = "==26.1.0" },
    { name = "google-cloud-bigquery", specifier = "==3.26" },
@@ -4200,6 +4217,7 @@ requires-dist = [
    { name = "psycopg2-binary", specifier = "==2.9.7" },
    { name = "pyarrow", specifier = "==18.1.0" },
    { name = "pydantic", specifier = "==2.10.3" },
    { name = "pydantic-avro", specifier = ">=0.9.0" },
    { name = "pyjwt", specifier = "==2.4.0" },
    { name = "pymongo", specifier = "==4.13.2" },
    { name = "pympler", specifier = "==1.1" },
@@ -4599,6 +4617,18 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/62/51/72c18c55cf2f46ff4f91ebcc8f75aa30f7305f3d726be3f4ebffb4ae972b/pydantic-2.10.3-py3-none-any.whl", hash = "sha256:be04d85bbc7b65651c5f8e6b9976ed9c6f41782a55524cef079a34a0bb82144d", size = 456997, upload-time = "2024-12-03T15:58:59.867Z" },
]

[[package]]
name = "pydantic-avro"
version = "0.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
    { name = "pydantic" },
]
sdist = { url = "https://files.pythonhosted.org/packages/bb/39/dd62ce59a02b7e72ec7bfc368d02b08145a782577a9b187110053cd25c47/pydantic_avro-0.9.0.tar.gz", hash = "sha256:5ceb1d8fca8d190db4a090217412cd176d7042f7d7e5f87306703ef70c8db933", size = 9479, upload-time = "2025-04-09T20:34:58.578Z" }
wheels = [
    { url = "https://files.pythonhosted.org/packages/4f/96/89e283f69228e8a64e09dc4f6083cbc96f64ef52f61615a87ea6ade3c7cd/pydantic_avro-0.9.0-py3-none-any.whl", hash = "sha256:d2de621922a4138f8af92b93eae4dbc6b708c09940e223c29be18be819233b5e", size = 11040, upload-time = "2025-04-09T20:34:57.257Z" },
]

[[package]]
name = "pydantic-core"
version = "2.27.1"