mirror of
https://github.com/BillyOutlast/posthog.git
synced 2026-02-04 03:01:23 +01:00
fix(experiments): Remove stale partitions in sensor (#39479)
This commit is contained in:
@@ -25,7 +25,7 @@ from posthog.models.experiment import Experiment, ExperimentMetricResult
|
||||
from dags.common import JobOwners
|
||||
from dags.experiments import (
|
||||
_parse_partition_key,
|
||||
discover_experiment_metric_partitions,
|
||||
refresh_experiment_metric_partitions,
|
||||
remove_step_sessions_from_experiment_result,
|
||||
schedule_experiment_metric_partitions,
|
||||
)
|
||||
@@ -257,7 +257,7 @@ def experiment_regular_metrics_timeseries_discovery_sensor(context: dagster.Sens
|
||||
analysis. When new combinations are found, it creates dynamic partitions for the
|
||||
experiment_regular_metrics_timeseries asset and triggers processing only for the new partitions.
|
||||
"""
|
||||
return discover_experiment_metric_partitions(
|
||||
return refresh_experiment_metric_partitions(
|
||||
context=context,
|
||||
partition_name=EXPERIMENT_REGULAR_METRICS_PARTITIONS_NAME,
|
||||
partitions_def=experiment_regular_metrics_partitions_def,
|
||||
|
||||
@@ -23,7 +23,7 @@ from posthog.models.experiment import Experiment, ExperimentMetricResult
|
||||
from dags.common import JobOwners
|
||||
from dags.experiments import (
|
||||
_parse_partition_key,
|
||||
discover_experiment_metric_partitions,
|
||||
refresh_experiment_metric_partitions,
|
||||
remove_step_sessions_from_experiment_result,
|
||||
schedule_experiment_metric_partitions,
|
||||
)
|
||||
@@ -259,7 +259,7 @@ def experiment_saved_metrics_timeseries_discovery_sensor(context: dagster.Sensor
|
||||
analysis. When new combinations are found, it creates dynamic partitions for the
|
||||
experiment_saved_metrics_timeseries asset and triggers processing only for the new partitions.
|
||||
"""
|
||||
return discover_experiment_metric_partitions(
|
||||
return refresh_experiment_metric_partitions(
|
||||
context=context,
|
||||
partition_name=EXPERIMENT_SAVED_METRICS_PARTITIONS_NAME,
|
||||
partitions_def=experiment_saved_metrics_partitions_def,
|
||||
|
||||
@@ -132,18 +132,18 @@ def schedule_experiment_metric_partitions(
|
||||
raise dagster.Failure(f"Failed to schedule refresh for {partition_name}: {e}")
|
||||
|
||||
|
||||
def discover_experiment_metric_partitions(
|
||||
def refresh_experiment_metric_partitions(
|
||||
context: dagster.SensorEvaluationContext,
|
||||
partition_name: str,
|
||||
partitions_def: dagster.DynamicPartitionsDefinition,
|
||||
get_metrics_fn,
|
||||
) -> dagster.SensorResult | dagster.SkipReason:
|
||||
"""
|
||||
Automatically discover new experiment-metric combinations and trigger timeseries calculation.
|
||||
Synchronize experiment-metric partitions with current database state.
|
||||
|
||||
This function continuously monitors for new experiments or metrics that need timeseries
|
||||
analysis. When new combinations are found, it creates dynamic partitions and triggers
|
||||
processing only for the new partitions.
|
||||
This function compares expected partitions (based on active experiments/metrics in the database)
|
||||
with existing Dagster partitions. It creates new partitions for newly discovered combinations
|
||||
and removes obsolete partitions for deleted or inactive experiments.
|
||||
|
||||
Args:
|
||||
context: Dagster sensor evaluation context
|
||||
@@ -162,15 +162,22 @@ def discover_experiment_metric_partitions(
|
||||
context.log.debug(f"No {partition_name} found for timeseries analysis")
|
||||
return dagster.SkipReason(f"No experiments with {partition_name} found")
|
||||
|
||||
# Generate partition keys in format: experiment_{id}_metric_{uuid}_{fingerprint}
|
||||
current_partition_keys = [
|
||||
# Generate expected partition keys based on database state
|
||||
# Format: experiment_{id}_metric_{uuid}_{fingerprint}
|
||||
expected_partition_keys = [
|
||||
f"experiment_{exp_id}_metric_{metric_uuid}_{fingerprint}"
|
||||
for exp_id, metric_uuid, fingerprint in current_experiment_metrics
|
||||
]
|
||||
|
||||
# Check which partitions are new
|
||||
# Get existing partitions from Dagster
|
||||
existing_partitions = set(context.instance.get_dynamic_partitions(partition_name))
|
||||
new_partitions = [key for key in current_partition_keys if key not in existing_partitions]
|
||||
|
||||
# Find new partitions (expected but not existing)
|
||||
new_partitions = [key for key in expected_partition_keys if key not in existing_partitions]
|
||||
|
||||
# Find obsolete partitions (existing but not expected)
|
||||
expected_partition_keys_set = set(expected_partition_keys)
|
||||
obsolete_partitions = [key for key in existing_partitions if key not in expected_partition_keys_set]
|
||||
|
||||
# Build response
|
||||
run_requests = []
|
||||
@@ -190,9 +197,14 @@ def discover_experiment_metric_partitions(
|
||||
)
|
||||
for partition_key in new_partitions
|
||||
]
|
||||
else:
|
||||
context.log.debug(f"No new {partition_name} discovered for timeseries analysis")
|
||||
return dagster.SkipReason(f"No new {partition_name} to process")
|
||||
|
||||
if obsolete_partitions:
|
||||
context.log.info(f"Removing {len(obsolete_partitions)} obsolete {partition_name} partitions")
|
||||
dynamic_partitions_requests.append(partitions_def.build_delete_request(obsolete_partitions))
|
||||
|
||||
if not new_partitions and not obsolete_partitions:
|
||||
context.log.debug(f"No partition changes needed for {partition_name}")
|
||||
return dagster.SkipReason(f"No partition changes needed for {partition_name}")
|
||||
|
||||
return dagster.SensorResult(
|
||||
run_requests=run_requests,
|
||||
|
||||
@@ -14,6 +14,7 @@ dags/deletes.py:0: error: "DeleteConfig" has no attribute "log" [attr-defined]
|
||||
dags/deletes.py:0: error: Incompatible return value type (got "tuple[PendingDeletesDictionary, dict[int | None, MutationWaiter]]", expected "tuple[PendingDeletesDictionary, dict[int, MutationWaiter]]") [return-value]
|
||||
dags/exchange_rate.py:0: error: Argument "context" to "store_exchange_rates_in_clickhouse" has incompatible type "AssetExecutionContext"; expected "OpExecutionContext" [arg-type]
|
||||
dags/exchange_rate.py:0: error: Argument "context" to "store_exchange_rates_in_clickhouse" has incompatible type "AssetExecutionContext"; expected "OpExecutionContext" [arg-type]
|
||||
dags/experiments.py:0: error: Argument 1 to "append" of "list" has incompatible type "DeleteDynamicPartitionsRequest"; expected "AddDynamicPartitionsRequest" [arg-type]
|
||||
dags/export_query_logs_to_s3.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "DateRange") [assignment]
|
||||
dags/export_query_logs_to_s3.py:0: error: Statement is unreachable [unreachable]
|
||||
dags/locations/shared.py:0: error: Item "Iterable[JobDefinition | UnresolvedAssetJobDefinition]" of "Iterable[JobDefinition | UnresolvedAssetJobDefinition] | None" has no attribute "append" [union-attr]
|
||||
|
||||
Reference in New Issue
Block a user