refactor(batch-exports): Rollout new stage activity to more Snowflake batch exports (#37434)

This commit is contained in:
Ross
2025-09-01 14:56:01 +01:00
committed by GitHub
parent 19e6f328f8
commit b3479201c2
2 changed files with 7 additions and 2 deletions

View File

@@ -15,6 +15,9 @@ BATCH_EXPORT_SNOWFLAKE_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(
BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS: list[str] = get_list(
os.getenv("BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS", "")
)
BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE: int = get_from_env(
"BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE", 0, type_cast=int
)
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES: int = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_POSTGRES_RECORD_BATCH_QUEUE_MAX_SIZE_BYTES: int = get_from_env(

View File

@@ -1375,8 +1375,10 @@ class SnowflakeBatchExportWorkflow(PostHogWorkflow):
destination_default_fields=snowflake_default_fields(),
)
# Use stage consumer for specific team IDs, otherwise use the original activity
if str(inputs.team_id) in settings.BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS:
if (
str(inputs.team_id) in settings.BATCH_EXPORT_SNOWFLAKE_USE_STAGE_TEAM_IDS
or inputs.team_id % 100 < settings.BATCH_EXPORT_SNOWFLAKE_USE_INTERNAL_STAGE_ROLLOUT_PERCENTAGE
):
await execute_batch_export_using_internal_stage(
insert_into_snowflake_activity_from_stage,
insert_inputs,