feat(sessions): Run sessions backfill from distributed events / replay tables (#41211)

This commit is contained in:
Robbie
2025-11-10 19:17:11 +00:00
committed by GitHub
parent 1b6792e0b0
commit a78de5074d
2 changed files with 6 additions and 18 deletions

View File

@@ -1,7 +1,7 @@
from clickhouse_driver import Client
from dagster import AssetExecutionContext, BackfillPolicy, DailyPartitionsDefinition, asset
from posthog.clickhouse.cluster import get_cluster
from posthog.clickhouse.client import sync_execute
from posthog.clickhouse.client.connection import Workload
from posthog.git import get_git_commit_short
from posthog.models.raw_sessions.sessions_v3 import (
RAW_SESSION_TABLE_BACKFILL_RECORDINGS_SQL_V3,
@@ -46,12 +46,7 @@ def sessions_v3_backfill(context: AssetExecutionContext) -> None:
)
context.log.info(backfill_sql)
cluster = get_cluster()
def backfill_per_shard(client: Client):
client.execute(backfill_sql)
cluster.map_one_host_per_shard(backfill_per_shard)
sync_execute(backfill_sql, workload=Workload.OFFLINE)
context.log.info(f"Successfully backfilled sessions_v3 for {partition_range_str}")
@@ -75,11 +70,6 @@ def sessions_v3_backfill_replay(context: AssetExecutionContext) -> None:
)
context.log.info(backfill_sql)
cluster = get_cluster()
def backfill_per_shard(client: Client):
client.execute(backfill_sql)
cluster.map_one_host_per_shard(backfill_per_shard)
sync_execute(backfill_sql, workload=Workload.OFFLINE)
context.log.info(f"Successfully backfilled sessions_v3 for {partition_range_str}")

View File

@@ -501,8 +501,7 @@ INSERT INTO {database}.{writable_table}
writable_table=WRITABLE_RAW_SESSIONS_TABLE_V3(),
select_sql=RAW_SESSION_TABLE_MV_SELECT_SQL_V3(
where=where,
# use sharded_events for the source table, this means that the backfill MUST run on every shard
source_table=f"{settings.CLICKHOUSE_DATABASE}.sharded_events",
source_table=f"{settings.CLICKHOUSE_DATABASE}.events",
),
)
@@ -516,8 +515,7 @@ INSERT INTO {database}.{writable_table}
writable_table=WRITABLE_RAW_SESSIONS_TABLE_V3(),
select_sql=RAW_SESSION_TABLE_MV_RECORDINGS_SELECT_SQL_V3(
where=where,
# use sharded_events for the source table, this means that the backfill MUST run on every shard
source_table=f"{settings.CLICKHOUSE_DATABASE}.sharded_session_replay_events",
source_table=f"{settings.CLICKHOUSE_DATABASE}.session_replay_events",
),
)