feat(experiments): Ability to recalculate timeseries (#39697)
dags/experiment_timeseries_recalculation.py (new file, 276 lines)
@@ -0,0 +1,276 @@
"""
Dagster asset and automation for experiment timeseries recalculation.

This module allows users to recalculate historical timeseries data for experiments
when they need to regenerate data after changes or fixes.
"""

import time as time_module
from datetime import datetime, time, timedelta
from typing import Any
from zoneinfo import ZoneInfo

import dagster
from dagster import AssetExecutionContext, RetryPolicy, RunRequest, SkipReason

from posthog.schema import ExperimentFunnelMetric, ExperimentMeanMetric, ExperimentQuery, ExperimentRatioMetric

from posthog.hogql_queries.experiments.experiment_query_runner import ExperimentQueryRunner
from posthog.models.experiment import ExperimentMetricResult, ExperimentTimeseriesRecalculation

from dags.common import JobOwners
from dags.experiments import remove_step_sessions_from_experiment_result

experiment_timeseries_recalculation_partitions_def = dagster.DynamicPartitionsDefinition(
    name="experiment_recalculations"
)


def parse_partition_key(partition_key: str) -> tuple[str, int, str, str]:
    """
    Parse a recalculation partition key into its components.

    Expected format: "recalculation_{recalc_id}_experiment_{experiment_id}_metric_{metric_uuid}_{fingerprint}"
    """
    parts = partition_key.split("_")

    if len(parts) < 6 or parts[0] != "recalculation" or parts[2] != "experiment" or parts[4] != "metric":
        raise ValueError(f"Invalid partition key format: {partition_key}")

    try:
        recalculation_id = parts[1]
        experiment_id = int(parts[3])
        metric_uuid = parts[5]
        fingerprint = "_".join(parts[6:])
        return recalculation_id, experiment_id, metric_uuid, fingerprint
    except ValueError as e:
        raise ValueError(f"Failed to parse partition key {partition_key}: {e}")
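

# Illustrative example (not part of the commit): fingerprints may themselves
# contain underscores, which is why everything after the fifth separator is
# rejoined via "_".join(parts[6:]):
#
#     parse_partition_key("recalculation_abc123_experiment_42_metric_m1_fp_v2_final")
#     == ("abc123", 42, "m1", "fp_v2_final")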


def get_metric(metric_data):
    """Build a metric object from metric data."""
    metric_type = metric_data.get("metric_type")

    if metric_type == "mean":
        return ExperimentMeanMetric(**metric_data)
    elif metric_type == "funnel":
        return ExperimentFunnelMetric(**metric_data)
    elif metric_type == "ratio":
        return ExperimentRatioMetric(**metric_data)
    else:
        raise dagster.Failure(f"Unknown metric type: {metric_type}")


@dagster.asset(
    partitions_def=experiment_timeseries_recalculation_partitions_def,
    group_name="experiments",
    tags={"owner": JobOwners.TEAM_EXPERIMENTS.value},
    retry_policy=RetryPolicy(max_retries=2, delay=300),
)
def experiment_timeseries_recalculation(context: AssetExecutionContext) -> dict[str, Any]:
    """
    Process an entire experiment recalculation with resumability.

    Each partition processes one recalculation request, calculating timeseries data
    for each day in the experiment's date range. Supports resuming from the
    last successful date if the job fails and is retried.
    """
    recalculation_id, experiment_id_from_key, metric_uuid_from_key, fingerprint_from_key = parse_partition_key(
        context.partition_key
    )

    context.log.info(
        f"Starting recalculation {recalculation_id} for experiment {experiment_id_from_key}, "
        f"metric {metric_uuid_from_key}, fingerprint {fingerprint_from_key}"
    )

    try:
        recalculation_request = ExperimentTimeseriesRecalculation.objects.get(id=recalculation_id)
    except ExperimentTimeseriesRecalculation.DoesNotExist:
        raise dagster.Failure(f"Recalculation request {recalculation_id} not found")

    if recalculation_request.status == ExperimentTimeseriesRecalculation.Status.PENDING:
        recalculation_request.status = ExperimentTimeseriesRecalculation.Status.IN_PROGRESS
        recalculation_request.save(update_fields=["status"])

    experiment = recalculation_request.experiment
    if not experiment.start_date:
        raise dagster.Failure(f"Experiment {experiment.id} has no start_date")

    # Convert experiment dates from UTC to team timezone for correct daily boundaries
    team_tz = ZoneInfo(experiment.team.timezone) if experiment.team.timezone else ZoneInfo("UTC")
    start_date = experiment.start_date.astimezone(team_tz).date()

    if experiment.end_date:
        end_date = experiment.end_date.astimezone(team_tz).date()
    else:
        end_date = datetime.now(team_tz).date()

    # Resume from last successful date or start fresh
    if recalculation_request.last_successful_date:
        current_date = recalculation_request.last_successful_date + timedelta(days=1)
        context.log.info(f"Resuming recalculation from {current_date}")
    else:
        current_date = start_date
        context.log.info(f"Starting fresh recalculation from {current_date}")

    metric_obj = get_metric(recalculation_request.metric)
    experiment_query = ExperimentQuery(experiment_id=experiment.id, metric=metric_obj)
    fingerprint = recalculation_request.fingerprint

    days_processed = 0
    date_range_delta = end_date - start_date
    total_days = date_range_delta.days + 1

    while current_date <= end_date:
        try:
            day_num = (current_date - start_date).days + 1
            context.log.info(f"Processing day {day_num}/{total_days}: {current_date}")

            # Create end-of-day timestamp in team timezone, then convert to UTC for ExperimentQueryRunner
            end_of_day_team_tz = datetime.combine(current_date + timedelta(days=1), time(0, 0, 0)).replace(
                tzinfo=team_tz
            )
            query_to_utc = end_of_day_team_tz.astimezone(ZoneInfo("UTC"))

            query_runner = ExperimentQueryRunner(
                query=experiment_query, team=experiment.team, override_end_date=query_to_utc
            )
            result = query_runner._calculate()
            result = remove_step_sessions_from_experiment_result(result)

            ExperimentMetricResult.objects.update_or_create(
                experiment_id=experiment.id,
                metric_uuid=recalculation_request.metric.get("uuid"),
                fingerprint=fingerprint,
                query_to=query_to_utc,
                defaults={
                    "query_from": experiment.start_date,
                    "status": ExperimentMetricResult.Status.COMPLETED,
                    "result": result.model_dump(),
                    "query_id": None,
                    "completed_at": datetime.now(ZoneInfo("UTC")),
                    "error_message": None,
                },
            )

            recalculation_request.last_successful_date = current_date
            recalculation_request.save(update_fields=["last_successful_date"])
            days_processed += 1

            progress_pct = round((day_num / total_days) * 100, 1)
            context.log.info(f"Progress: {progress_pct}% ({day_num}/{total_days} days)")

        except Exception as e:
            context.log.exception(f"Failed on {current_date}: {e}")
            recalculation_request.status = ExperimentTimeseriesRecalculation.Status.FAILED
            recalculation_request.save(update_fields=["status"])

            raise dagster.Failure(
                f"Recalculation failed on {current_date}",
                metadata={
                    "failed_date": current_date.isoformat(),
                    "days_completed": days_processed,
                    "total_days": total_days,
                    "error": str(e),
                },
            )

        current_date += timedelta(days=1)

    recalculation_request.status = ExperimentTimeseriesRecalculation.Status.COMPLETED
    recalculation_request.save(update_fields=["status"])
    context.log.info(f"Recalculation {recalculation_id} completed successfully")

    context.add_output_metadata(
        {
            "recalculation_id": recalculation_id,
            "experiment_id": experiment.id,
            "metric_uuid": recalculation_request.metric.get("uuid"),
            "days_processed": days_processed,
            "total_days": total_days,
            "progress_percentage": 100.0,
            "date_range": f"{start_date} to {end_date}",
        }
    )

    return {
        "recalculation_id": str(recalculation_id),
        "experiment_id": experiment.id,
        "metric_uuid": recalculation_request.metric.get("uuid"),
        "days_processed": days_processed,
        "start_date": start_date.isoformat(),
        "end_date": end_date.isoformat(),
    }
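

# Illustrative note (not part of the commit): if a run dies after persisting
# last_successful_date = date(2024, 12, 26), the RetryPolicy above re-executes
# the partition and the day loop resumes at 2024-12-27 instead of re-running
# every day from the experiment's start date.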


experiment_timeseries_recalculation_job = dagster.define_asset_job(
    name="experiment_timeseries_recalculation_job",
    selection=[experiment_timeseries_recalculation],
    tags={"owner": JobOwners.TEAM_EXPERIMENTS.value},
)


@dagster.sensor(
    job=experiment_timeseries_recalculation_job,
    minimum_interval_seconds=30,
    tags={"owner": JobOwners.TEAM_EXPERIMENTS.value},
)
def experiment_timeseries_recalculation_sensor(context: dagster.SensorEvaluationContext):
    """
    Discover pending recalculation requests and create partitions to process them.

    This sensor runs every 30 seconds, finds new PENDING recalculation requests,
    creates dynamic partitions for them, and triggers their execution.
    """
    # Track which recalculation requests we've already processed
    last_processed_id = context.cursor

    filter_kwargs: dict[str, Any] = {"status": ExperimentTimeseriesRecalculation.Status.PENDING}
    if last_processed_id:
        filter_kwargs["id__gt"] = last_processed_id

    new_recalculations = list(
        ExperimentTimeseriesRecalculation.objects.filter(**filter_kwargs).order_by("id")[:100]
    )  # Limit to 100 requests at a time

    if not new_recalculations:
        return SkipReason("No new recalculation requests")

    context.log.info(f"Found {len(new_recalculations)} new recalculation requests")

    partition_keys = []
    latest_id = last_processed_id or ""

    for recalc_request in new_recalculations:
        metric_uuid = recalc_request.metric.get("uuid", "unknown")
        partition_key = (
            f"recalculation_{recalc_request.id}_"
            f"experiment_{recalc_request.experiment_id}_"
            f"metric_{metric_uuid}_{recalc_request.fingerprint}"
        )
        partition_keys.append(partition_key)
        latest_id = max(latest_id, str(recalc_request.id))
        context.log.info(f"Creating partition {partition_key}")

    context.instance.add_dynamic_partitions("experiment_recalculations", partition_keys)
    context.update_cursor(latest_id)

    run_requests = []
    for partition_key in partition_keys:
        recalculation_id, experiment_id, metric_uuid, fingerprint = parse_partition_key(partition_key)
        run_requests.append(
            RunRequest(
                run_key=f"recalculation_{recalculation_id}_{int(time_module.time())}",
                partition_key=partition_key,
                tags={
                    "recalculation_id": str(recalculation_id),
                    "experiment_id": str(experiment_id),
                    "metric_uuid": metric_uuid,
                    "fingerprint": fingerprint,
                    "triggered_at": datetime.now(ZoneInfo("UTC")).isoformat(),
                },
            )
        )

    return run_requests
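
A note on the cursor logic above (editorial example, not part of the commit): the sensor advances its cursor with max(latest_id, str(recalc_request.id)), comparing ids as strings. That is safe here because the model's primary keys are UUIDv7 values (see the migration below, which uses posthog.models.utils.uuid7), and UUIDv7 encodes a creation timestamp in its most significant bits, so lexicographic order tracks creation order. A minimal sketch with made-up placeholder ids:

earlier = "0192d3a0-1111-7abc-8000-000000000000"  # hypothetical uuid7 minted first
later = "0192d3a5-0000-7abc-8000-000000000000"  # hypothetical uuid7 minted a few ms later
assert max(earlier, later) == later  # string comparison follows creation order
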
@@ -6,7 +6,11 @@ with shared resources to create a complete Dagster definitions object.
 
 import dagster
 
-from dags import experiment_regular_metrics_timeseries, experiment_saved_metrics_timeseries
+from dags import (
+    experiment_regular_metrics_timeseries,
+    experiment_saved_metrics_timeseries,
+    experiment_timeseries_recalculation,
+)
 
 from . import resources
 
@@ -19,10 +23,12 @@ def _create_definitions():
     jobs = [
         experiment_regular_metrics_timeseries.experiment_regular_metrics_timeseries_job,
         experiment_saved_metrics_timeseries.experiment_saved_metrics_timeseries_job,
+        experiment_timeseries_recalculation.experiment_timeseries_recalculation_job,
     ]
     sensors = [
         experiment_regular_metrics_timeseries.experiment_regular_metrics_timeseries_discovery_sensor,
         experiment_saved_metrics_timeseries.experiment_saved_metrics_timeseries_discovery_sensor,
+        experiment_timeseries_recalculation.experiment_timeseries_recalculation_sensor,
     ]
     schedules = [
         experiment_regular_metrics_timeseries.experiment_regular_metrics_timeseries_refresh_schedule,
@@ -33,6 +39,7 @@ def _create_definitions():
         assets=[
             experiment_regular_metrics_timeseries.experiment_regular_metrics_timeseries,
             experiment_saved_metrics_timeseries.experiment_saved_metrics_timeseries,
+            experiment_timeseries_recalculation.experiment_timeseries_recalculation,
         ],
         jobs=jobs,
         sensors=sensors,
dags/tests/test_experiment_timeseries_recalculation.py (new file, 108 lines)
@@ -0,0 +1,108 @@
import datetime
from typing import cast
from zoneinfo import ZoneInfo

import pytest
from posthog.test.base import BaseTest
from unittest.mock import MagicMock, patch

import dagster

from posthog.schema import ExperimentQueryResponse, ExperimentStatsBaseValidated, ExperimentVariantResultFrequentist

from posthog.models import Organization, Team, User
from posthog.models.experiment import Experiment, ExperimentMetricResult, ExperimentTimeseriesRecalculation
from posthog.models.feature_flag import FeatureFlag

from dags.experiment_timeseries_recalculation import experiment_timeseries_recalculation


@pytest.mark.django_db
class TestExperimentRecalculation(BaseTest):
    def test_experiment_timeseries_recalculation_asset(self):
        """Test that the recalculation asset processes all days and creates correct records."""
        org = Organization.objects.create(name="Test Org")
        team = Team.objects.create(organization=org, name="Test Team", timezone="America/New_York")
        user = User.objects.create(email="test@example.com")

        flag = FeatureFlag.objects.create(team=team, key="test-flag", created_by=user)
        experiment = Experiment.objects.create(
            name="Test Experiment",
            team=team,
            feature_flag=flag,
            start_date=datetime.datetime(2024, 12, 25, 10, 0, 0, tzinfo=ZoneInfo("UTC")),
            end_date=datetime.datetime(2024, 12, 27, 10, 0, 0, tzinfo=ZoneInfo("UTC")),  # 3 days in NYC timezone
        )

        metric_data = {
            "metric_type": "mean",
            "uuid": "test-metric-uuid",
            "source": {"kind": "EventsNode", "event": "test_event"},
        }
        recalculation_request = ExperimentTimeseriesRecalculation.objects.create(
            team=team,
            experiment=experiment,
            metric=metric_data,
            fingerprint="test-fingerprint",
            status=ExperimentTimeseriesRecalculation.Status.PENDING,
        )

        # Partition key format must match what the sensor creates
        partition_key = (
            f"recalculation_{recalculation_request.id}_"
            f"experiment_{experiment.id}_"
            f"metric_test-metric-uuid_test-fingerprint"
        )
        context = dagster.build_asset_context(partition_key=partition_key)

        mock_result = ExperimentQueryResponse(
            baseline=ExperimentStatsBaseValidated(key="control", number_of_samples=100, sum=1000, sum_squares=10000),
            variant_results=[
                ExperimentVariantResultFrequentist(
                    key="test", number_of_samples=110, sum=1100, sum_squares=11000, significant=False
                )
            ],
        )

        # Mock ClickHouse query results since we're only testing the recalculation processing logic
        with patch("dags.experiment_timeseries_recalculation.ExperimentQueryRunner") as mock_query_runner_class:
            mock_query_runner = MagicMock()
            mock_query_runner._calculate.return_value = mock_result
            mock_query_runner_class.return_value = mock_query_runner

            with patch(
                "dags.experiment_timeseries_recalculation.remove_step_sessions_from_experiment_result"
            ) as mock_remove_sessions:
                mock_remove_sessions.return_value = mock_result

                result = cast(dict, experiment_timeseries_recalculation(context))

        recalculation_request.refresh_from_db()
        assert recalculation_request.status == ExperimentTimeseriesRecalculation.Status.COMPLETED
        assert recalculation_request.last_successful_date == datetime.date(2024, 12, 27)

        metric_results = ExperimentMetricResult.objects.filter(
            experiment=experiment, metric_uuid="test-metric-uuid", fingerprint="test-fingerprint"
        ).order_by("query_to")

        assert len(metric_results) == 3

        # NYC is UTC-5 in December, so end-of-day boundaries are shifted
        expected_dates_utc = [
            datetime.datetime(2024, 12, 26, 5, 0, 0, tzinfo=ZoneInfo("UTC")),  # End of Dec 25 in EST
            datetime.datetime(2024, 12, 27, 5, 0, 0, tzinfo=ZoneInfo("UTC")),  # End of Dec 26 in EST
            datetime.datetime(2024, 12, 28, 5, 0, 0, tzinfo=ZoneInfo("UTC")),  # End of Dec 27 in EST
        ]

        for i, metric_result in enumerate(metric_results):
            assert metric_result.query_to == expected_dates_utc[i]
            assert metric_result.query_from == experiment.start_date
            assert metric_result.status == ExperimentMetricResult.Status.COMPLETED
            assert metric_result.result == mock_result.model_dump()

        assert result["recalculation_id"] == str(recalculation_request.id)
        assert result["experiment_id"] == experiment.id
        assert result["metric_uuid"] == "test-metric-uuid"
        assert result["days_processed"] == 3
        assert result["start_date"] == "2024-12-25"
        assert result["end_date"] == "2024-12-27"
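
The expected query_to values follow directly from the asset's boundary math: midnight at the start of the next day in the team's timezone, converted to UTC. A standalone sketch of that arithmetic (standard library only; America/New_York is UTC-5 in December):

from datetime import date, datetime, time, timedelta
from zoneinfo import ZoneInfo

team_tz = ZoneInfo("America/New_York")
end_of_day = datetime.combine(date(2024, 12, 25) + timedelta(days=1), time(0, 0)).replace(tzinfo=team_tz)
print(end_of_day.astimezone(ZoneInfo("UTC")))  # 2024-12-26 05:00:00+00:00
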
@@ -25,7 +25,13 @@ from posthog.hogql_queries.experiments.experiment_metric_fingerprint import comp
 from posthog.models import Survey
 from posthog.models.activity_logging.activity_log import Detail, changes_between, log_activity
 from posthog.models.cohort import Cohort
-from posthog.models.experiment import Experiment, ExperimentHoldout, ExperimentMetricResult, ExperimentSavedMetric
+from posthog.models.experiment import (
+    Experiment,
+    ExperimentHoldout,
+    ExperimentMetricResult,
+    ExperimentSavedMetric,
+    ExperimentTimeseriesRecalculation,
+)
 from posthog.models.feature_flag.feature_flag import FeatureFlag, FeatureFlagEvaluationTag
 from posthog.models.filters.filter import Filter
 from posthog.models.signals import model_activity_signal
@@ -961,6 +967,72 @@ class EnterpriseExperimentsViewSet(
 
         return Response(response_data)
 
+    @action(methods=["POST"], detail=True, required_scopes=["experiment:write"])
+    def recalculate_timeseries(self, request: Request, *args: Any, **kwargs: Any) -> Response:
+        """
+        Create a recalculation request for experiment timeseries data.
+
+        Request body:
+        - metric (required): The full metric object to recalculate
+        - fingerprint (required): The fingerprint of the metric configuration
+        """
+        experiment = self.get_object()
+
+        metric = request.data.get("metric")
+        fingerprint = request.data.get("fingerprint")
+
+        if not metric:
+            raise ValidationError("metric is required")
+        if not fingerprint:
+            raise ValidationError("fingerprint is required")
+
+        if not experiment.start_date:
+            raise ValidationError("Cannot recalculate timeseries for experiment that hasn't started")
+
+        # Check for existing recalculation request to ensure idempotency
+        existing_recalculation = ExperimentTimeseriesRecalculation.objects.filter(
+            experiment=experiment,
+            fingerprint=fingerprint,
+            status__in=[
+                ExperimentTimeseriesRecalculation.Status.PENDING,
+                ExperimentTimeseriesRecalculation.Status.IN_PROGRESS,
+            ],
+        ).first()
+
+        if existing_recalculation:
+            return Response(
+                {
+                    "id": existing_recalculation.id,
+                    "experiment_id": experiment.id,
+                    "metric_uuid": existing_recalculation.metric.get("uuid"),
+                    "fingerprint": fingerprint,
+                    "status": existing_recalculation.status,
+                    "created_at": existing_recalculation.created_at.isoformat(),
+                },
+                status=200,
+            )
+
+        # Create new recalculation request
+        recalculation_request = ExperimentTimeseriesRecalculation.objects.create(
+            team=experiment.team,
+            experiment=experiment,
+            metric=metric,
+            fingerprint=fingerprint,
+            status=ExperimentTimeseriesRecalculation.Status.PENDING,
+        )
+
+        return Response(
+            {
+                "id": recalculation_request.id,
+                "experiment_id": experiment.id,
+                "metric_uuid": metric.get("uuid"),
+                "fingerprint": fingerprint,
+                "status": recalculation_request.status,
+                "created_at": recalculation_request.created_at.isoformat(),
+            },
+            status=201,
+        )
+
+
 @receiver(model_activity_signal, sender=Experiment)
 def handle_experiment_change(
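
For reference, a hypothetical client call against this endpoint using the requests library (the host, project id, experiment id, and token are placeholders; the payload shape mirrors the validation above and the metric used in the tests):

import requests

resp = requests.post(
    "https://posthog.example.com/api/projects/1/experiments/42/recalculate_timeseries/",
    headers={"Authorization": "Bearer <personal_api_key>"},
    json={
        "metric": {"metric_type": "mean", "uuid": "test-metric-uuid", "source": {"kind": "EventsNode", "event": "test_event"}},
        "fingerprint": "test-fingerprint",
    },
)
# 201 -> a new recalculation request was created
# 200 -> an identical request is already pending or in progress (idempotent per experiment + fingerprint)
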
@@ -1,7 +1,8 @@
-import { useValues } from 'kea'
+import { useActions, useValues } from 'kea'
 
-import { LemonButton, LemonDivider, LemonModal } from '@posthog/lemon-ui'
+import { LemonButton, LemonDialog, LemonDivider, LemonModal } from '@posthog/lemon-ui'
 
+import { More } from 'lib/lemon-ui/LemonButton/More'
 import { Spinner } from 'lib/lemon-ui/Spinner'
 
 import { ExperimentMetric } from '~/queries/schema/schema-general'
@@ -30,9 +31,32 @@ export function TimeseriesModal({
 }: TimeseriesModalProps): JSX.Element {
     const logic = experimentTimeseriesLogic({ experimentId: experiment.id, metric: isOpen ? metric : undefined })
     const { chartData, progressMessage, hasTimeseriesData, timeseriesLoading } = useValues(logic)
+    const { recalculateTimeseries } = useActions(logic)
 
     const processedChartData = chartData(variantResult.key)
 
+    const handleRecalculate = (): void => {
+        LemonDialog.open({
+            title: 'Recalculate timeseries data',
+            content: (
+                <div>
+                    <p>
+                        All existing timeseries data will be deleted and recalculated from scratch. This could take a
+                        long time for large datasets.
+                    </p>
+                </div>
+            ),
+            primaryButton: {
+                children: 'Recalculate',
+                type: 'primary',
+                onClick: () => recalculateTimeseries({ metric }),
+            },
+            secondaryButton: {
+                children: 'Cancel',
+            },
+        })
+    }
+
     return (
         <LemonModal
             isOpen={isOpen}
@@ -68,21 +92,32 @@ export function TimeseriesModal({
                     <Spinner className="text-lg" />
                     <span>Loading timeseries…</span>
                 </div>
-            ) : hasTimeseriesData ? (
+            ) : (
                 <div>
+                    <div className="flex justify-between items-center mt-2 mb-4">
+                        <div className="text-xs text-muted">{progressMessage || ''}</div>
+                        <More
+                            overlay={
+                                <>
+                                    <LemonButton onClick={handleRecalculate}>Recalculate</LemonButton>
+                                </>
+                            }
+                        />
+                    </div>
-                    {progressMessage && <div className="text-xs text-muted mt-2 mb-4">{progressMessage}</div>}
-                    {processedChartData ? (
-                        <VariantTimeseriesChart chartData={processedChartData} />
-                    ) : (
-                        <div className="p-10 text-center text-muted">
-                            No timeseries data available for this variant
-                        </div>
-                    )}
+                    {hasTimeseriesData ? (
+                        processedChartData ? (
+                            <VariantTimeseriesChart chartData={processedChartData} />
+                        ) : (
+                            <div className="p-10 text-center text-muted">
+                                No timeseries data available for this variant
+                            </div>
+                        )
+                    ) : (
+                        <div className="p-10 text-center text-muted -translate-y-6">
+                            No timeseries data available
+                        </div>
+                    )}
                 </div>
-            ) : (
-                <div style={{ padding: '40px', textAlign: 'center', color: '#666' }}>
-                    No timeseries data available
-                </div>
             )}
         </div>
     </LemonModal>
@@ -4,6 +4,7 @@ import { loaders } from 'kea-loaders'
 import { ChartDataset as ChartJsDataset } from 'lib/Chart'
 import api from 'lib/api'
 import { getSeriesColor } from 'lib/colors'
+import { lemonToast } from 'lib/lemon-ui/LemonToast'
 import { hexToRGBA } from 'lib/utils'
 
 import {
@@ -62,6 +63,7 @@ export const experimentTimeseriesLogic = kea<experimentTimeseriesLogicType>([
 
     actions(() => ({
         clearTimeseries: true,
+        recalculateTimeseries: ({ metric }: { metric: ExperimentMetric }) => ({ metric }),
     })),
 
     loaders(({ props }) => ({
@@ -82,6 +84,34 @@ export const experimentTimeseriesLogic = kea<experimentTimeseriesLogicType>([
                 return response
             },
             clearTimeseries: () => null,
+            recalculateTimeseries: async ({ metric }: { metric: ExperimentMetric }) => {
+                if (!metric.fingerprint) {
+                    throw new Error('Metric fingerprint is required')
+                }
+
+                try {
+                    const response = await api.createResponse(
+                        `api/projects/@current/experiments/${props.experimentId}/recalculate_timeseries/`,
+                        {
+                            metric: metric,
+                            fingerprint: metric.fingerprint,
+                        }
+                    )
+
+                    if (response.ok) {
+                        if (response.status === 201) {
+                            lemonToast.success('Recalculation started successfully')
+                        } else if (response.status === 200) {
+                            lemonToast.info('Recalculation already in progress')
+                        }
+                    }
+                } catch (error) {
+                    lemonToast.error('Failed to start recalculation')
+                    throw error
+                }
+
+                return null
+            },
         },
     ],
 })),
@@ -0,0 +1,56 @@
# Generated by Django 4.2.22 on 2025-10-16 09:30

import django.db.models.deletion
from django.db import migrations, models

import posthog.models.utils


class Migration(migrations.Migration):
    dependencies = [
        ("posthog", "0883_featureflag_last_called_at"),
    ]

    operations = [
        migrations.CreateModel(
            name="ExperimentTimeseriesRecalculation",
            fields=[
                (
                    "id",
                    models.UUIDField(
                        default=posthog.models.utils.uuid7, editable=False, primary_key=True, serialize=False
                    ),
                ),
                ("metric", models.JSONField()),
                ("fingerprint", models.CharField(max_length=64)),
                (
                    "status",
                    models.CharField(
                        choices=[
                            ("pending", "Pending"),
                            ("in_progress", "In Progress"),
                            ("completed", "Completed"),
                            ("failed", "Failed"),
                        ],
                        default="pending",
                        max_length=20,
                    ),
                ),
                ("last_successful_date", models.DateField(blank=True, null=True)),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("experiment", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.experiment")),
                ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
            ],
            options={
                "indexes": [models.Index(fields=["status"], name="posthog_exp_status_01657f_idx")],
            },
        ),
        migrations.AddConstraint(
            model_name="experimenttimeseriesrecalculation",
            constraint=models.UniqueConstraint(
                condition=models.Q(("status__in", ["pending", "in_progress"])),
                fields=("experiment", "fingerprint"),
                name="unique_active_recalculation_per_experiment_metric",
            ),
        ),
    ]
@@ -1 +1 @@
-0883_featureflag_last_called_at
+0884_experimenttimeseriesrecalculation_and_more
@@ -7,7 +7,7 @@ from django.utils import timezone
 from posthog.models.activity_logging.model_activity import ModelActivityMixin
 from posthog.models.file_system.file_system_mixin import FileSystemSyncMixin
 from posthog.models.file_system.file_system_representation import FileSystemRepresentation
-from posthog.models.utils import RootTeamMixin
+from posthog.models.utils import RootTeamMixin, UUIDModel
 
 if TYPE_CHECKING:
     from posthog.models.team import Team
@@ -193,3 +193,37 @@ class ExperimentMetricResult(models.Model):
 
     def __str__(self):
         return f"ExperimentMetricResult({self.experiment_id}, {self.metric_uuid}, {self.query_from}, {self.status})"
+
+
+class ExperimentTimeseriesRecalculation(UUIDModel):
+    class Status(models.TextChoices):
+        PENDING = "pending", "Pending"
+        IN_PROGRESS = "in_progress", "In Progress"
+        COMPLETED = "completed", "Completed"
+        FAILED = "failed", "Failed"
+
+    team = models.ForeignKey("Team", on_delete=models.CASCADE)
+    experiment = models.ForeignKey("Experiment", on_delete=models.CASCADE)
+    metric = models.JSONField()
+    fingerprint = models.CharField(max_length=64)  # SHA256 hash
+
+    status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING)
+    last_successful_date = models.DateField(null=True, blank=True)
+
+    created_at = models.DateTimeField(auto_now_add=True)
+
+    class Meta:
+        indexes = [
+            models.Index(fields=["status"]),
+        ]
+        constraints = [
+            models.UniqueConstraint(
+                fields=["experiment", "fingerprint"],
+                condition=models.Q(status__in=["pending", "in_progress"]),
+                name="unique_active_recalculation_per_experiment_metric",
+            ),
+        ]
+
+    def __str__(self):
+        metric_uuid = self.metric.get("uuid", "unknown")
+        return f"ExperimentTimeseriesRecalculation(exp={self.experiment_id}, metric={metric_uuid}, fingerprint={self.fingerprint}, status={self.status})"
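
The partial unique constraint allows only one active request per (experiment, fingerprint) pair; completed or failed rows do not block new ones. A hypothetical Django shell sketch (assumes team, experiment, and metric objects already exist):

from django.db import IntegrityError

from posthog.models.experiment import ExperimentTimeseriesRecalculation

ExperimentTimeseriesRecalculation.objects.create(
    team=team, experiment=experiment, metric=metric, fingerprint="abc", status="pending"
)
try:
    ExperimentTimeseriesRecalculation.objects.create(
        team=team, experiment=experiment, metric=metric, fingerprint="abc", status="in_progress"
    )
except IntegrityError:
    pass  # rejected: an active recalculation already exists for this pair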