From a78de5074d7fd52d75eeb55ea5e2d2d820a90bd1 Mon Sep 17 00:00:00 2001 From: Robbie Date: Mon, 10 Nov 2025 19:17:11 +0000 Subject: [PATCH] feat(sessions): Run sessions backfill from distributed events / replay tables (#41211) --- dags/sessions.py | 18 ++++-------------- posthog/models/raw_sessions/sessions_v3.py | 6 ++---- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/dags/sessions.py b/dags/sessions.py index 2bd432c645..7db77f5d86 100644 --- a/dags/sessions.py +++ b/dags/sessions.py @@ -1,7 +1,7 @@ -from clickhouse_driver import Client from dagster import AssetExecutionContext, BackfillPolicy, DailyPartitionsDefinition, asset -from posthog.clickhouse.cluster import get_cluster +from posthog.clickhouse.client import sync_execute +from posthog.clickhouse.client.connection import Workload from posthog.git import get_git_commit_short from posthog.models.raw_sessions.sessions_v3 import ( RAW_SESSION_TABLE_BACKFILL_RECORDINGS_SQL_V3, @@ -46,12 +46,7 @@ def sessions_v3_backfill(context: AssetExecutionContext) -> None: ) context.log.info(backfill_sql) - cluster = get_cluster() - - def backfill_per_shard(client: Client): - client.execute(backfill_sql) - - cluster.map_one_host_per_shard(backfill_per_shard) + sync_execute(backfill_sql, workload=Workload.OFFLINE) context.log.info(f"Successfully backfilled sessions_v3 for {partition_range_str}") @@ -75,11 +70,6 @@ def sessions_v3_backfill_replay(context: AssetExecutionContext) -> None: ) context.log.info(backfill_sql) - cluster = get_cluster() - - def backfill_per_shard(client: Client): - client.execute(backfill_sql) - - cluster.map_one_host_per_shard(backfill_per_shard) + sync_execute(backfill_sql, workload=Workload.OFFLINE) context.log.info(f"Successfully backfilled sessions_v3 for {partition_range_str}") diff --git a/posthog/models/raw_sessions/sessions_v3.py b/posthog/models/raw_sessions/sessions_v3.py index aca8894aef..8458a04d3e 100644 --- a/posthog/models/raw_sessions/sessions_v3.py +++ b/posthog/models/raw_sessions/sessions_v3.py @@ -501,8 +501,7 @@ INSERT INTO {database}.{writable_table} writable_table=WRITABLE_RAW_SESSIONS_TABLE_V3(), select_sql=RAW_SESSION_TABLE_MV_SELECT_SQL_V3( where=where, - # use sharded_events for the source table, this means that the backfill MUST run on every shard - source_table=f"{settings.CLICKHOUSE_DATABASE}.sharded_events", + source_table=f"{settings.CLICKHOUSE_DATABASE}.events", ), ) @@ -516,8 +515,7 @@ INSERT INTO {database}.{writable_table} writable_table=WRITABLE_RAW_SESSIONS_TABLE_V3(), select_sql=RAW_SESSION_TABLE_MV_RECORDINGS_SELECT_SQL_V3( where=where, - # use sharded_events for the source table, this means that the backfill MUST run on every shard - source_table=f"{settings.CLICKHOUSE_DATABASE}.sharded_session_replay_events", + source_table=f"{settings.CLICKHOUSE_DATABASE}.session_replay_events", ), )