diff --git a/.dagster_home/workspace.yaml b/.dagster_home/workspace.yaml
index e2037abd07..751a19cacd 100644
--- a/.dagster_home/workspace.yaml
+++ b/.dagster_home/workspace.yaml
@@ -5,6 +5,7 @@ load_from:
   ## Please be considerate, anything here will run on every developer's machine.
   ## Something like a full backup or something that makes heavy network requests should not be included.
   ## To test something like that locally, uncomment but do not commit.
+  # - python_module: dags.locations.data_warehouse
   # - python_module: dags.locations.error_tracking
   # - python_module: dags.locations.growth
   # - python_module: dags.locations.revenue_analytics
diff --git a/dags/common.py b/dags/common.py
index 34e35d4a85..425e786f54 100644
--- a/dags/common.py
+++ b/dags/common.py
@@ -19,6 +19,7 @@ class JobOwners(str, Enum):
     TEAM_GROWTH = "team-growth"
     TEAM_EXPERIMENTS = "team-experiments"
     TEAM_MAX_AI = "team-max-ai"
+    TEAM_DATA_WAREHOUSE = "team-data-warehouse"
 
 
 class ClickhouseClusterResource(dagster.ConfigurableResource):
diff --git a/dags/locations/data_warehouse.py b/dags/locations/data_warehouse.py
new file mode 100644
index 0000000000..49c4889511
--- /dev/null
+++ b/dags/locations/data_warehouse.py
@@ -0,0 +1,12 @@
+import dagster
+
+from dags import managed_viewset_sync
+
+from . import resources
+
+defs = dagster.Definitions(
+    jobs=[
+        managed_viewset_sync.sync_managed_viewsets_job,
+    ],
+    resources=resources,
+)
diff --git a/dags/managed_viewset_sync.py b/dags/managed_viewset_sync.py
new file mode 100644
index 0000000000..7a1df35f7d
--- /dev/null
+++ b/dags/managed_viewset_sync.py
@@ -0,0 +1,93 @@
+import dagster
+import structlog
+
+from posthog.warehouse.models.datawarehouse_managed_viewset import DataWarehouseManagedViewSet
+
+from dags.common import JobOwners
+
+logger = structlog.get_logger(__name__)
+
+
+@dagster.op(
+    config_schema={
+        "kind": dagster.Field(
+            dagster.String,
+            default_value="",
+            is_required=False,
+            description="Specific kind to sync. If not provided, syncs all kinds.",
+        )
+    }
+)
+def sync_managed_viewsets_op(
+    context: dagster.OpExecutionContext,
+) -> None:
+    """
+    Sync views for all ManagedViewsets of a specific kind, or all kinds if no kind specified.
+    """
+    # Build queryset
+    queryset = DataWarehouseManagedViewSet.objects.all()
+    kind = context.op_config.get("kind", "")
+    if kind is not None and kind != "":
+        if kind not in DataWarehouseManagedViewSet.Kind.values:
+            raise ValueError(f"Invalid kind: {kind}")
+        queryset = queryset.filter(kind=kind)
+        context.log.info(f"Syncing ManagedViewsets for kind: {kind}")
+    else:
+        context.log.info("Syncing all ManagedViewsets")
+
+    # Get all viewsets
+    count = queryset.count()
+    context.log.info(f"Found {count} ManagedViewsets to sync")
+
+    synced_count = 0
+    failed_count = 0
+    failed_viewsets = []
+
+    for viewset in queryset.iterator():
+        try:
+            context.log.info(f"Syncing viewset {viewset.id} (kind: {viewset.kind}, team: {viewset.team_id})")
+            viewset.sync_views()
+            synced_count += 1
+            context.log.info(f"Successfully synced viewset {viewset.id}")
+        except Exception as e:
+            failed_count += 1
+            failed_viewsets.append(
+                {"viewset_id": str(viewset.id), "kind": viewset.kind, "team_id": viewset.team_id, "error": str(e)}
+            )
+            logger.error(
+                "failed_to_sync_managed_viewset",
+                viewset_id=str(viewset.id),
+                kind=viewset.kind,
+                team_id=viewset.team_id,
+                error=str(e),
+                exc_info=True,
+            )
+
+    # Add output metadata
+    context.add_output_metadata(
+        {
+            "total_viewsets": dagster.MetadataValue.int(count),
+            "synced_count": dagster.MetadataValue.int(synced_count),
+            "failed_count": dagster.MetadataValue.int(failed_count),
+            "kind_filter": dagster.MetadataValue.text(kind or "all"),
+            "failed_viewsets": dagster.MetadataValue.json(failed_viewsets)
+            if failed_viewsets
+            else dagster.MetadataValue.text("None"),
+        }
+    )
+
+    if failed_count > 0:
+        raise dagster.Failure(f"Failed to sync {failed_count} out of {count} viewsets")
+
+
+@dagster.job(
+    name="sync_managed_viewsets",
+    tags={"owner": JobOwners.TEAM_DATA_WAREHOUSE.value},
+)
+def sync_managed_viewsets_job():
+    """
+    Job that syncs views for ManagedViewsets.
+
+    Can be configured to sync all kinds or a specific kind.
+    """
+    sync_managed_viewsets_op()
diff --git a/dags/slack_alerts.py b/dags/slack_alerts.py
index 95d49c8120..1fc0a35e75 100644
--- a/dags/slack_alerts.py
+++ b/dags/slack_alerts.py
@@ -16,6 +16,7 @@ notification_channel_per_team = {
     JobOwners.TEAM_GROWTH.value: "#alerts-growth",
     JobOwners.TEAM_EXPERIMENTS.value: "#alerts-experiments",
     JobOwners.TEAM_MAX_AI.value: "#alerts-max-ai",
+    JobOwners.TEAM_DATA_WAREHOUSE.value: "#alerts-data-warehouse",
 }
 
 CONSECUTIVE_FAILURE_THRESHOLDS = {
diff --git a/dags/tests/test_managed_viewset_sync.py b/dags/tests/test_managed_viewset_sync.py
new file mode 100644
index 0000000000..76e528d68d
--- /dev/null
+++ b/dags/tests/test_managed_viewset_sync.py
@@ -0,0 +1,128 @@
+import pytest
+from unittest.mock import patch
+
+from dagster import build_op_context
+
+from posthog.models import Organization, Team
+from posthog.warehouse.models.datawarehouse_managed_viewset import DataWarehouseManagedViewSet
+
+from dags.managed_viewset_sync import sync_managed_viewsets_job, sync_managed_viewsets_op
+
+
+class TestSyncManagedViewsetsOp:
+    @pytest.mark.django_db
+    def test_sync_all_viewsets_success(self):
+        # Setup - create real database objects
+        org = Organization.objects.create(name="Test Org")
+        team1 = Team.objects.create(organization=org, name="Test Team 1")
+        team2 = Team.objects.create(organization=org, name="Test Team 2")
+
+        # Create real ManagedViewsets
+        DataWarehouseManagedViewSet.objects.create(team=team1, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+        DataWarehouseManagedViewSet.objects.create(team=team2, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+
+        # Mock the sync_views method to avoid actual database operations
+        with patch.object(DataWarehouseManagedViewSet, "sync_views") as mock_sync:
+            # Create proper Dagster context
+            context = build_op_context(op_config={"kind": ""})
+
+            # Execute
+            sync_managed_viewsets_op(context)
+
+            # Verify
+            assert mock_sync.call_count == 2
+
+            # Check metadata
+            metadata = context.get_output_metadata("result")
+            assert metadata["total_viewsets"].value == 2  # type: ignore
+            assert metadata["synced_count"].value == 2  # type: ignore
+            assert metadata["failed_count"].value == 0  # type: ignore
+
+    @pytest.mark.django_db
+    def test_sync_filtered_by_kind(self):
+        # Setup - create real database objects
+        org = Organization.objects.create(name="Test Org")
+        team1 = Team.objects.create(organization=org, name="Test Team 1")
+        team2 = Team.objects.create(organization=org, name="Test Team 2")
+
+        # Create real ManagedViewsets
+        DataWarehouseManagedViewSet.objects.create(team=team1, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+        DataWarehouseManagedViewSet.objects.create(team=team2, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+
+        # Mock the sync_views method
+        with patch.object(DataWarehouseManagedViewSet, "sync_views") as mock_sync:
+            # Create proper Dagster context
+            context = build_op_context(op_config={"kind": "revenue_analytics"})
+
+            # Execute
+            sync_managed_viewsets_op(context)
+
+            # Verify
+            assert mock_sync.call_count == 2
+
+            # Check metadata
+            metadata = context.get_output_metadata("result")
+            assert metadata["total_viewsets"].value == 2  # type: ignore
+            assert metadata["synced_count"].value == 2  # type: ignore
+            assert metadata["failed_count"].value == 0  # type: ignore
+
+    @pytest.mark.django_db
+    def test_sync_with_failures(self):
+        # Setup - create real database objects
+        org = Organization.objects.create(name="Test Org")
+        team1 = Team.objects.create(organization=org, name="Test Team 1")
+        team2 = Team.objects.create(organization=org, name="Test Team 2")
+
+        # Create real ManagedViewsets
+        DataWarehouseManagedViewSet.objects.create(team=team1, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+        DataWarehouseManagedViewSet.objects.create(team=team2, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+
+        # Mock sync_views to fail for one viewset
+        called = False
+
+        def mock_sync_views():
+            nonlocal called
+            # Fail for the second viewset (team2)
+            if called:
+                raise Exception("Sync failed")
+            called = True
+            return None
+
+        with patch.object(DataWarehouseManagedViewSet, "sync_views", side_effect=mock_sync_views):
+            # Create proper Dagster context
+            context = build_op_context(op_config={"kind": ""})
+
+            # Execute and expect failure
+            with pytest.raises(Exception, match="Failed to sync 1 out of 2 viewsets"):
+                sync_managed_viewsets_op(context)
+
+    @pytest.mark.django_db
+    def test_sync_with_invalid_kind(self):
+        # Create proper Dagster context
+        context = build_op_context(op_config={"kind": "invalid_kind"})
+
+        # Execute and verify exception
+        with pytest.raises(ValueError, match="Invalid kind: invalid_kind"):
+            sync_managed_viewsets_op(context)
+
+
+class TestSyncManagedViewsetsJob:
+    @pytest.mark.django_db
+    def test_job_execution_success(self):
+        # Setup - create real database objects
+        org = Organization.objects.create(name="Test Org")
+        team = Team.objects.create(organization=org, name="Test Team")
+
+        # Create real ManagedViewset
+        DataWarehouseManagedViewSet.objects.create(team=team, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS)
+
+        # Mock the sync_views method
+        with patch.object(DataWarehouseManagedViewSet, "sync_views") as mock_sync:
+            # Execute job
+            result = sync_managed_viewsets_job.execute_in_process(
+                run_config={"ops": {"sync_managed_viewsets_op": {"config": {"kind": "revenue_analytics"}}}}
+            )
+
+            # Verify
+            assert result.success
+            mock_sync.assert_called_once()