feat: Add Dagster job to easily resync managed viewsets (#40175)

This commit is contained in:
Rafael Audibert
2025-10-24 18:14:32 -03:00
committed by GitHub
parent 99671896c3
commit 44ef924616
6 changed files with 236 additions and 0 deletions

View File

@@ -5,6 +5,7 @@ load_from:
## Please be considerate: anything listed here will run on every developer's machine.
## Heavy work, such as a full backup or a job that makes heavy network requests, should not be included.
## To test something like that locally, uncomment but do not commit.
# - python_module: dags.locations.data_warehouse
# - python_module: dags.locations.error_tracking
# - python_module: dags.locations.growth
# - python_module: dags.locations.revenue_analytics

View File

@@ -19,6 +19,7 @@ class JobOwners(str, Enum):
TEAM_GROWTH = "team-growth"
TEAM_EXPERIMENTS = "team-experiments"
TEAM_MAX_AI = "team-max-ai"
TEAM_DATA_WAREHOUSE = "team-data-warehouse"
class ClickhouseClusterResource(dagster.ConfigurableResource):

View File

@@ -0,0 +1,12 @@
import dagster
from dags import managed_viewset_sync
from . import resources
# Dagster code location definitions for the data warehouse jobs: registers the
# managed viewset sync job and wires in the `resources` imported from this package.
defs = dagster.Definitions(
    resources=resources,
    jobs=[managed_viewset_sync.sync_managed_viewsets_job],
)

View File

@@ -0,0 +1,93 @@
import dagster
import structlog
from posthog.warehouse.models.datawarehouse_managed_viewset import DataWarehouseManagedViewSet
from dags.common import JobOwners
logger = structlog.get_logger(__name__)
@dagster.op(
    config_schema={
        "kind": dagster.Field(
            dagster.String,
            default_value="",
            is_required=False,
            description="Specific kind to sync. If not provided, syncs all kinds.",
        )
    }
)
def sync_managed_viewsets_op(
    context: dagster.OpExecutionContext,
) -> None:
    """
    Sync views for all ManagedViewsets of a specific kind, or all kinds if no kind specified.

    Every selected viewset is attempted even when an earlier one fails. A summary
    (totals, the kind filter and the per-viewset errors) is recorded as output
    metadata and, when anything failed, is also attached to the raised failure.

    Args:
        context: Dagster op context. ``op_config["kind"]`` optionally restricts the
            sync to a single kind; an empty value means "all kinds".

    Raises:
        ValueError: If a non-empty ``kind`` is not in ``DataWarehouseManagedViewSet.Kind.values``.
        dagster.Failure: If at least one viewset failed to sync.
    """
    queryset = DataWarehouseManagedViewSet.objects.all()

    kind = context.op_config.get("kind", "")
    if kind:
        # Validate before filtering so a typo fails loudly instead of syncing nothing.
        if kind not in DataWarehouseManagedViewSet.Kind.values:
            raise ValueError(f"Invalid kind: {kind}")
        queryset = queryset.filter(kind=kind)
        context.log.info(f"Syncing ManagedViewsets for kind: {kind}")
    else:
        context.log.info("Syncing all ManagedViewsets")

    count = queryset.count()
    context.log.info(f"Found {count} ManagedViewsets to sync")

    synced_count = 0
    failed_viewsets = []
    for viewset in queryset.iterator():
        try:
            context.log.info(f"Syncing viewset {viewset.id} (kind: {viewset.kind}, team: {viewset.team_id})")
            viewset.sync_views()
            synced_count += 1
            context.log.info(f"Successfully synced viewset {viewset.id}")
        except Exception as e:
            # Deliberately broad: one broken viewset must not stop the remaining
            # ones from syncing. The failure is recorded and reported at the end.
            failed_viewsets.append(
                {"viewset_id": str(viewset.id), "kind": viewset.kind, "team_id": viewset.team_id, "error": str(e)}
            )
            logger.error(
                "failed_to_sync_managed_viewset",
                viewset_id=str(viewset.id),
                kind=viewset.kind,
                team_id=viewset.team_id,
                error=str(e),
                exc_info=True,
            )

    failed_count = len(failed_viewsets)

    # Built once so the success path (output metadata) and the failure path
    # (Failure metadata) report exactly the same summary.
    summary = {
        "total_viewsets": dagster.MetadataValue.int(count),
        "synced_count": dagster.MetadataValue.int(synced_count),
        "failed_count": dagster.MetadataValue.int(failed_count),
        "kind_filter": dagster.MetadataValue.text(kind or "all"),
        "failed_viewsets": dagster.MetadataValue.json(failed_viewsets)
        if failed_viewsets
        else dagster.MetadataValue.text("None"),
    }
    context.add_output_metadata(summary)

    if failed_count > 0:
        # Attach the summary to the failure itself so the failed viewsets and
        # their errors travel with the failure event, not only with the output.
        raise dagster.Failure(
            f"Failed to sync {failed_count} out of {count} viewsets",
            metadata=summary,
        )
@dagster.job(
    tags={"owner": JobOwners.TEAM_DATA_WAREHOUSE.value},
    name="sync_managed_viewsets",
)
def sync_managed_viewsets_job():
    """
    Single-op job that syncs the views of ManagedViewsets.

    The op's ``kind`` config selects one specific kind; leaving it empty syncs all kinds.
    """
    sync_managed_viewsets_op()

View File

@@ -16,6 +16,7 @@ notification_channel_per_team = {
JobOwners.TEAM_GROWTH.value: "#alerts-growth",
JobOwners.TEAM_EXPERIMENTS.value: "#alerts-experiments",
JobOwners.TEAM_MAX_AI.value: "#alerts-max-ai",
JobOwners.TEAM_DATA_WAREHOUSE.value: "#alerts-data-warehouse",
}
CONSECUTIVE_FAILURE_THRESHOLDS = {

View File

@@ -0,0 +1,128 @@
import pytest
from unittest.mock import patch
from dagster import build_op_context
from posthog.models import Organization, Team
from posthog.warehouse.models.datawarehouse_managed_viewset import DataWarehouseManagedViewSet
from dags.managed_viewset_sync import sync_managed_viewsets_job, sync_managed_viewsets_op
class TestSyncManagedViewsetsOp:
    """Direct invocations of ``sync_managed_viewsets_op`` against real rows, with ``sync_views`` patched."""

    @staticmethod
    def _create_two_revenue_viewsets() -> None:
        """Create one organization, two teams, and one revenue analytics viewset per team."""
        organization = Organization.objects.create(name="Test Org")
        teams = [
            Team.objects.create(organization=organization, name=team_name)
            for team_name in ("Test Team 1", "Test Team 2")
        ]
        for team in teams:
            DataWarehouseManagedViewSet.objects.create(
                team=team, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS
            )

    @staticmethod
    def _assert_all_synced(context, expected_total: int) -> None:
        """Assert the op's output metadata reports ``expected_total`` viewsets, all synced, none failed."""
        reported = context.get_output_metadata("result")
        expectations = {
            "total_viewsets": expected_total,
            "synced_count": expected_total,
            "failed_count": 0,
        }
        for key, expected in expectations.items():
            assert reported[key].value == expected  # type: ignore

    @pytest.mark.django_db
    def test_sync_all_viewsets_success(self):
        self._create_two_revenue_viewsets()

        # Patch sync_views so no real view materialisation happens.
        with patch.object(DataWarehouseManagedViewSet, "sync_views") as sync_views_mock:
            op_context = build_op_context(op_config={"kind": ""})

            sync_managed_viewsets_op(op_context)

            assert sync_views_mock.call_count == 2
            self._assert_all_synced(op_context, expected_total=2)

    @pytest.mark.django_db
    def test_sync_filtered_by_kind(self):
        self._create_two_revenue_viewsets()

        with patch.object(DataWarehouseManagedViewSet, "sync_views") as sync_views_mock:
            op_context = build_op_context(op_config={"kind": "revenue_analytics"})

            sync_managed_viewsets_op(op_context)

            assert sync_views_mock.call_count == 2
            self._assert_all_synced(op_context, expected_total=2)

    @pytest.mark.django_db
    def test_sync_with_failures(self):
        self._create_two_revenue_viewsets()

        # The first call succeeds; every later call raises.
        seen_calls = []

        def fail_after_first_call():
            seen_calls.append(None)
            if len(seen_calls) > 1:
                raise Exception("Sync failed")
            return None

        with patch.object(DataWarehouseManagedViewSet, "sync_views", side_effect=fail_after_first_call):
            op_context = build_op_context(op_config={"kind": ""})

            with pytest.raises(Exception, match="Failed to sync 1 out of 2 viewsets"):
                sync_managed_viewsets_op(op_context)

    @pytest.mark.django_db
    def test_sync_with_invalid_kind(self):
        op_context = build_op_context(op_config={"kind": "invalid_kind"})

        # An unknown kind is rejected before any viewset is touched.
        with pytest.raises(ValueError, match="Invalid kind: invalid_kind"):
            sync_managed_viewsets_op(op_context)
class TestSyncManagedViewsetsJob:
    """In-process run of ``sync_managed_viewsets_job`` with ``sync_views`` patched."""

    @pytest.mark.django_db
    def test_job_execution_success(self):
        # One organization, one team, one revenue analytics viewset.
        organization = Organization.objects.create(name="Test Org")
        only_team = Team.objects.create(organization=organization, name="Test Team")
        DataWarehouseManagedViewSet.objects.create(
            team=only_team, kind=DataWarehouseManagedViewSet.Kind.REVENUE_ANALYTICS
        )

        op_settings = {"config": {"kind": "revenue_analytics"}}
        job_run_config = {"ops": {"sync_managed_viewsets_op": op_settings}}

        with patch.object(DataWarehouseManagedViewSet, "sync_views") as sync_views_mock:
            outcome = sync_managed_viewsets_job.execute_in_process(run_config=job_run_config)

            assert outcome.success
            sync_views_mock.assert_called_once()