feat(web-analytics): system sync partition replicas before swap (#38398)

This commit is contained in:
Lucas Ricoy
2025-09-19 23:27:02 +02:00
committed by GitHub
parent ae335108a5
commit 1cfdc670f8
2 changed files with 17 additions and 3 deletions

View File

@@ -27,6 +27,7 @@ from dags.web_preaggregated_utils import (
merge_clickhouse_settings,
recreate_staging_table,
swap_partitions_from_staging,
sync_partitions_on_replicas,
web_analytics_retry_policy_def,
)
@@ -64,14 +65,14 @@ def pre_aggregate_web_analytics_data(
staging_table_name = f"{table_name}_staging"
try:
# 1. Recreate staging table (replaces partition cleaning)
# 1. Recreate staging table
if staging_table_name not in REPLACE_TEMPLATES_BY_STAGING_TABLE_NAME:
raise dagster.Failure(f"No REPLACE TABLE function found for {staging_table_name}")
replace_sql_func = REPLACE_TEMPLATES_BY_STAGING_TABLE_NAME[staging_table_name]
recreate_staging_table(context, cluster, staging_table_name, replace_sql_func)
# 2. Generate hourly data into staging table
# 2. Write data into staging table
insert_query = sql_generator(
date_start=date_start,
date_end=date_end,
@@ -85,7 +86,10 @@ def pre_aggregate_web_analytics_data(
context.log.info(insert_query)
sync_execute(insert_query)
# 3. Atomically swap partitions from staging to target
# 3. Sync replicas before partition swapping to ensure consistency
sync_partitions_on_replicas(context, cluster, staging_table_name)
# 4. Atomically swap partitions from staging to target
context.log.info(f"Swapping partitions from {staging_table_name} to {table_name}")
swap_partitions_from_staging(context, cluster, table_name, staging_table_name)

View File

@@ -99,6 +99,16 @@ def drop_partitions_for_date_range(
current_date += timedelta(days=1)
def sync_partitions_on_replicas(
    context: dagster.AssetExecutionContext, cluster: ClickhouseCluster, target_table: str
) -> None:
    """Issue a lightweight ``SYSTEM SYNC REPLICA`` for ``target_table`` across the cluster.

    The statement is dispatched through ``cluster.map_hosts_by_roles`` to hosts
    with the ``DATA`` or ``COORDINATOR`` node role, and ``.result()`` is called
    on the returned handle (presumably blocking until every host has finished
    and surfacing any per-host failure -- confirm against ``ClickhouseCluster``).

    Args:
        context: Dagster execution context; only its logger is used here.
        cluster: Cluster handle used to fan the statement out to hosts.
        target_table: Name of the table whose replicas should be synced. It is
            interpolated directly into the SQL text, so it must be a trusted
            identifier.
    """
    context.log.info(f"Syncing replicas for {target_table} on all hosts")

    # Build the statement once; it is identical for every host.
    sync_statement = f"SYSTEM SYNC REPLICA {target_table} LIGHTWEIGHT"

    def _sync_on_host(client):
        return client.execute(sync_statement)

    pending = cluster.map_hosts_by_roles(
        _sync_on_host,
        node_roles=[NodeRole.DATA, NodeRole.COORDINATOR],
    )
    pending.result()
def swap_partitions_from_staging(
context: dagster.AssetExecutionContext, cluster: ClickhouseCluster, target_table: str, staging_table: str
) -> None: